1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.curoverse.com/arvados.git/sdk/go/arvados"
17 "git.curoverse.com/arvados.git/sdk/go/auth"
18 "git.curoverse.com/arvados.git/sdk/go/ctxlog"
19 "git.curoverse.com/arvados.git/sdk/go/httpserver"
20 "github.com/sirupsen/logrus"
26 defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
27 rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
30 // Config specifies site configuration, like API credentials and the
31 // choice of which servers are to be balanced.
33 // Config is loaded from a JSON or YAML config file (see usage()).
35 // Arvados API endpoint and credentials.
38 // List of service types (e.g., "disk") to balance.
39 KeepServiceTypes []string
41 KeepServiceList arvados.KeepServiceList
43 // address, address:port, or :port for management interface
46 // token for management APIs
47 ManagementToken string
50 RunPeriod arvados.Duration
52 // Number of collections to request in each API call
53 CollectionBatchSize int
55 // Max collections to buffer in memory (bigger values consume
56 // more memory, but can reduce store-and-forward latency when
60 // Timeout for outgoing http request/response cycle.
61 RequestTimeout arvados.Duration
63 // Destination filename for the list of lost block hashes, one
64 // per line. Updated atomically during each successful run.
68 // RunOptions controls runtime behavior. The flags/options that belong
69 // here are the ones that are useful for interactive use. For example,
70 // "CommitTrash" is a runtime option rather than a config item because
71 // it invokes a troubleshooting feature rather than expressing how
72 // balancing is meant to be done at a given site.
74 // RunOptions fields are controlled by command line flags.
75 type RunOptions struct {
79 Logger logrus.FieldLogger
80 Dumper logrus.FieldLogger
82 // SafeRendezvousState from the most recent balance operation,
83 // or "" if unknown. If this changes from one run to the next,
84 // we need to watch out for races. See
85 // (*Balancer).ClearTrashLists.
86 SafeRendezvousState string
93 listening string // for tests
95 Logger logrus.FieldLogger
96 Dumper logrus.FieldLogger
99 // NewServer returns a new Server that runs Balancers using the given
100 // config and runOptions, and starts the management listener if configured.
101 func NewServer(config Config, runOptions RunOptions) (*Server, error) {
102 if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
103 return nil, fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
105 if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
106 return nil, fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
109 if runOptions.Logger == nil {
111 log.Formatter = &logrus.JSONFormatter{
112 TimestampFormat: rfc3339NanoFixed,
115 runOptions.Logger = log
120 runOptions: runOptions,
121 metrics: newMetrics(),
122 Logger: runOptions.Logger,
123 Dumper: runOptions.Dumper,
125 return srv, srv.start()
128 func (srv *Server) start() error {
129 if srv.config.Listen == "" {
132 ctx := ctxlog.Context(context.Background(), srv.Logger)
133 server := &httpserver.Server{
135 Handler: httpserver.HandlerWithContext(ctx,
136 httpserver.LogRequests(
137 auth.RequireLiteralToken(srv.config.ManagementToken,
138 srv.metrics.Handler(srv.Logger)))),
140 Addr: srv.config.Listen,
142 err := server.Start()
146 srv.Logger.Printf("listening at %s", server.Addr)
147 srv.listening = server.Addr
151 func (srv *Server) Run() (*Balancer, error) {
155 Metrics: srv.metrics,
156 LostBlocksFile: srv.config.LostBlocksFile,
159 srv.runOptions, err = bal.Run(srv.config, srv.runOptions)
163 // RunForever runs balancing passes every RunPeriod and on SIGUSR1, forever
164 // or (for testing purposes) until a value can be received from stop.
165 func (srv *Server) RunForever(stop <-chan interface{}) error {
166 logger := srv.runOptions.Logger
168 ticker := time.NewTicker(time.Duration(srv.config.RunPeriod))
170 // The unbuffered channel here means we only hear SIGUSR1 if
171 // it arrives while we're waiting in select{}.
172 sigUSR1 := make(chan os.Signal)
173 signal.Notify(sigUSR1, syscall.SIGUSR1)
175 logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.config.RunPeriod)
178 if !srv.runOptions.CommitPulls && !srv.runOptions.CommitTrash {
179 logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
180 logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
185 logger.Print("run failed: ", err)
187 logger.Print("run succeeded")
195 logger.Print("timer went off")
197 logger.Print("received SIGUSR1, resetting timer")
198 // Reset the timer so we don't start the N+1st
199 // run too soon after the Nth run is triggered
202 ticker = time.NewTicker(time.Duration(srv.config.RunPeriod))
204 logger.Print("starting next run")