Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / services / keep-balance / server.go
index e485f5b2061f28134306d1d897b22cb62e4190e9..fd53497f789ed4f5f1db458f99e69f8e7f10c1a7 100644 (file)
@@ -5,12 +5,14 @@
 package keepbalance
 
 import (
+       "context"
        "net/http"
        "os"
        "os/signal"
        "syscall"
        "time"
 
+       "git.arvados.org/arvados.git/lib/controller/dblock"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/jmoiron/sqlx"
        "github.com/sirupsen/logrus"
@@ -62,12 +64,12 @@ func (srv *Server) Done() <-chan struct{} {
        return nil
 }
 
-func (srv *Server) run() {
+func (srv *Server) run(ctx context.Context) {
        var err error
        if srv.RunOptions.Once {
-               _, err = srv.runOnce()
+               _, err = srv.runOnce(ctx)
        } else {
-               err = srv.runForever(nil)
+               err = srv.runForever(ctx)
        }
        if err != nil {
                srv.Logger.Error(err)
@@ -77,7 +79,7 @@ func (srv *Server) run() {
        }
 }
 
-func (srv *Server) runOnce() (*Balancer, error) {
+func (srv *Server) runOnce(ctx context.Context) (*Balancer, error) {
        bal := &Balancer{
                DB:             srv.DB,
                Logger:         srv.Logger,
@@ -86,13 +88,12 @@ func (srv *Server) runOnce() (*Balancer, error) {
                LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
        }
        var err error
-       srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
+       srv.RunOptions, err = bal.Run(ctx, srv.ArvClient, srv.Cluster, srv.RunOptions)
        return bal, err
 }
 
-// RunForever runs forever, or (for testing purposes) until the given
-// stop channel is ready to receive.
-func (srv *Server) runForever(stop <-chan interface{}) error {
+// RunForever runs forever, or until ctx is cancelled.
+func (srv *Server) runForever(ctx context.Context) error {
        logger := srv.Logger
 
        ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
@@ -102,6 +103,10 @@ func (srv *Server) runForever(stop <-chan interface{}) error {
        sigUSR1 := make(chan os.Signal)
        signal.Notify(sigUSR1, syscall.SIGUSR1)
 
+       logger.Info("acquiring service lock")
+       dblock.KeepBalanceService.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return srv.DB, nil })
+       defer dblock.KeepBalanceService.Unlock()
+
        logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
 
        for {
@@ -110,7 +115,11 @@ func (srv *Server) runForever(stop <-chan interface{}) error {
                        logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
                }
 
-               _, err := srv.runOnce()
+               if !dblock.KeepBalanceService.Check() {
+                       // context canceled
+                       return nil
+               }
+               _, err := srv.runOnce(ctx)
                if err != nil {
                        logger.Print("run failed: ", err)
                } else {
@@ -118,7 +127,7 @@ func (srv *Server) runForever(stop <-chan interface{}) error {
                }
 
                select {
-               case <-stop:
+               case <-ctx.Done():
                        signal.Stop(sigUSR1)
                        return nil
                case <-ticker.C: