1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
15 "git.curoverse.com/arvados.git/sdk/go/arvados"
16 "git.curoverse.com/arvados.git/sdk/go/auth"
17 "git.curoverse.com/arvados.git/sdk/go/httpserver"
18 "github.com/Sirupsen/logrus"
// defaultConfigPath is the path of a keep-balance config file.
// NOTE(review): presumably the file read when no config path is given
// on the command line -- confirm against the flag handling.
24 defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
// rfc3339NanoFixed is a time layout like time.RFC3339Nano, except that
// the fractional seconds are written with "000000000" rather than
// "999999999", so trailing zeros are kept and every timestamp has
// exactly nine fractional digits. NewServer uses it as the timestamp
// format of the default JSON logger.
25 rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
28 // Config specifies site configuration, like API credentials and the
29 // choice of which servers are to be balanced.
31 // Config is loaded from a JSON config file (see usage()).
33 // Arvados API endpoint and credentials.
36 // List of service types (e.g., "disk") to balance.
// Cannot be combined with a non-empty KeepServiceList: NewServer
// rejects the config if this is non-nil and KeepServiceList has items.
37 KeepServiceTypes []string
// Explicit list of keep services. Mutually exclusive with
// KeepServiceTypes (enforced by NewServer).
// NOTE(review): presumably the exact set of services to balance when
// service types are not used -- confirm against the Balancer.
39 KeepServiceList arvados.KeepServiceList
41 // address, address:port, or :port for management interface
44 // token for management APIs
45 ManagementToken string
// Interval between runs in RunForever, where it is the ticker period.
// NewServer requires it to be non-zero unless the Once run option is
// set.
48 RunPeriod arvados.Duration
50 // Number of collections to request in each API call
51 CollectionBatchSize int
53 // Max collections to buffer in memory (bigger values consume
54 // more memory, but can reduce store-and-forward latency when
58 // Timeout for outgoing http request/response cycle.
59 RequestTimeout arvados.Duration
62 // RunOptions controls runtime behavior. The flags/options that belong
63 // here are the ones that are useful for interactive use. For example,
64 // "CommitTrash" is a runtime option rather than a config item because
65 // it invokes a troubleshooting feature rather than expressing how
66 // balancing is meant to be done at a given site.
68 // RunOptions fields are controlled by command line flags.
69 type RunOptions struct {
76 // SafeRendezvousState from the most recent balance operation,
77 // or "" if unknown. If this changes from one run to the next,
78 // we need to watch out for races. See
79 // (*Balancer)ClearTrashLists.
// NOTE(review): (*Server).Run overwrites the server's stored
// RunOptions with the value returned by bal.Run, which is presumably
// how this state is carried from one run to the next -- confirm.
80 SafeRendezvousState string
// listening is assigned the Addr of the management httpserver.Server
// by start(), after server.Start() has been called.
87 listening string // for tests
93 // NewServer returns a new Server that runs Balancers using the given
94 // config and runOptions.
//
// It returns an error if config has both a non-empty KeepServiceList
// and a non-nil KeepServiceTypes, or if runOptions.Once is false and
// config.RunPeriod is zero. If runOptions.Logger is nil, a logger
// using logrus.JSONFormatter with rfc3339NanoFixed timestamps is used
// instead. The final return value carries the result of srv.start().
95 func NewServer(config Config, runOptions RunOptions) (*Server, error) {
// The two ways of selecting services are mutually exclusive. Note the
// asymmetry: KeepServiceList counts as set only when it has items,
// while KeepServiceTypes counts as set whenever it is non-nil, even
// if it is empty.
96 if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
97 return nil, fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
// Without Once, RunForever needs a non-zero RunPeriod for its ticker.
99 if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
100 return nil, fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
// No logger supplied: fall back to JSON log output with fixed-width
// nanosecond timestamps.
103 if runOptions.Logger == nil {
105 log.Formatter = &logrus.JSONFormatter{
106 TimestampFormat: rfc3339NanoFixed,
109 runOptions.Logger = log
// The server keeps the (possibly defaulted) run options and exposes
// the same Logger and Dumper as its own fields.
114 runOptions: runOptions,
115 metrics: newMetrics(),
116 Logger: runOptions.Logger,
117 Dumper: runOptions.Dumper,
// start() is invoked as part of construction; its error is returned
// alongside srv.
119 return srv, srv.start()
// start sets up the management HTTP server at srv.config.Listen. The
// handler chain is: request logging (httpserver.LogRequests), then a
// token check against srv.config.ManagementToken
// (auth.RequireLiteralToken), then the metrics handler. After
// server.Start() is called, the server's address is logged and stored
// in srv.listening.
// NOTE(review): an empty Listen is special-cased; presumably no
// management server is started in that case -- confirm.
122 func (srv *Server) start() error {
123 if srv.config.Listen == "" {
126 server := &httpserver.Server{
128 Handler: httpserver.LogRequests(srv.Logger,
129 auth.RequireLiteralToken(srv.config.ManagementToken,
130 srv.metrics.Handler(srv.Logger))),
132 Addr: srv.config.Listen,
134 err := server.Start()
// server.Addr is read back after Start() rather than taken from the
// config. NOTE(review): presumably Start() updates it to the address
// actually bound (e.g. when Listen is ":0") -- confirm in httpserver.
138 srv.Logger.Printf("listening at %s", server.Addr)
139 srv.listening = server.Addr
// Run performs one balance operation by calling bal.Run with the
// server's config and current run options; bal is set up with the
// server's metrics. The run options returned by bal.Run replace the
// ones stored on the server, so they are the input to the next call.
// NOTE(review): presumably the returned *Balancer is bal -- confirm.
143 func (srv *Server) Run() (*Balancer, error) {
147 Metrics: srv.metrics,
150 srv.runOptions, err = bal.Run(srv.config, srv.runOptions)
154 // RunForever runs forever, or (for testing purposes) until the given
155 // stop channel is ready to receive.
//
// A run is triggered each time the RunPeriod ticker fires or a SIGUSR1
// is received; the outcome of each run is logged.
156 func (srv *Server) RunForever(stop <-chan interface{}) error {
157 logger := srv.runOptions.Logger
// The ticker period is config.RunPeriod, which NewServer requires to
// be non-zero unless the Once option is set.
159 ticker := time.NewTicker(time.Duration(srv.config.RunPeriod))
// signal.Notify does not block when delivering to the channel, so
// with no buffer a signal is dropped unless a receiver is waiting.
161 // The unbuffered channel here means we only hear SIGUSR1 if
162 // it arrives while we're waiting in select{}.
163 sigUSR1 := make(chan os.Signal)
164 signal.Notify(sigUSR1, syscall.SIGUSR1)
166 logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.config.RunPeriod)
// With neither commit option set, warn the operator that no changes
// will be committed.
169 if !srv.runOptions.CommitPulls && !srv.runOptions.CommitTrash {
170 logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
171 logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
176 logger.Print("run failed: ", err)
178 logger.Print("run succeeded")
186 logger.Print("timer went off")
188 logger.Print("received SIGUSR1, resetting timer")
189 // Reset the timer so we don't start the N+1st
190 // run too soon after the Nth run is triggered
// A fresh ticker makes the next tick a full RunPeriod from now.
193 ticker = time.NewTicker(time.Duration(srv.config.RunPeriod))
195 logger.Print("starting next run")