Increase ping delay in WatchdogActorTest to try and reduce spurious test failures...
[arvados.git] / services / keep-balance / main.go
1 package main
2
3 import (
4         "encoding/json"
5         "flag"
6         "log"
7         "os"
8         "os/signal"
9         "syscall"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/arvados"
13         "git.curoverse.com/arvados.git/sdk/go/config"
14 )
15
16 const defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
17
18 // Config specifies site configuration, like API credentials and the
19 // choice of which servers are to be balanced.
20 //
21 // Config is loaded from a JSON config file (see usage()).
22 type Config struct {
23         // Arvados API endpoint and credentials.
24         Client arvados.Client
25
26         // List of service types (e.g., "disk") to balance.
27         KeepServiceTypes []string
28
29         KeepServiceList arvados.KeepServiceList
30
31         // How often to check
32         RunPeriod arvados.Duration
33
34         // Number of collections to request in each API call
35         CollectionBatchSize int
36
37         // Max collections to buffer in memory (bigger values consume
38         // more memory, but can reduce store-and-forward latency when
39         // fetching pages)
40         CollectionBuffers int
41 }
42
43 // RunOptions controls runtime behavior. The flags/options that belong
44 // here are the ones that are useful for interactive use. For example,
45 // "CommitTrash" is a runtime option rather than a config item because
46 // it invokes a troubleshooting feature rather than expressing how
47 // balancing is meant to be done at a given site.
48 //
49 // RunOptions fields are controlled by command line flags.
50 type RunOptions struct {
51         Once        bool
52         CommitPulls bool
53         CommitTrash bool
54         Logger      *log.Logger
55         Dumper      *log.Logger
56
57         // SafeRendezvousState from the most recent balance operation,
58         // or "" if unknown. If this changes from one run to the next,
59         // we need to watch out for races. See
60         // (*Balancer)ClearTrashLists.
61         SafeRendezvousState string
62 }
63
64 var debugf = func(string, ...interface{}) {}
65
66 func main() {
67         var cfg Config
68         var runOptions RunOptions
69
70         configPath := flag.String("config", defaultConfigPath,
71                 "`path` of JSON or YAML configuration file")
72         serviceListPath := flag.String("config.KeepServiceList", "",
73                 "`path` of JSON or YAML file with list of keep services to balance, as given by \"arv keep_service list\" "+
74                         "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
75         flag.BoolVar(&runOptions.Once, "once", false,
76                 "balance once and then exit")
77         flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
78                 "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
79         flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
80                 "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
81         dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit")
82         dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
83         debugFlag := flag.Bool("debug", false, "enable debug messages")
84         flag.Usage = usage
85         flag.Parse()
86
87         mustReadConfig(&cfg, *configPath)
88         if *serviceListPath != "" {
89                 mustReadConfig(&cfg.KeepServiceList, *serviceListPath)
90         }
91
92         if *dumpConfig {
93                 log.Fatal(config.DumpAndExit(cfg))
94         }
95
96         if *debugFlag {
97                 debugf = log.Printf
98                 if j, err := json.Marshal(cfg); err != nil {
99                         log.Fatal(err)
100                 } else {
101                         log.Printf("config is %s", j)
102                 }
103         }
104         if *dumpFlag {
105                 runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
106         }
107         err := CheckConfig(cfg, runOptions)
108         if err != nil {
109                 // (don't run)
110         } else if runOptions.Once {
111                 _, err = (&Balancer{}).Run(cfg, runOptions)
112         } else {
113                 err = RunForever(cfg, runOptions, nil)
114         }
115         if err != nil {
116                 log.Fatal(err)
117         }
118 }
119
120 func mustReadConfig(dst interface{}, path string) {
121         if err := config.LoadFile(dst, path); err != nil {
122                 log.Fatal(err)
123         }
124 }
125
126 // RunForever runs forever, or (for testing purposes) until the given
127 // stop channel is ready to receive.
128 func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
129         if runOptions.Logger == nil {
130                 runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
131         }
132         logger := runOptions.Logger
133
134         ticker := time.NewTicker(time.Duration(config.RunPeriod))
135
136         // The unbuffered channel here means we only hear SIGUSR1 if
137         // it arrives while we're waiting in select{}.
138         sigUSR1 := make(chan os.Signal)
139         signal.Notify(sigUSR1, syscall.SIGUSR1)
140
141         logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)
142
143         for {
144                 if !runOptions.CommitPulls && !runOptions.CommitTrash {
145                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
146                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
147                 }
148
149                 bal := &Balancer{}
150                 var err error
151                 runOptions, err = bal.Run(config, runOptions)
152                 if err != nil {
153                         logger.Print("run failed: ", err)
154                 } else {
155                         logger.Print("run succeeded")
156                 }
157
158                 select {
159                 case <-stop:
160                         signal.Stop(sigUSR1)
161                         return nil
162                 case <-ticker.C:
163                         logger.Print("timer went off")
164                 case <-sigUSR1:
165                         logger.Print("received SIGUSR1, resetting timer")
166                         // Reset the timer so we don't start the N+1st
167                         // run too soon after the Nth run is triggered
168                         // by SIGUSR1.
169                         ticker.Stop()
170                         ticker = time.NewTicker(time.Duration(config.RunPeriod))
171                 }
172                 logger.Print("starting next run")
173         }
174 }