1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.curoverse.com/arvados.git/sdk/go/arvados"
19 "git.curoverse.com/arvados.git/sdk/go/config"
24 const defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
26 // Config specifies site configuration, like API credentials and the
27 // choice of which servers are to be balanced.
29 // Config is loaded from a JSON or YAML config file (see usage()).
31 // Arvados API endpoint and credentials.
34 // List of service types (e.g., "disk") to balance.
35 KeepServiceTypes []string
37 KeepServiceList arvados.KeepServiceList
40 RunPeriod arvados.Duration
42 // Number of collections to request in each API call
43 CollectionBatchSize int
45 // Max collections to buffer in memory (bigger values consume
46 // more memory, but can reduce store-and-forward latency when
50 // Timeout for outgoing http request/response cycle.
51 RequestTimeout arvados.Duration
54 // RunOptions controls runtime behavior. The flags/options that belong
55 // here are the ones that are useful for interactive use. For example,
56 // "CommitTrash" is a runtime option rather than a config item because
57 // it invokes a troubleshooting feature rather than expressing how
58 // balancing is meant to be done at a given site.
60 // RunOptions fields are controlled by command line flags.
61 type RunOptions struct {
68 // SafeRendezvousState from the most recent balance operation,
69 // or "" if unknown. If this changes from one run to the next,
70 // we need to watch out for races. See
71 // (*Balancer).ClearTrashLists.
72 SafeRendezvousState string
75 var debugf = func(string, ...interface{}) {}
79 var runOptions RunOptions
81 configPath := flag.String("config", defaultConfigPath,
82 "`path` of JSON or YAML configuration file")
83 serviceListPath := flag.String("config.KeepServiceList", "",
84 "`path` of JSON or YAML file with list of keep services to balance, as given by \"arv keep_service list\" "+
85 "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
86 flag.BoolVar(&runOptions.Once, "once", false,
87 "balance once and then exit")
88 flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
89 "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
90 flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
91 "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
92 dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit")
93 dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
94 debugFlag := flag.Bool("debug", false, "enable debug messages")
95 getVersion := flag.Bool("version", false, "Print version information and exit.")
99 // Print version information if requested
101 fmt.Printf("keep-balance %s\n", version)
105 mustReadConfig(&cfg, *configPath)
106 if *serviceListPath != "" {
107 mustReadConfig(&cfg.KeepServiceList, *serviceListPath)
111 log.Fatal(config.DumpAndExit(cfg))
114 to := time.Duration(cfg.RequestTimeout)
116 to = 30 * time.Minute
118 arvados.DefaultSecureClient.Timeout = to
119 arvados.InsecureHTTPClient.Timeout = to
120 http.DefaultClient.Timeout = to
122 log.Printf("keep-balance %s started", version)
126 if j, err := json.Marshal(cfg); err != nil {
129 log.Printf("config is %s", j)
133 runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
135 err := CheckConfig(cfg, runOptions)
138 } else if runOptions.Once {
139 _, err = (&Balancer{}).Run(cfg, runOptions)
141 err = RunForever(cfg, runOptions, nil)
148 func mustReadConfig(dst interface{}, path string) {
149 if err := config.LoadFile(dst, path); err != nil {
154 // RunForever runs forever, or (for testing purposes) until a value
155 // can be received from the given stop channel.
156 func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
157 if runOptions.Logger == nil {
158 runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
160 logger := runOptions.Logger
162 ticker := time.NewTicker(time.Duration(config.RunPeriod))
164 // The unbuffered channel here means we only hear SIGUSR1 if
165 // it arrives while we're waiting in select{}.
166 sigUSR1 := make(chan os.Signal)
167 signal.Notify(sigUSR1, syscall.SIGUSR1)
169 logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)
172 if !runOptions.CommitPulls && !runOptions.CommitTrash {
173 logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
174 logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
179 runOptions, err = bal.Run(config, runOptions)
181 logger.Print("run failed: ", err)
183 logger.Print("run succeeded")
191 logger.Print("timer went off")
193 logger.Print("received SIGUSR1, resetting timer")
194 // Reset the timer so we don't start the N+1st
195 // run too soon after the Nth run is triggered
198 ticker = time.NewTicker(time.Duration(config.RunPeriod))
200 logger.Print("starting next run")