//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepbalance
import (
- "fmt"
+ "context"
"net/http"
"os"
"os/signal"
"syscall"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/auth"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
- "github.com/Sirupsen/logrus"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/jmoiron/sqlx"
+ "github.com/sirupsen/logrus"
)
-var version = "dev"
-
-const (
- defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
- rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
-)
-
-// Config specifies site configuration, like API credentials and the
-// choice of which servers are to be balanced.
-//
-// Config is loaded from a JSON config file (see usage()).
-type Config struct {
- // Arvados API endpoint and credentials.
- Client arvados.Client
-
- // List of service types (e.g., "disk") to balance.
- KeepServiceTypes []string
-
- KeepServiceList arvados.KeepServiceList
-
- // address, address:port, or :port for management interface
- Listen string
-
- // token for management APIs
- ManagementToken string
-
- // How often to check
- RunPeriod arvados.Duration
-
- // Number of collections to request in each API call
- CollectionBatchSize int
-
- // Max collections to buffer in memory (bigger values consume
- // more memory, but can reduce store-and-forward latency when
- // fetching pages)
- CollectionBuffers int
-
- // Timeout for outgoing http request/response cycle.
- RequestTimeout arvados.Duration
-}
-
// RunOptions controls runtime behavior. The flags/options that belong
// here are the ones that are useful for interactive use. For example,
// "CommitTrash" is a runtime option rather than a config item.
// NOTE(review): the original sentence continues after "because", but
// the rest of it is missing from this listing.
//
// RunOptions fields are controlled by command line flags.
type RunOptions struct {
- Once bool
- CommitPulls bool
- CommitTrash bool
- Logger *logrus.Logger
- Dumper *logrus.Logger
+ Once bool // when true, Server.run does a single runOnce instead of runForever
+ CommitPulls bool // if this and CommitTrash are both false, runForever warns that no changes will be committed
+ CommitTrash bool // see CommitPulls
+ CommitConfirmedFields bool
+ ChunkPrefix string // copied into Balancer.ChunkPrefix by Server.runOnce
+ Logger logrus.FieldLogger
+ Dumper logrus.FieldLogger
// SafeRendezvousState from the most recent balance operation,
// or "" if unknown. If this changes from one run to the next,
// NOTE(review): this comment is cut off here, and the field it
// documents is not visible in this listing.
}
// Server holds everything needed to run balance operations for one
// cluster: it builds a Balancer for each run (see runOnce) and either
// runs once or loops (see run). Its CheckHealth and Done methods are
// documented as implementing service.Handler.
type Server struct {
- config Config
- runOptions RunOptions
- metrics *metrics
- listening string // for tests
+ http.Handler // embedded, so the handler's methods are promoted to Server
- Logger *logrus.Logger
- Dumper *logrus.Logger
-}
+ Cluster *arvados.Cluster // supplies Collections.BalancePeriod and Collections.BlobMissingReport
+ ArvClient *arvados.Client // passed to Balancer.Run
+ RunOptions RunOptions // overwritten with the value returned by each Balancer.Run call
+ Metrics *metrics // handed to each Balancer
-// NewServer returns a new Server that runs Balancers using the given
-// config and runOptions.
-func NewServer(config Config, runOptions RunOptions) (*Server, error) {
- if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
- return nil, fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
- }
- if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
- return nil, fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
- }
+ Logger logrus.FieldLogger // used for the server's own messages and handed to each Balancer
+ Dumper logrus.FieldLogger // handed to each Balancer
- if runOptions.Logger == nil {
- log := logrus.New()
- log.Formatter = &logrus.JSONFormatter{
- TimestampFormat: rfc3339NanoFixed,
- }
- log.Out = os.Stderr
- runOptions.Logger = log
- }
+ DB *sqlx.DB // pinged by CheckHealth, handed to each Balancer, and used for the service lock in runForever
+}
- srv := &Server{
- config: config,
- runOptions: runOptions,
- metrics: newMetrics(),
- Logger: runOptions.Logger,
- Dumper: runOptions.Dumper,
- }
- return srv, srv.start()
+// CheckHealth implements service.Handler. It returns the result of pinging srv.DB.
+func (srv *Server) CheckHealth() error {
+ return srv.DB.Ping()
}
-func (srv *Server) start() error {
- if srv.config.Listen == "" {
- return nil
- }
- server := &httpserver.Server{
- Server: http.Server{
- Handler: httpserver.LogRequests(srv.Logger,
- auth.RequireLiteralToken(srv.config.ManagementToken,
- srv.metrics.Handler(srv.Logger))),
- },
- Addr: srv.config.Listen,
+// Done implements service.Handler. It returns a nil channel, so a receive on it never completes.
+func (srv *Server) Done() <-chan struct{} {
+ return nil
+}
+
// run performs a single balance operation when RunOptions.Once is
// set, and otherwise runs the periodic loop. It never returns to its
// caller: when the chosen mode returns, it terminates the process,
// logging the error and exiting with status 1 if there was one, or
// exiting with status 0 if not.
+func (srv *Server) run(ctx context.Context) {
+ var err error
+ if srv.RunOptions.Once {
+ _, err = srv.runOnce(ctx) // the returned Balancer is not needed here
+ } else {
+ err = srv.runForever(ctx)
}
- err := server.Start()
if err != nil {
- return err
+ srv.Logger.Error(err)
+ os.Exit(1)
+ } else {
+ os.Exit(0)
}
- srv.Logger.Printf("listening at %s", server.Addr)
- srv.listening = server.Addr
- return nil
}
-func (srv *Server) Run() (*Balancer, error) {
// runOnce builds a fresh Balancer from the server's DB, loggers and
// metrics, the cluster's Collections.BlobMissingReport setting, and
// the ChunkPrefix run option, then runs it once. It returns that
// Balancer together with any error from bal.Run.
+func (srv *Server) runOnce(ctx context.Context) (*Balancer, error) {
bal := &Balancer{
- Logger: srv.Logger,
- Dumper: srv.Dumper,
- Metrics: srv.metrics,
+ DB: srv.DB,
+ Logger: srv.Logger,
+ Dumper: srv.Dumper,
+ Metrics: srv.Metrics,
+ LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
+ ChunkPrefix: srv.RunOptions.ChunkPrefix,
}
var err error
// Keep the options returned by this run; they are passed to the
// next run.
- srv.runOptions, err = bal.Run(srv.config, srv.runOptions)
+ srv.RunOptions, err = bal.Run(ctx, srv.ArvClient, srv.Cluster, srv.RunOptions)
return bal, err
}
-// RunForever runs forever, or (for testing purposes) until the given
-// stop channel is ready to receive.
-func (srv *Server) RunForever(stop <-chan interface{}) error {
- logger := srv.runOptions.Logger
+// runForever runs forever, or until ctx is cancelled.
+func (srv *Server) runForever(ctx context.Context) error {
+ logger := srv.Logger
- ticker := time.NewTicker(time.Duration(srv.config.RunPeriod))
+ ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
// The unbuffered channel here means we only hear SIGUSR1 if
// it arrives while we're waiting in select{}.
sigUSR1 := make(chan os.Signal)
signal.Notify(sigUSR1, syscall.SIGUSR1)
- logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.config.RunPeriod)
+ logger.Info("acquiring service lock")
+ dblock.KeepBalanceService.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return srv.DB, nil })
+ defer dblock.KeepBalanceService.Unlock()
+
+ logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
for {
- if !srv.runOptions.CommitPulls && !srv.runOptions.CommitTrash {
+ if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash {
logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
}
- _, err := srv.Run()
+ if !dblock.KeepBalanceService.Check() {
+ // context canceled
+ return nil
+ }
+ _, err := srv.runOnce(ctx)
if err != nil {
logger.Print("run failed: ", err)
} else {
}
select {
- case <-stop:
+ case <-ctx.Done():
signal.Stop(sigUSR1)
return nil
case <-ticker.C:
// Reset the ticker so we don't start the N+1st
// run too soon after the Nth run is triggered
// by SIGUSR1.
ticker.Stop()
- ticker = time.NewTicker(time.Duration(srv.config.RunPeriod))
+ ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
}
logger.Print("starting next run")
}