//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepbalance
import (
"context"
"syscall"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
)
//
// RunOptions fields are controlled by command line flags.
type RunOptions struct {
- Once bool
- CommitPulls bool
- CommitTrash bool
- Logger logrus.FieldLogger
- Dumper logrus.FieldLogger
+ Once bool
+ CommitPulls bool
+ CommitTrash bool
+ CommitConfirmedFields bool
+ ChunkPrefix string
+ Logger logrus.FieldLogger
+ Dumper logrus.FieldLogger
// SafeRendezvousState from the most recent balance operation,
// or "" if unknown. If this changes from one run to the next,
type Server struct {
http.Handler
+
Cluster *arvados.Cluster
ArvClient *arvados.Client
RunOptions RunOptions
Logger logrus.FieldLogger
Dumper logrus.FieldLogger
+
+ DB *sqlx.DB
}
// CheckHealth implements service.Handler.
func (srv *Server) CheckHealth() error {
- return nil
+ return srv.DB.Ping()
}
-// Start sets up and runs the balancer.
-func (srv *Server) Start(ctx context.Context) {
- if srv.RunOptions.Logger == nil {
- srv.RunOptions.Logger = ctxlog.FromContext(ctx)
- }
-
- srv.Logger = srv.RunOptions.Logger
- srv.Dumper = srv.RunOptions.Dumper
+// Done implements service.Handler.
+func (srv *Server) Done() <-chan struct{} {
+ return nil
+}
+func (srv *Server) run(ctx context.Context) {
var err error
if srv.RunOptions.Once {
- _, err = srv.run()
+ _, err = srv.runOnce(ctx)
} else {
- err = srv.runForever(nil)
+ err = srv.runForever(ctx)
}
if err != nil {
srv.Logger.Error(err)
+ os.Exit(1)
+ } else {
+ os.Exit(0)
}
}
-func (srv *Server) run() (*Balancer, error) {
+func (srv *Server) runOnce(ctx context.Context) (*Balancer, error) {
bal := &Balancer{
+ DB: srv.DB,
Logger: srv.Logger,
Dumper: srv.Dumper,
Metrics: srv.Metrics,
LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
+ ChunkPrefix: srv.RunOptions.ChunkPrefix,
}
var err error
- srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
+ srv.RunOptions, err = bal.Run(ctx, srv.ArvClient, srv.Cluster, srv.RunOptions)
return bal, err
}
-// RunForever runs forever, or (for testing purposes) until the given
-// stop channel is ready to receive.
-func (srv *Server) runForever(stop <-chan interface{}) error {
+// RunForever runs forever, or until ctx is cancelled.
+func (srv *Server) runForever(ctx context.Context) error {
logger := srv.Logger
ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
sigUSR1 := make(chan os.Signal)
signal.Notify(sigUSR1, syscall.SIGUSR1)
+ logger.Info("acquiring service lock")
+ dblock.KeepBalanceService.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return srv.DB, nil })
+ defer dblock.KeepBalanceService.Unlock()
+
logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
for {
logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
}
- _, err := srv.run()
+ if !dblock.KeepBalanceService.Check() {
+ // context canceled
+ return nil
+ }
+ _, err := srv.runOnce(ctx)
if err != nil {
logger.Print("run failed: ", err)
} else {
}
select {
- case <-stop:
+ case <-ctx.Done():
signal.Stop(sigUSR1)
return nil
case <-ticker.C: