Merge branch 'main' into 19582-aws-s3v2-driver
[arvados.git] / services / keep-balance / server.go
index ad13be7511d55de4f334e23f4838f6886fa91694..fd53497f789ed4f5f1db458f99e69f8e7f10c1a7 100644 (file)
@@ -2,63 +2,22 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepbalance
 
 import (
-       "fmt"
+       "context"
        "net/http"
        "os"
        "os/signal"
        "syscall"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/auth"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
-       "github.com/Sirupsen/logrus"
+       "git.arvados.org/arvados.git/lib/controller/dblock"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/jmoiron/sqlx"
+       "github.com/sirupsen/logrus"
 )
 
-var version = "dev"
-
-const (
-       defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
-       rfc3339NanoFixed  = "2006-01-02T15:04:05.000000000Z07:00"
-)
-
-// Config specifies site configuration, like API credentials and the
-// choice of which servers are to be balanced.
-//
-// Config is loaded from a JSON config file (see usage()).
-type Config struct {
-       // Arvados API endpoint and credentials.
-       Client arvados.Client
-
-       // List of service types (e.g., "disk") to balance.
-       KeepServiceTypes []string
-
-       KeepServiceList arvados.KeepServiceList
-
-       // address, address:port, or :port for management interface
-       Listen string
-
-       // token for management APIs
-       ManagementToken string
-
-       // How often to check
-       RunPeriod arvados.Duration
-
-       // Number of collections to request in each API call
-       CollectionBatchSize int
-
-       // Max collections to buffer in memory (bigger values consume
-       // more memory, but can reduce store-and-forward latency when
-       // fetching pages)
-       CollectionBuffers int
-
-       // Timeout for outgoing http request/response cycle.
-       RequestTimeout arvados.Duration
-}
-
 // RunOptions controls runtime behavior. The flags/options that belong
 // here are the ones that are useful for interactive use. For example,
 // "CommitTrash" is a runtime option rather than a config item because
@@ -67,11 +26,12 @@ type Config struct {
 //
 // RunOptions fields are controlled by command line flags.
 type RunOptions struct {
-       Once        bool
-       CommitPulls bool
-       CommitTrash bool
-       Logger      *logrus.Logger
-       Dumper      *logrus.Logger
+       Once                  bool
+       CommitPulls           bool
+       CommitTrash           bool
+       CommitConfirmedFields bool
+       Logger                logrus.FieldLogger
+       Dumper                logrus.FieldLogger
 
        // SafeRendezvousState from the most recent balance operation,
        // or "" if unknown. If this changes from one run to the next,
@@ -81,97 +41,85 @@ type RunOptions struct {
 }

+// Server holds the configuration and dependencies used to run
+// Balancer operations, and implements the service.Handler methods
+// CheckHealth and Done.
 type Server struct {
-       config     Config
-       runOptions RunOptions
-       metrics    *metrics
-       listening  string // for tests
+       // Embedded so that Server satisfies http.Handler. NOTE(review):
+       // presumably assigned by the code that constructs the Server —
+       // confirm.
+       http.Handler

-       Logger *logrus.Logger
-       Dumper *logrus.Logger
-}
+       // Cluster is passed to Balancer.Run; Collections.BalancePeriod
+       // sets the runForever interval and Collections.BlobMissingReport
+       // becomes each Balancer's LostBlocksFile.
+       Cluster    *arvados.Cluster
+       // ArvClient is the API client passed to Balancer.Run.
+       ArvClient  *arvados.Client
+       // RunOptions is passed to each Balancer.Run call and replaced by
+       // the value it returns, so state such as SafeRendezvousState
+       // carries over to the next run.
+       RunOptions RunOptions
+       // Metrics is shared by every Balancer this Server creates.
+       Metrics    *metrics

-// NewServer returns a new Server that runs Balancers using the given
-// config and runOptions.
-func NewServer(config Config, runOptions RunOptions) (*Server, error) {
-       if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
-               return nil, fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
-       }
-       if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
-               return nil, fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
-       }
+       // Logger and Dumper are handed to each Balancer; Logger is also
+       // used for this Server's own log messages.
+       Logger logrus.FieldLogger
+       Dumper logrus.FieldLogger

-       if runOptions.Logger == nil {
-               log := logrus.New()
-               log.Formatter = &logrus.JSONFormatter{
-                       TimestampFormat: rfc3339NanoFixed,
-               }
-               log.Out = os.Stderr
-               runOptions.Logger = log
-       }
+       // DB is pinged by CheckHealth, passed to each Balancer, and used
+       // by runForever to acquire the KeepBalanceService lock.
+       DB *sqlx.DB
+}
 
-       srv := &Server{
-               config:     config,
-               runOptions: runOptions,
-               metrics:    newMetrics(),
-               Logger:     runOptions.Logger,
-               Dumper:     runOptions.Dumper,
-       }
-       return srv, srv.start()
+// CheckHealth implements service.Handler.
+//
+// It returns the result of srv.DB.Ping(): nil when the ping succeeds,
+// otherwise the ping error.
+// NOTE(review): assumes srv.DB is non-nil — confirm it is always set
+// before health checks are served.
+func (srv *Server) CheckHealth() error {
+       return srv.DB.Ping()
 }
 
-func (srv *Server) start() error {
-       if srv.config.Listen == "" {
-               return nil
-       }
-       server := &httpserver.Server{
-               Server: http.Server{
-                       Handler: httpserver.LogRequests(srv.Logger,
-                               auth.RequireLiteralToken(srv.config.ManagementToken,
-                                       srv.metrics.Handler(srv.Logger))),
-               },
-               Addr: srv.config.Listen,
+// Done implements service.Handler.
+//
+// It returns a nil channel. A receive from a nil channel blocks
+// forever, so this handler never signals completion through Done.
+// NOTE(review): process shutdown presumably happens via the os.Exit
+// calls in run instead — confirm.
+func (srv *Server) Done() <-chan struct{} {
+       return nil
+}
+
+// run performs a single balance operation when RunOptions.Once is set,
+// or otherwise balances periodically via runForever. It then terminates
+// the process and never returns: after logging the error, it exits with
+// status 1 if the run failed; otherwise it exits with status 0.
+// NOTE(review): os.Exit skips deferred calls in the caller; confirm
+// this is the intended shutdown path.
+func (srv *Server) run(ctx context.Context) {
+       var err error
+       if srv.RunOptions.Once {
+               _, err = srv.runOnce(ctx)
+       } else {
+               err = srv.runForever(ctx)
        }
-       err := server.Start()
+       // runForever returns nil once ctx is cancelled, so cancellation
+       // leads to a clean exit status 0.
        if err != nil {
-               return err
+               srv.Logger.Error(err)
+               os.Exit(1)
+       } else {
+               os.Exit(0)
        }
-       srv.Logger.Printf("listening at %s", server.Addr)
-       srv.listening = server.Addr
-       return nil
 }
 
-func (srv *Server) Run() (*Balancer, error) {
+// runOnce performs one balance operation using a new Balancer built
+// from srv's DB, loggers and metrics. Its LostBlocksFile is set from
+// Cluster.Collections.BlobMissingReport.
+//
+// srv.RunOptions is replaced with the options returned by Balancer.Run,
+// even when Run also returns an error. The Balancer itself is returned
+// so callers can inspect it; run and runForever discard it.
+func (srv *Server) runOnce(ctx context.Context) (*Balancer, error) {
        bal := &Balancer{
-               Logger:  srv.Logger,
-               Dumper:  srv.Dumper,
-               Metrics: srv.metrics,
+               DB:             srv.DB,
+               Logger:         srv.Logger,
+               Dumper:         srv.Dumper,
+               Metrics:        srv.Metrics,
+               LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
        }
        var err error
-       srv.runOptions, err = bal.Run(srv.config, srv.runOptions)
+       srv.RunOptions, err = bal.Run(ctx, srv.ArvClient, srv.Cluster, srv.RunOptions)
        return bal, err
 }
 
-// RunForever runs forever, or (for testing purposes) until the given
-// stop channel is ready to receive.
-func (srv *Server) RunForever(stop <-chan interface{}) error {
-       logger := srv.runOptions.Logger
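+// runForever holds the KeepBalanceService lock and runs a balance
+// operation every Cluster.Collections.BalancePeriod and on SIGUSR1.
+// It returns nil when ctx is cancelled or the service lock is no
+// longer held.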
+func (srv *Server) runForever(ctx context.Context) error {
+       logger := srv.Logger
 
-       ticker := time.NewTicker(time.Duration(srv.config.RunPeriod))
+       ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
 
        // The unbuffered channel here means we only hear SIGUSR1 if
        // it arrives while we're waiting in select{}.
        sigUSR1 := make(chan os.Signal)
        signal.Notify(sigUSR1, syscall.SIGUSR1)
 
-       logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.config.RunPeriod)
+       logger.Info("acquiring service lock")
+       dblock.KeepBalanceService.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return srv.DB, nil })
+       defer dblock.KeepBalanceService.Unlock()
+
+       logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
 
        for {
-               if !srv.runOptions.CommitPulls && !srv.runOptions.CommitTrash {
+               if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash {
                        logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
                        logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
                }
 
-               _, err := srv.Run()
+               if !dblock.KeepBalanceService.Check() {
+                       // context canceled
+                       return nil
+               }
+               _, err := srv.runOnce(ctx)
                if err != nil {
                        logger.Print("run failed: ", err)
                } else {
@@ -179,7 +127,7 @@ func (srv *Server) RunForever(stop <-chan interface{}) error {
                }
 
                select {
-               case <-stop:
+               case <-ctx.Done():
                        signal.Stop(sigUSR1)
                        return nil
                case <-ticker.C:
@@ -190,7 +138,7 @@ func (srv *Server) RunForever(stop <-chan interface{}) error {
                        // run too soon after the Nth run is triggered
                        // by SIGUSR1.
                        ticker.Stop()
-                       ticker = time.NewTicker(time.Duration(srv.config.RunPeriod))
+                       ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
                }
                logger.Print("starting next run")
        }