1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
15 "git.arvados.org/arvados.git/lib/controller/dblock"
16 "git.arvados.org/arvados.git/sdk/go/arvados"
17 "github.com/jmoiron/sqlx"
18 "github.com/sirupsen/logrus"
21 // RunOptions controls runtime behavior. The flags/options that belong
22 // here are the ones that are useful for interactive use. For example,
23 // "CommitTrash" is a runtime option rather than a config item because
24 // it invokes a troubleshooting feature rather than expressing how
25 // balancing is meant to be done at a given site.
27 // RunOptions fields are controlled by command line flags.
28 type RunOptions struct {
30 CommitConfirmedFields bool
32 Logger logrus.FieldLogger
33 Dumper logrus.FieldLogger
35 // SafeRendezvousState from the most recent balance operation,
36 // or "" if unknown. If this changes from one run to the next,
37 // we need to watch out for races. See
38 // (*Balancer)ClearTrashLists.
39 SafeRendezvousState string
45 Cluster *arvados.Cluster
46 ArvClient *arvados.Client
50 Logger logrus.FieldLogger
51 Dumper logrus.FieldLogger
56 // CheckHealth implements service.Handler.
57 func (srv *Server) CheckHealth() error {
61 // Done implements service.Handler.
62 func (srv *Server) Done() <-chan struct{} {
66 func (srv *Server) run(ctx context.Context) {
68 if srv.RunOptions.Once {
69 _, err = srv.runOnce(ctx)
71 err = srv.runForever(ctx)
81 func (srv *Server) runOnce(ctx context.Context) (*Balancer, error) {
87 LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
88 ChunkPrefix: srv.RunOptions.ChunkPrefix,
91 srv.RunOptions, err = bal.Run(ctx, srv.ArvClient, srv.Cluster, srv.RunOptions)
95 // runForever runs forever, or until ctx is cancelled.
96 func (srv *Server) runForever(ctx context.Context) error {
99 ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
101 // The unbuffered channel here means we only hear SIGUSR1 if
102 // it arrives while we're waiting in select{}.
103 sigUSR1 := make(chan os.Signal)
104 signal.Notify(sigUSR1, syscall.SIGUSR1)
106 logger.Info("acquiring service lock")
107 dblock.KeepBalanceService.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return srv.DB, nil })
108 defer dblock.KeepBalanceService.Unlock()
110 logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
113 if srv.Cluster.Collections.BalancePullLimit < 1 && srv.Cluster.Collections.BalanceTrashLimit < 1 {
114 logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
115 logger.Print("======= To commit changes, set BalancePullLimit and BalanceTrashLimit values greater than zero.")
118 if !dblock.KeepBalanceService.Check() {
122 _, err := srv.runOnce(ctx)
124 logger.Print("run failed: ", err)
126 logger.Print("run succeeded")
134 logger.Print("timer went off")
136 logger.Print("received SIGUSR1, resetting timer")
137 // Reset the timer so we don't start the N+1st
138 // run too soon after the Nth run is triggered
141 ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
143 logger.Print("starting next run")