12 "git.curoverse.com/arvados.git/sdk/go/arvados"
13 "git.curoverse.com/arvados.git/sdk/go/config"
// defaultConfigPath is the configuration file location used when the
// -config command line flag is not given.
16 const defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
18 // Config specifies site configuration, like API credentials and the
19 // choice of which servers are to be balanced.
21 // Config is loaded from a JSON or YAML config file (see usage()).
23 // Arvados API endpoint and credentials.
26 // List of service types (e.g., "disk") to balance.
27 KeepServiceTypes []string
// Explicit list of keep services to balance. It can also be loaded
// from the file named by the -config.KeepServiceList flag. Per that
// flag's help text, when no list is given, all available services
// are fetched and filtered by KeepServiceTypes.
29 KeepServiceList arvados.KeepServiceList
// Interval between periodic balancing runs (see RunForever).
32 RunPeriod arvados.Duration
34 // Number of collections to request in each API call
35 CollectionBatchSize int
37 // Max collections to buffer in memory (bigger values consume
38 // more memory, but can reduce store-and-forward latency when
43 // RunOptions controls runtime behavior. The flags/options that belong
44 // here are the ones that are useful for interactive use. For example,
45 // "CommitTrash" is a runtime option rather than a config item because
46 // it invokes a troubleshooting feature rather than expressing how
47 // balancing is meant to be done at a given site.
49 // RunOptions fields are controlled by command line flags.
//
// The flags wired up in main are -once, -commit-pulls and
// -commit-trash. NOTE(review): not every field appears to be
// flag-controlled -- SafeRendezvousState is documented below as state
// carried over from the previous run, and RunForever installs a
// default Logger when none is set. Confirm against the full struct.
50 type RunOptions struct {
57 // SafeRendezvousState from the most recent balance operation,
58 // or "" if unknown. If this changes from one run to the next,
59 // we need to watch out for races. See
60 // (*Balancer)ClearTrashLists.
61 SafeRendezvousState string
// debugf is a Printf-style debug logging hook. By default it is a
// no-op that discards its arguments.
// NOTE(review): presumably replaced with a real logging function when
// the -debug flag is set; that assignment is not visible here -- confirm.
64 var debugf = func(string, ...interface{}) {}
// Runtime options are filled in directly by the flag definitions below.
68 var runOptions RunOptions
70 configPath := flag.String("config", defaultConfigPath,
71 "`path` of JSON or YAML configuration file")
72 serviceListPath := flag.String("config.KeepServiceList", "",
73 "`path` of JSON or YAML file with list of keep services to balance, as given by \"arv keep_service list\" "+
74 "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
75 flag.BoolVar(&runOptions.Once, "once", false,
76 "balance once and then exit")
77 flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
78 "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
79 flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
80 "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
81 dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
82 debugFlag := flag.Bool("debug", false, "enable debug messages")
// Load the main config file first.
86 mustReadConfig(&config, *configPath)
// A service list file given on the command line is loaded after the
// main config, so it overrides any KeepServiceList found there.
87 if *serviceListPath != "" {
88 mustReadConfig(&config.KeepServiceList, *serviceListPath)
// Log the effective configuration as JSON.
93 if j, err := json.Marshal(config); err != nil {
96 log.Printf("config is %s", j)
// Per-block dump output goes to stdout.
// NOTE(review): presumably only enabled when -dump is set; the
// enclosing condition is not visible here -- confirm.
100 runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
102 err := CheckConfig(config, runOptions)
// With -once, do a single balancing run and discard the returned
// RunOptions.
105 } else if runOptions.Once {
106 _, err = (&Balancer{}).Run(config, runOptions)
// Otherwise run periodically; no stop channel is supplied (nil).
108 err = RunForever(config, runOptions, nil)
// mustReadConfig loads the file at path into dst using
// config.LoadFile. dst must be a pointer to the value to populate
// (callers pass &config and &config.KeepServiceList).
// NOTE(review): the error branch is not visible here; the "must"
// prefix suggests the program aborts when loading fails -- confirm.
115 func mustReadConfig(dst interface{}, path string) {
116 if err := config.LoadFile(dst, path); err != nil {
121 // RunForever runs forever, or (for testing purposes) until the given
122 // stop channel is ready to receive.
//
// A balancing run is started every config.RunPeriod and whenever
// SIGUSR1 is received. If runOptions.Logger is nil, a logger writing
// to stderr is used. The RunOptions returned by each run are passed
// to the next run.
123 func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
124 if runOptions.Logger == nil {
125 runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
127 logger := runOptions.Logger
// NOTE(review): time.NewTicker panics on a non-positive duration;
// presumably CheckConfig rejects such a RunPeriod before we get
// here -- confirm.
129 ticker := time.NewTicker(time.Duration(config.RunPeriod))
131 // The unbuffered channel here means we only hear SIGUSR1 if
132 // it arrives while we're waiting in select{}.
133 sigUSR1 := make(chan os.Signal)
134 signal.Notify(sigUSR1, syscall.SIGUSR1)
136 logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)
// Warn when neither commit flag is set.
139 if !runOptions.CommitPulls && !runOptions.CommitTrash {
140 logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
141 logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
// Keep the RunOptions returned by this run so that they are passed
// to the next one.
146 runOptions, err = bal.Run(config, runOptions)
148 logger.Print("run failed: ", err)
150 logger.Print("run succeeded")
158 logger.Print("timer went off")
160 logger.Print("received SIGUSR1, resetting timer")
161 // Reset the timer so we don't start the N+1st
162 // run too soon after the Nth run is triggered
165 ticker = time.NewTicker(time.Duration(config.RunPeriod))
167 logger.Print("starting next run")