Merge branch '10129-cwl-remove-listing' closes #10129
[arvados.git] / services / keep-balance / main.go
1 package main
2
3 import (
4         "encoding/json"
5         "flag"
6         "log"
7         "os"
8         "os/signal"
9         "syscall"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/arvados"
13         "git.curoverse.com/arvados.git/sdk/go/config"
14 )
15
16 // Config specifies site configuration, like API credentials and the
17 // choice of which servers are to be balanced.
18 //
19 // Config is loaded from a JSON config file (see usage()).
20 type Config struct {
21         // Arvados API endpoint and credentials.
22         Client arvados.Client
23
24         // List of service types (e.g., "disk") to balance.
25         KeepServiceTypes []string
26
27         KeepServiceList arvados.KeepServiceList
28
29         // How often to check
30         RunPeriod arvados.Duration
31
32         // Number of collections to request in each API call
33         CollectionBatchSize int
34
35         // Max collections to buffer in memory (bigger values consume
36         // more memory, but can reduce store-and-forward latency when
37         // fetching pages)
38         CollectionBuffers int
39 }
40
41 // RunOptions controls runtime behavior. The flags/options that belong
42 // here are the ones that are useful for interactive use. For example,
43 // "CommitTrash" is a runtime option rather than a config item because
44 // it invokes a troubleshooting feature rather than expressing how
45 // balancing is meant to be done at a given site.
46 //
47 // RunOptions fields are controlled by command line flags.
48 type RunOptions struct {
49         Once        bool
50         CommitPulls bool
51         CommitTrash bool
52         Logger      *log.Logger
53         Dumper      *log.Logger
54
55         // SafeRendezvousState from the most recent balance operation,
56         // or "" if unknown. If this changes from one run to the next,
57         // we need to watch out for races. See
58         // (*Balancer)ClearTrashLists.
59         SafeRendezvousState string
60 }
61
62 var debugf = func(string, ...interface{}) {}
63
64 func main() {
65         var config Config
66         var runOptions RunOptions
67
68         configPath := flag.String("config", "",
69                 "`path` of JSON or YAML configuration file")
70         serviceListPath := flag.String("config.KeepServiceList", "",
71                 "`path` of JSON or YAML file with list of keep services to balance, as given by \"arv keep_service list\" "+
72                         "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
73         flag.BoolVar(&runOptions.Once, "once", false,
74                 "balance once and then exit")
75         flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
76                 "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
77         flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
78                 "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
79         dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
80         debugFlag := flag.Bool("debug", false, "enable debug messages")
81         flag.Usage = usage
82         flag.Parse()
83
84         if *configPath == "" {
85                 log.Fatal("You must specify a config file (see `keep-balance -help`)")
86         }
87         mustReadConfig(&config, *configPath)
88         if *serviceListPath != "" {
89                 mustReadConfig(&config.KeepServiceList, *serviceListPath)
90         }
91
92         if *debugFlag {
93                 debugf = log.Printf
94                 if j, err := json.Marshal(config); err != nil {
95                         log.Fatal(err)
96                 } else {
97                         log.Printf("config is %s", j)
98                 }
99         }
100         if *dumpFlag {
101                 runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
102         }
103         err := CheckConfig(config, runOptions)
104         if err != nil {
105                 // (don't run)
106         } else if runOptions.Once {
107                 _, err = (&Balancer{}).Run(config, runOptions)
108         } else {
109                 err = RunForever(config, runOptions, nil)
110         }
111         if err != nil {
112                 log.Fatal(err)
113         }
114 }
115
116 func mustReadConfig(dst interface{}, path string) {
117         if err := config.LoadFile(dst, path); err != nil {
118                 log.Fatal(err)
119         }
120 }
121
122 // RunForever runs forever, or (for testing purposes) until the given
123 // stop channel is ready to receive.
124 func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
125         if runOptions.Logger == nil {
126                 runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
127         }
128         logger := runOptions.Logger
129
130         ticker := time.NewTicker(time.Duration(config.RunPeriod))
131
132         // The unbuffered channel here means we only hear SIGUSR1 if
133         // it arrives while we're waiting in select{}.
134         sigUSR1 := make(chan os.Signal)
135         signal.Notify(sigUSR1, syscall.SIGUSR1)
136
137         logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)
138
139         for {
140                 if !runOptions.CommitPulls && !runOptions.CommitTrash {
141                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
142                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
143                 }
144
145                 bal := &Balancer{}
146                 var err error
147                 runOptions, err = bal.Run(config, runOptions)
148                 if err != nil {
149                         logger.Print("run failed: ", err)
150                 } else {
151                         logger.Print("run succeeded")
152                 }
153
154                 select {
155                 case <-stop:
156                         signal.Stop(sigUSR1)
157                         return nil
158                 case <-ticker.C:
159                         logger.Print("timer went off")
160                 case <-sigUSR1:
161                         logger.Print("received SIGUSR1, resetting timer")
162                         // Reset the timer so we don't start the N+1st
163                         // run too soon after the Nth run is triggered
164                         // by SIGUSR1.
165                         ticker.Stop()
166                         ticker = time.NewTicker(time.Duration(config.RunPeriod))
167                 }
168                 logger.Print("starting next run")
169         }
170 }