21254: Fix racy keep-balance test.
[arvados.git] / services / keep-balance / server.go
index 23e597c89e63e384aef6ab9dff4ea297f74e76c3..7a59c1e8c0edace3a8cb7637c6759a4962d44a99 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepbalance
 
 import (
        "context"
@@ -12,11 +12,9 @@ import (
        "syscall"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/auth"
-       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
-       "github.com/julienschmidt/httprouter"
-       "github.com/prometheus/client_golang/prometheus/promhttp"
+       "git.arvados.org/arvados.git/lib/controller/dblock"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/jmoiron/sqlx"
        "github.com/sirupsen/logrus"
 )
 
@@ -28,11 +26,11 @@ import (
 //
 // RunOptions fields are controlled by command line flags.
 type RunOptions struct {
-       Once        bool
-       CommitPulls bool
-       CommitTrash bool
-       Logger      logrus.FieldLogger
-       Dumper      logrus.FieldLogger
+       Once                  bool
+       CommitConfirmedFields bool
+       ChunkPrefix           string
+       Logger                logrus.FieldLogger
+       Dumper                logrus.FieldLogger
 
        // SafeRendezvousState from the most recent balance operation,
        // or "" if unknown. If this changes from one run to the next,
@@ -42,98 +40,85 @@ type RunOptions struct {
 }
 
 type Server struct {
+       http.Handler
+
        Cluster    *arvados.Cluster
        ArvClient  *arvados.Client
        RunOptions RunOptions
        Metrics    *metrics
 
-       httpHandler http.Handler
-
        Logger logrus.FieldLogger
        Dumper logrus.FieldLogger
-}
 
-// ServeHTTP implements service.Handler.
-func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-       srv.httpHandler.ServeHTTP(w, r)
+       DB *sqlx.DB
 }
 
 // CheckHealth implements service.Handler.
 func (srv *Server) CheckHealth() error {
-       return nil
+       return srv.DB.Ping()
 }
 
-// Start sets up and runs the balancer.
-func (srv *Server) Start(ctx context.Context) {
-       srv.init(ctx)
+// Done implements service.Handler.
+func (srv *Server) Done() <-chan struct{} {
+       return nil
+}
 
+func (srv *Server) run(ctx context.Context) {
        var err error
        if srv.RunOptions.Once {
-               _, err = srv.runOnce()
+               _, err = srv.runOnce(ctx)
        } else {
-               err = srv.runForever(nil)
+               err = srv.runForever(ctx)
        }
        if err != nil {
                srv.Logger.Error(err)
-       }
-}
-
-func (srv *Server) init(ctx context.Context) {
-       if srv.RunOptions.Logger == nil {
-               srv.RunOptions.Logger = ctxlog.FromContext(ctx)
-       }
-
-       srv.Logger = srv.RunOptions.Logger
-       srv.Dumper = srv.RunOptions.Dumper
-
-       if srv.Cluster.ManagementToken == "" {
-               srv.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-                       http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
-               })
+               os.Exit(1)
        } else {
-               mux := httprouter.New()
-               metricsH := promhttp.HandlerFor(srv.Metrics.reg, promhttp.HandlerOpts{
-                       ErrorLog: srv.Logger,
-               })
-               mux.Handler("GET", "/metrics", metricsH)
-               mux.Handler("GET", "/metrics.json", metricsH)
-               srv.httpHandler = auth.RequireLiteralToken(srv.Cluster.ManagementToken, mux)
+               os.Exit(0)
        }
 }
 
-func (srv *Server) runOnce() (*Balancer, error) {
+func (srv *Server) runOnce(ctx context.Context) (*Balancer, error) {
        bal := &Balancer{
+               DB:             srv.DB,
                Logger:         srv.Logger,
                Dumper:         srv.Dumper,
                Metrics:        srv.Metrics,
                LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
+               ChunkPrefix:    srv.RunOptions.ChunkPrefix,
        }
        var err error
-       srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
+       srv.RunOptions, err = bal.Run(ctx, srv.ArvClient, srv.Cluster, srv.RunOptions)
        return bal, err
 }
 
-// RunForever runs forever, or (for testing purposes) until the given
-// stop channel is ready to receive.
-func (srv *Server) runForever(stop <-chan interface{}) error {
+// runForever runs forever, or until ctx is cancelled.
+func (srv *Server) runForever(ctx context.Context) error {
        logger := srv.Logger
 
        ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
 
-       // The unbuffered channel here means we only hear SIGUSR1 if
-       // it arrives while we're waiting in select{}.
-       sigUSR1 := make(chan os.Signal)
+       sigUSR1 := make(chan os.Signal, 1)
        signal.Notify(sigUSR1, syscall.SIGUSR1)
+       defer signal.Stop(sigUSR1)
+
+       logger.Info("acquiring service lock")
+       dblock.KeepBalanceService.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return srv.DB, nil })
+       defer dblock.KeepBalanceService.Unlock()
 
        logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
 
        for {
-               if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash {
+               if srv.Cluster.Collections.BalancePullLimit < 1 && srv.Cluster.Collections.BalanceTrashLimit < 1 {
                        logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
-                       logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
+                       logger.Print("=======  To commit changes, set BalancePullLimit and BalanceTrashLimit values greater than zero.")
                }
 
-               _, err := srv.runOnce()
+               if !dblock.KeepBalanceService.Check() {
+                       // context canceled
+                       return nil
+               }
+               _, err := srv.runOnce(ctx)
                if err != nil {
                        logger.Print("run failed: ", err)
                } else {
@@ -141,8 +126,7 @@ func (srv *Server) runForever(stop <-chan interface{}) error {
                }
 
                select {
-               case <-stop:
-                       signal.Stop(sigUSR1)
+               case <-ctx.Done():
                        return nil
                case <-ticker.C:
                        logger.Print("timer went off")
@@ -151,8 +135,7 @@ func (srv *Server) runForever(stop <-chan interface{}) error {
                        // Reset the timer so we don't start the N+1st
                        // run too soon after the Nth run is triggered
                        // by SIGUSR1.
-                       ticker.Stop()
-                       ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
+                       ticker.Reset(time.Duration(srv.Cluster.Collections.BalancePeriod))
                }
                logger.Print("starting next run")
        }