X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/da79c28c17eb5de3196ab49718b86c3f73db2380..9251549928dd5206d4a14e5f9811caa66aa64c65:/services/keep-balance/server.go diff --git a/services/keep-balance/server.go b/services/keep-balance/server.go index 0f4bb7176a..7a59c1e8c0 100644 --- a/services/keep-balance/server.go +++ b/services/keep-balance/server.go @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepbalance import ( "context" @@ -12,8 +12,9 @@ import ( "syscall" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/ctxlog" + "git.arvados.org/arvados.git/lib/controller/dblock" + "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/jmoiron/sqlx" "github.com/sirupsen/logrus" ) @@ -25,11 +26,11 @@ import ( // // RunOptions fields are controlled by command line flags. type RunOptions struct { - Once bool - CommitPulls bool - CommitTrash bool - Logger logrus.FieldLogger - Dumper logrus.FieldLogger + Once bool + CommitConfirmedFields bool + ChunkPrefix string + Logger logrus.FieldLogger + Dumper logrus.FieldLogger // SafeRendezvousState from the most recent balance operation, // or "" if unknown. If this changes from one run to the next, @@ -40,6 +41,7 @@ type RunOptions struct { type Server struct { http.Handler + Cluster *arvados.Cluster ArvClient *arvados.Client RunOptions RunOptions @@ -47,66 +49,76 @@ type Server struct { Logger logrus.FieldLogger Dumper logrus.FieldLogger + + DB *sqlx.DB } // CheckHealth implements service.Handler. func (srv *Server) CheckHealth() error { - return nil + return srv.DB.Ping() } -// Start sets up and runs the balancer. -func (srv *Server) Start(ctx context.Context) { - if srv.RunOptions.Logger == nil { - srv.RunOptions.Logger = ctxlog.FromContext(ctx) - } - - srv.Logger = srv.RunOptions.Logger - srv.Dumper = srv.RunOptions.Dumper +// Done implements service.Handler. +func (srv *Server) Done() <-chan struct{} { + return nil +} +func (srv *Server) run(ctx context.Context) { var err error if srv.RunOptions.Once { - _, err = srv.run() + _, err = srv.runOnce(ctx) } else { - err = srv.runForever(nil) + err = srv.runForever(ctx) } if err != nil { srv.Logger.Error(err) + os.Exit(1) + } else { + os.Exit(0) } } -func (srv *Server) run() (*Balancer, error) { +func (srv *Server) runOnce(ctx context.Context) (*Balancer, error) { bal := &Balancer{ + DB: srv.DB, Logger: srv.Logger, Dumper: srv.Dumper, Metrics: srv.Metrics, LostBlocksFile: srv.Cluster.Collections.BlobMissingReport, + ChunkPrefix: srv.RunOptions.ChunkPrefix, } var err error - srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions) + srv.RunOptions, err = bal.Run(ctx, srv.ArvClient, srv.Cluster, srv.RunOptions) return bal, err } -// RunForever runs forever, or (for testing purposes) until the given -// stop channel is ready to receive. -func (srv *Server) runForever(stop <-chan interface{}) error { +// RunForever runs forever, or until ctx is cancelled. +func (srv *Server) runForever(ctx context.Context) error { logger := srv.Logger ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod)) - // The unbuffered channel here means we only hear SIGUSR1 if - // it arrives while we're waiting in select{}. - sigUSR1 := make(chan os.Signal) + sigUSR1 := make(chan os.Signal, 1) signal.Notify(sigUSR1, syscall.SIGUSR1) + defer signal.Stop(sigUSR1) + + logger.Info("acquiring service lock") + dblock.KeepBalanceService.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return srv.DB, nil }) + defer dblock.KeepBalanceService.Unlock() logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod) for { - if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash { + if srv.Cluster.Collections.BalancePullLimit < 1 && srv.Cluster.Collections.BalanceTrashLimit < 1 { logger.Print("WARNING: Will scan periodically, but no changes will be committed.") - logger.Print("======= Consider using -commit-pulls and -commit-trash flags.") + logger.Print("======= To commit changes, set BalancePullLimit and BalanceTrashLimit values greater than zero.") } - _, err := srv.run() + if !dblock.KeepBalanceService.Check() { + // context canceled + return nil + } + _, err := srv.runOnce(ctx) if err != nil { logger.Print("run failed: ", err) } else { @@ -114,8 +126,7 @@ func (srv *Server) runForever(stop <-chan interface{}) error { } select { - case <-stop: - signal.Stop(sigUSR1) + case <-ctx.Done(): return nil case <-ticker.C: logger.Print("timer went off") @@ -124,8 +135,7 @@ func (srv *Server) runForever(stop <-chan interface{}) error { // Reset the timer so we don't start the N+1st // run too soon after the Nth run is triggered // by SIGUSR1. - ticker.Stop() - ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod)) + ticker.Reset(time.Duration(srv.Cluster.Collections.BalancePeriod)) } logger.Print("starting next run") }