X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4620478d694697eff07e501187d784c6c98ccfa9..07baa0ed049746514495d1648c1aef0c40545141:/services/keep-balance/server.go diff --git a/services/keep-balance/server.go b/services/keep-balance/server.go index 613a2f7d3c..fd53497f78 100644 --- a/services/keep-balance/server.go +++ b/services/keep-balance/server.go @@ -2,63 +2,22 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepbalance import ( - "fmt" + "context" "net/http" "os" "os/signal" "syscall" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/auth" - "git.curoverse.com/arvados.git/sdk/go/httpserver" + "git.arvados.org/arvados.git/lib/controller/dblock" + "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/jmoiron/sqlx" "github.com/sirupsen/logrus" ) -var version = "dev" - -const ( - defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml" - rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" -) - -// Config specifies site configuration, like API credentials and the -// choice of which servers are to be balanced. -// -// Config is loaded from a JSON config file (see usage()). -type Config struct { - // Arvados API endpoint and credentials. - Client arvados.Client - - // List of service types (e.g., "disk") to balance. - KeepServiceTypes []string - - KeepServiceList arvados.KeepServiceList - - // address, address:port, or :port for management interface - Listen string - - // token for management APIs - ManagementToken string - - // How often to check - RunPeriod arvados.Duration - - // Number of collections to request in each API call - CollectionBatchSize int - - // Max collections to buffer in memory (bigger values consume - // more memory, but can reduce store-and-forward latency when - // fetching pages) - CollectionBuffers int - - // Timeout for outgoing http request/response cycle. - RequestTimeout arvados.Duration -} - // RunOptions controls runtime behavior. The flags/options that belong // here are the ones that are useful for interactive use. For example, // "CommitTrash" is a runtime option rather than a config item because @@ -67,11 +26,12 @@ type Config struct { // // RunOptions fields are controlled by command line flags. type RunOptions struct { - Once bool - CommitPulls bool - CommitTrash bool - Logger *logrus.Logger - Dumper *logrus.Logger + Once bool + CommitPulls bool + CommitTrash bool + CommitConfirmedFields bool + Logger logrus.FieldLogger + Dumper logrus.FieldLogger // SafeRendezvousState from the most recent balance operation, // or "" if unknown. If this changes from one run to the next, @@ -81,97 +41,85 @@ type RunOptions struct { } type Server struct { - config Config - runOptions RunOptions - metrics *metrics - listening string // for tests + http.Handler - Logger *logrus.Logger - Dumper *logrus.Logger -} + Cluster *arvados.Cluster + ArvClient *arvados.Client + RunOptions RunOptions + Metrics *metrics -// NewServer returns a new Server that runs Balancers using the given -// config and runOptions. -func NewServer(config Config, runOptions RunOptions) (*Server, error) { - if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil { - return nil, fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config") - } - if !runOptions.Once && config.RunPeriod == arvados.Duration(0) { - return nil, fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config") - } + Logger logrus.FieldLogger + Dumper logrus.FieldLogger - if runOptions.Logger == nil { - log := logrus.New() - log.Formatter = &logrus.JSONFormatter{ - TimestampFormat: rfc3339NanoFixed, - } - log.Out = os.Stderr - runOptions.Logger = log - } + DB *sqlx.DB +} - srv := &Server{ - config: config, - runOptions: runOptions, - metrics: newMetrics(), - Logger: runOptions.Logger, - Dumper: runOptions.Dumper, - } - return srv, srv.start() +// CheckHealth implements service.Handler. +func (srv *Server) CheckHealth() error { + return srv.DB.Ping() } -func (srv *Server) start() error { - if srv.config.Listen == "" { - return nil - } - server := &httpserver.Server{ - Server: http.Server{ - Handler: httpserver.LogRequests(srv.Logger, - auth.RequireLiteralToken(srv.config.ManagementToken, - srv.metrics.Handler(srv.Logger))), - }, - Addr: srv.config.Listen, +// Done implements service.Handler. +func (srv *Server) Done() <-chan struct{} { + return nil +} + +func (srv *Server) run(ctx context.Context) { + var err error + if srv.RunOptions.Once { + _, err = srv.runOnce(ctx) + } else { + err = srv.runForever(ctx) } - err := server.Start() if err != nil { - return err + srv.Logger.Error(err) + os.Exit(1) + } else { + os.Exit(0) } - srv.Logger.Printf("listening at %s", server.Addr) - srv.listening = server.Addr - return nil } -func (srv *Server) Run() (*Balancer, error) { +func (srv *Server) runOnce(ctx context.Context) (*Balancer, error) { bal := &Balancer{ - Logger: srv.Logger, - Dumper: srv.Dumper, - Metrics: srv.metrics, + DB: srv.DB, + Logger: srv.Logger, + Dumper: srv.Dumper, + Metrics: srv.Metrics, + LostBlocksFile: srv.Cluster.Collections.BlobMissingReport, } var err error - srv.runOptions, err = bal.Run(srv.config, srv.runOptions) + srv.RunOptions, err = bal.Run(ctx, srv.ArvClient, srv.Cluster, srv.RunOptions) return bal, err } -// RunForever runs forever, or (for testing purposes) until the given -// stop channel is ready to receive. -func (srv *Server) RunForever(stop <-chan interface{}) error { - logger := srv.runOptions.Logger +// RunForever runs forever, or until ctx is cancelled. +func (srv *Server) runForever(ctx context.Context) error { + logger := srv.Logger - ticker := time.NewTicker(time.Duration(srv.config.RunPeriod)) + ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod)) // The unbuffered channel here means we only hear SIGUSR1 if // it arrives while we're waiting in select{}. sigUSR1 := make(chan os.Signal) signal.Notify(sigUSR1, syscall.SIGUSR1) - logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.config.RunPeriod) + logger.Info("acquiring service lock") + dblock.KeepBalanceService.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return srv.DB, nil }) + defer dblock.KeepBalanceService.Unlock() + + logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod) for { - if !srv.runOptions.CommitPulls && !srv.runOptions.CommitTrash { + if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash { logger.Print("WARNING: Will scan periodically, but no changes will be committed.") logger.Print("======= Consider using -commit-pulls and -commit-trash flags.") } - _, err := srv.Run() + if !dblock.KeepBalanceService.Check() { + // context canceled + return nil + } + _, err := srv.runOnce(ctx) if err != nil { logger.Print("run failed: ", err) } else { @@ -179,7 +127,7 @@ func (srv *Server) RunForever(stop <-chan interface{}) error { } select { - case <-stop: + case <-ctx.Done(): signal.Stop(sigUSR1) return nil case <-ticker.C: @@ -190,7 +138,7 @@ func (srv *Server) RunForever(stop <-chan interface{}) error { // run too soon after the Nth run is triggered // by SIGUSR1. ticker.Stop() - ticker = time.NewTicker(time.Duration(srv.config.RunPeriod)) + ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod)) } logger.Print("starting next run") }