Merge branch '9180-avoid-overreplication-keepclient'
[arvados.git] / services / keep-balance / main.go
1 package main
2
3 import (
4         "encoding/json"
5         "flag"
6         "io/ioutil"
7         "log"
8         "os"
9         "os/signal"
10         "syscall"
11         "time"
12
13         "git.curoverse.com/arvados.git/sdk/go/arvados"
14 )
15
16 // Config specifies site configuration, like API credentials and the
17 // choice of which servers are to be balanced.
18 //
19 // Config is loaded from a JSON config file (see usage()).
20 type Config struct {
21         // Arvados API endpoint and credentials.
22         Client arvados.Client
23
24         // List of service types (e.g., "disk") to balance.
25         KeepServiceTypes []string
26
27         KeepServiceList arvados.KeepServiceList
28
29         // How often to check
30         RunPeriod arvados.Duration
31
32         // Number of collections to request in each API call
33         CollectionBatchSize int
34
35         // Max collections to buffer in memory (bigger values consume
36         // more memory, but can reduce store-and-forward latency when
37         // fetching pages)
38         CollectionBuffers int
39 }
40
41 // RunOptions controls runtime behavior. The flags/options that belong
42 // here are the ones that are useful for interactive use. For example,
43 // "CommitTrash" is a runtime option rather than a config item because
44 // it invokes a troubleshooting feature rather than expressing how
45 // balancing is meant to be done at a given site.
46 //
47 // RunOptions fields are controlled by command line flags.
48 type RunOptions struct {
49         Once        bool
50         CommitPulls bool
51         CommitTrash bool
52         Logger      *log.Logger
53         Dumper      *log.Logger
54 }
55
56 var debugf = func(string, ...interface{}) {}
57
58 func main() {
59         var config Config
60         var runOptions RunOptions
61
62         configPath := flag.String("config", "",
63                 "`path` of json configuration file")
64         serviceListPath := flag.String("config.KeepServiceList", "",
65                 "`path` of json file with list of keep services to balance, as given by \"arv keep_service list\" "+
66                         "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
67         flag.BoolVar(&runOptions.Once, "once", false,
68                 "balance once and then exit")
69         flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
70                 "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
71         flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
72                 "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
73         dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
74         debugFlag := flag.Bool("debug", false, "enable debug messages")
75         flag.Usage = usage
76         flag.Parse()
77
78         if *configPath == "" {
79                 log.Fatal("You must specify a config file (see `keep-balance -help`)")
80         }
81         mustReadJSON(&config, *configPath)
82         if *serviceListPath != "" {
83                 mustReadJSON(&config.KeepServiceList, *serviceListPath)
84         }
85
86         if *debugFlag {
87                 debugf = log.Printf
88                 if j, err := json.Marshal(config); err != nil {
89                         log.Fatal(err)
90                 } else {
91                         log.Printf("config is %s", j)
92                 }
93         }
94         if *dumpFlag {
95                 runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
96         }
97         err := CheckConfig(config, runOptions)
98         if err != nil {
99                 // (don't run)
100         } else if runOptions.Once {
101                 err = (&Balancer{}).Run(config, runOptions)
102         } else {
103                 err = RunForever(config, runOptions, nil)
104         }
105         if err != nil {
106                 log.Fatal(err)
107         }
108 }
109
110 func mustReadJSON(dst interface{}, path string) {
111         if buf, err := ioutil.ReadFile(path); err != nil {
112                 log.Fatalf("Reading %q: %v", path, err)
113         } else if err = json.Unmarshal(buf, dst); err != nil {
114                 log.Fatalf("Decoding %q: %v", path, err)
115         }
116 }
117
118 // RunForever runs forever, or (for testing purposes) until the given
119 // stop channel is ready to receive.
120 func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
121         if runOptions.Logger == nil {
122                 runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
123         }
124         logger := runOptions.Logger
125
126         ticker := time.NewTicker(time.Duration(config.RunPeriod))
127
128         // The unbuffered channel here means we only hear SIGUSR1 if
129         // it arrives while we're waiting in select{}.
130         sigUSR1 := make(chan os.Signal)
131         signal.Notify(sigUSR1, syscall.SIGUSR1)
132
133         logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)
134
135         for {
136                 if !runOptions.CommitPulls && !runOptions.CommitTrash {
137                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
138                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
139                 }
140
141                 err := (&Balancer{}).Run(config, runOptions)
142                 if err != nil {
143                         logger.Print("run failed: ", err)
144                 } else {
145                         logger.Print("run succeeded")
146                 }
147
148                 select {
149                 case <-stop:
150                         signal.Stop(sigUSR1)
151                         return nil
152                 case <-ticker.C:
153                         logger.Print("timer went off")
154                 case <-sigUSR1:
155                         logger.Print("received SIGUSR1, resetting timer")
156                         // Reset the timer so we don't start the N+1st
157                         // run too soon after the Nth run is triggered
158                         // by SIGUSR1.
159                         ticker.Stop()
160                         ticker = time.NewTicker(time.Duration(config.RunPeriod))
161                 }
162                 logger.Print("starting next run")
163         }
164 }