X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5c0d99ac1c8371910676b6375282a4fcfb5d213d..07baa0ed049746514495d1648c1aef0c40545141:/services/keep-balance/server.go diff --git a/services/keep-balance/server.go b/services/keep-balance/server.go index e485f5b206..fd53497f78 100644 --- a/services/keep-balance/server.go +++ b/services/keep-balance/server.go @@ -5,12 +5,14 @@ package keepbalance import ( + "context" "net/http" "os" "os/signal" "syscall" "time" + "git.arvados.org/arvados.git/lib/controller/dblock" "git.arvados.org/arvados.git/sdk/go/arvados" "github.com/jmoiron/sqlx" "github.com/sirupsen/logrus" @@ -62,12 +64,12 @@ func (srv *Server) Done() <-chan struct{} { return nil } -func (srv *Server) run() { +func (srv *Server) run(ctx context.Context) { var err error if srv.RunOptions.Once { - _, err = srv.runOnce() + _, err = srv.runOnce(ctx) } else { - err = srv.runForever(nil) + err = srv.runForever(ctx) } if err != nil { srv.Logger.Error(err) @@ -77,7 +79,7 @@ func (srv *Server) run() { } } -func (srv *Server) runOnce() (*Balancer, error) { +func (srv *Server) runOnce(ctx context.Context) (*Balancer, error) { bal := &Balancer{ DB: srv.DB, Logger: srv.Logger, @@ -86,13 +88,12 @@ func (srv *Server) runOnce() (*Balancer, error) { LostBlocksFile: srv.Cluster.Collections.BlobMissingReport, } var err error - srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions) + srv.RunOptions, err = bal.Run(ctx, srv.ArvClient, srv.Cluster, srv.RunOptions) return bal, err } -// RunForever runs forever, or (for testing purposes) until the given -// stop channel is ready to receive. -func (srv *Server) runForever(stop <-chan interface{}) error { +// runForever runs forever, or until ctx is canceled. 
+func (srv *Server) runForever(ctx context.Context) error { logger := srv.Logger ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod)) @@ -102,6 +103,10 @@ func (srv *Server) runForever(stop <-chan interface{}) error { sigUSR1 := make(chan os.Signal) signal.Notify(sigUSR1, syscall.SIGUSR1) + logger.Info("acquiring service lock") + dblock.KeepBalanceService.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return srv.DB, nil }) + defer dblock.KeepBalanceService.Unlock() + logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod) for { @@ -110,7 +115,11 @@ func (srv *Server) runForever(stop <-chan interface{}) error { logger.Print("======= Consider using -commit-pulls and -commit-trash flags.") } - _, err := srv.runOnce() + if !dblock.KeepBalanceService.Check() { + // context canceled + return nil + } + _, err := srv.runOnce(ctx) if err != nil { logger.Print("run failed: ", err) } else { @@ -118,7 +127,7 @@ func (srv *Server) runForever(stop <-chan interface{}) error { } select { - case <-stop: + case <-ctx.Done(): signal.Stop(sigUSR1) return nil case <-ticker.C: