1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.curoverse.com/arvados.git/sdk/go/arvados"
19 "git.curoverse.com/arvados.git/sdk/go/config"
20 "git.curoverse.com/arvados.git/sdk/go/httpserver"
21 "github.com/Sirupsen/logrus"
27 defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
28 rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
31 // Config specifies site configuration, like API credentials and the
32 // choice of which servers are to be balanced.
34 // Config is loaded from a JSON or YAML config file (see usage()).
36 // Arvados API endpoint and credentials.
39 // List of service types (e.g., "disk") to balance.
40 KeepServiceTypes []string
42 KeepServiceList arvados.KeepServiceList
44 // address, address:port, or :port for management interface
48 RunPeriod arvados.Duration
50 // Number of collections to request in each API call
51 CollectionBatchSize int
53 // Max collections to buffer in memory (bigger values consume
54 // more memory, but can reduce store-and-forward latency when
58 // Timeout for outgoing http request/response cycle.
59 RequestTimeout arvados.Duration
62 // RunOptions controls runtime behavior. The flags/options that belong
63 // here are the ones that are useful for interactive use. For example,
64 // "CommitTrash" is a runtime option rather than a config item because
65 // it invokes a troubleshooting feature rather than expressing how
66 // balancing is meant to be done at a given site.
68 // RunOptions fields are controlled by command line flags.
69 type RunOptions struct {
76 // SafeRendezvousState from the most recent balance operation,
77 // or "" if unknown. If this changes from one run to the next,
78 // we need to watch out for races. See
79 // (*Balancer).ClearTrashLists.
80 SafeRendezvousState string
// debugf has a printf-style signature and does nothing by default.
// NOTE(review): presumably reassigned to a real logger when the -debug
// flag (debugFlag) is set; that assignment is not visible here — confirm.
83 var debugf = func(string, ...interface{}) {}
87 var runOptions RunOptions
89 configPath := flag.String("config", defaultConfigPath,
90 "`path` of JSON or YAML configuration file")
91 serviceListPath := flag.String("config.KeepServiceList", "",
92 "`path` of JSON or YAML file with list of keep services to balance, as given by \"arv keep_service list\" "+
93 "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
94 flag.BoolVar(&runOptions.Once, "once", false,
95 "balance once and then exit")
96 flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
97 "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
98 flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
99 "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
100 dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit")
101 dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
102 debugFlag := flag.Bool("debug", false, "enable debug messages")
103 getVersion := flag.Bool("version", false, "Print version information and exit.")
107 // Print version information if requested
109 fmt.Printf("keep-balance %s\n", version)
113 mustReadConfig(&cfg, *configPath)
114 if *serviceListPath != "" {
115 mustReadConfig(&cfg.KeepServiceList, *serviceListPath)
119 log.Fatal(config.DumpAndExit(cfg))
122 to := time.Duration(cfg.RequestTimeout)
124 to = 30 * time.Minute
126 arvados.DefaultSecureClient.Timeout = to
127 arvados.InsecureHTTPClient.Timeout = to
128 http.DefaultClient.Timeout = to
130 log.Printf("keep-balance %s started", version)
134 if j, err := json.Marshal(cfg); err != nil {
137 log.Printf("config is %s", j)
141 runOptions.Dumper = logrus.New()
142 runOptions.Dumper.Out = os.Stdout
143 runOptions.Dumper.Formatter = &logrus.TextFormatter{}
145 srv, err := NewServer(cfg, runOptions)
148 } else if runOptions.Once {
151 err = srv.RunForever(nil)
158 func mustReadConfig(dst interface{}, path string) {
159 if err := config.LoadFile(dst, path); err != nil {
166 runOptions RunOptions
168 listening string // for tests
170 Logger *logrus.Logger
171 Dumper *logrus.Logger
174 // NewServer returns a new Server that runs Balancers using the given
175 // config and runOptions.
176 func NewServer(config Config, runOptions RunOptions) (*Server, error) {
177 if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
178 return nil, fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
180 if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
181 return nil, fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
184 if runOptions.Logger == nil {
186 log.Formatter = &logrus.JSONFormatter{
187 TimestampFormat: rfc3339NanoFixed,
190 runOptions.Logger = log
195 runOptions: runOptions,
196 metrics: newMetrics(),
197 Logger: runOptions.Logger,
198 Dumper: runOptions.Dumper,
200 return srv, srv.start()
203 func (srv *Server) start() error {
204 if srv.config.Listen == "" {
207 server := &httpserver.Server{
209 Handler: httpserver.LogRequests(srv.Logger, srv.metrics.Handler(srv.Logger)),
211 Addr: srv.config.Listen,
213 err := server.Start()
217 srv.Logger.Printf("listening at %s", server.Addr)
218 srv.listening = server.Addr
222 func (srv *Server) Run() (*Balancer, error) {
226 Metrics: srv.metrics,
229 srv.runOptions, err = bal.Run(srv.config, srv.runOptions)
233 // RunForever runs forever, or (for testing purposes) until a value
234 // can be received from the given stop channel.
235 func (srv *Server) RunForever(stop <-chan interface{}) error {
236 logger := srv.runOptions.Logger
238 ticker := time.NewTicker(time.Duration(srv.config.RunPeriod))
240 // The unbuffered channel here means we only hear SIGUSR1 if
241 // it arrives while we're waiting in select{}.
242 sigUSR1 := make(chan os.Signal)
243 signal.Notify(sigUSR1, syscall.SIGUSR1)
245 logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.config.RunPeriod)
248 if !srv.runOptions.CommitPulls && !srv.runOptions.CommitTrash {
249 logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
250 logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
255 logger.Print("run failed: ", err)
257 logger.Print("run succeeded")
265 logger.Print("timer went off")
267 logger.Print("received SIGUSR1, resetting timer")
268 // Reset the ticker so we don't start the (N+1)th
269 // run too soon after the Nth run was triggered by SIGUSR1
272 ticker = time.NewTicker(time.Duration(srv.config.RunPeriod))
274 logger.Print("starting next run")