Skip to content

Commit

Permalink
address code comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vpatelsj committed Feb 8, 2025
1 parent 475fe8a commit 1ae39d4
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
8 changes: 7 additions & 1 deletion ingestor/runner/shutdown/shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"os"
"time"

"github.com/Azure/adx-mon/ingestor"
"github.com/Azure/adx-mon/pkg/logger"
Expand All @@ -16,6 +17,7 @@ const (
namespace = "adx-mon"
SHUTDOWN_COMPLETED = "shutdown-completed"
SHUTDOWN_REQUESTED = "shutdown-requested"
shutdownTimeout = 5 * time.Minute
)

type ShutDownRunner struct {
Expand Down Expand Up @@ -52,9 +54,13 @@ func (r *ShutDownRunner) Run(ctx context.Context) error {
if err := r.httpServer.Close(); err != nil {
return fmt.Errorf("failed to close http server: %v", err)
}
if err := r.service.Shutdown(ctx); err != nil {
timeoutCtx, cancel := context.WithTimeout(ctx, shutdownTimeout)
defer cancel()

if err := r.service.Shutdown(timeoutCtx); err != nil {
return fmt.Errorf("failed to shutdown the service: %v", err)
}

logger.Infof("Service shutdown completed")
//set shutdown-completed annotation
pod.Annotations[SHUTDOWN_COMPLETED] = "true"
Expand Down
10 changes: 2 additions & 8 deletions ingestor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
shutdownTimeout = 5 * time.Minute
)

// invalidEntityCharacters is a regex that matches invalid characters for Kusto entities and segment files.
// This is a subset of the invalid characters for Kusto entities and segment files naming patterns. This should
// match tranform.Normalize.
Expand Down Expand Up @@ -414,8 +410,6 @@ func (s *Service) UploadSegments(ctx context.Context) error {
logger.Infof("Waiting for upload queue to drain, %d batches remaining", len(s.uploader.UploadQueue()))
logger.Infof("Waiting for transfer queue to drain, %d batches remaining", len(s.replicator.TransferQueue()))

timeoutCtx, cancel := context.WithTimeout(ctx, shutdownTimeout)
defer cancel()
t := time.NewTicker(time.Second)
defer t.Stop()

Expand All @@ -432,8 +426,8 @@ func (s *Service) UploadSegments(ctx context.Context) error {
if len(s.replicator.TransferQueue()) != 0 {
logger.Infof("Waiting for transfer queue to drain, %d batches remaining", len(s.replicator.TransferQueue()))
}
case <-timeoutCtx.Done():
return fmt.Errorf("failed to upload segments after %v", shutdownTimeout)
case <-ctx.Done():
return fmt.Errorf("timed out to upload segments")
}
}
}
Expand Down

0 comments on commit 1ae39d4

Please sign in to comment.