9180: Changed some of the logic on ThreadLimiter and made unit tests to validate...
[arvados.git] / services / keep-balance / main.go
1 package main
2
3 import (
4         "encoding/json"
5         "flag"
6         "io/ioutil"
7         "log"
8         "os"
9         "os/signal"
10         "syscall"
11         "time"
12
13         "git.curoverse.com/arvados.git/sdk/go/arvados"
14 )
15
16 // Config specifies site configuration, like API credentials and the
17 // choice of which servers are to be balanced.
18 //
19 // Config is loaded from a JSON config file (see usage()).
20 type Config struct {
21         // Arvados API endpoint and credentials.
22         Client arvados.Client
23
24         // List of service types (e.g., "disk") to balance.
25         KeepServiceTypes []string
26
27         KeepServiceList arvados.KeepServiceList
28
29         // How often to check
30         RunPeriod arvados.Duration
31 }
32
33 // RunOptions controls runtime behavior. The flags/options that belong
34 // here are the ones that are useful for interactive use. For example,
35 // "CommitTrash" is a runtime option rather than a config item because
36 // it invokes a troubleshooting feature rather than expressing how
37 // balancing is meant to be done at a given site.
38 //
39 // RunOptions fields are controlled by command line flags.
40 type RunOptions struct {
41         Once        bool
42         CommitPulls bool
43         CommitTrash bool
44         Logger      *log.Logger
45         Dumper      *log.Logger
46 }
47
48 var debugf = func(string, ...interface{}) {}
49
50 func main() {
51         var config Config
52         var runOptions RunOptions
53
54         configPath := flag.String("config", "",
55                 "`path` of json configuration file")
56         serviceListPath := flag.String("config.KeepServiceList", "",
57                 "`path` of json file with list of keep services to balance, as given by \"arv keep_service list\" "+
58                         "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
59         flag.BoolVar(&runOptions.Once, "once", false,
60                 "balance once and then exit")
61         flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
62                 "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
63         flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
64                 "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
65         dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
66         debugFlag := flag.Bool("debug", false, "enable debug messages")
67         flag.Usage = usage
68         flag.Parse()
69
70         if *configPath == "" {
71                 log.Fatal("You must specify a config file (see `keep-balance -help`)")
72         }
73         mustReadJSON(&config, *configPath)
74         if *serviceListPath != "" {
75                 mustReadJSON(&config.KeepServiceList, *serviceListPath)
76         }
77
78         if *debugFlag {
79                 debugf = log.Printf
80                 if j, err := json.Marshal(config); err != nil {
81                         log.Fatal(err)
82                 } else {
83                         log.Printf("config is %s", j)
84                 }
85         }
86         if *dumpFlag {
87                 runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
88         }
89         err := CheckConfig(config, runOptions)
90         if err != nil {
91                 // (don't run)
92         } else if runOptions.Once {
93                 err = (&Balancer{}).Run(config, runOptions)
94         } else {
95                 err = RunForever(config, runOptions, nil)
96         }
97         if err != nil {
98                 log.Fatal(err)
99         }
100 }
101
102 func mustReadJSON(dst interface{}, path string) {
103         if buf, err := ioutil.ReadFile(path); err != nil {
104                 log.Fatalf("Reading %q: %v", path, err)
105         } else if err = json.Unmarshal(buf, dst); err != nil {
106                 log.Fatalf("Decoding %q: %v", path, err)
107         }
108 }
109
110 // RunForever runs forever, or (for testing purposes) until the given
111 // stop channel is ready to receive.
112 func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
113         if runOptions.Logger == nil {
114                 runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
115         }
116         logger := runOptions.Logger
117
118         ticker := time.NewTicker(time.Duration(config.RunPeriod))
119
120         // The unbuffered channel here means we only hear SIGUSR1 if
121         // it arrives while we're waiting in select{}.
122         sigUSR1 := make(chan os.Signal)
123         signal.Notify(sigUSR1, syscall.SIGUSR1)
124
125         logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)
126
127         for {
128                 if !runOptions.CommitPulls && !runOptions.CommitTrash {
129                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
130                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
131                 }
132
133                 err := (&Balancer{}).Run(config, runOptions)
134                 if err != nil {
135                         logger.Print("run failed: ", err)
136                 } else {
137                         logger.Print("run succeeded")
138                 }
139
140                 select {
141                 case <-stop:
142                         signal.Stop(sigUSR1)
143                         return nil
144                 case <-ticker.C:
145                         logger.Print("timer went off")
146                 case <-sigUSR1:
147                         logger.Print("received SIGUSR1, resetting timer")
148                         // Reset the timer so we don't start the N+1st
149                         // run too soon after the Nth run is triggered
150                         // by SIGUSR1.
151                         ticker.Stop()
152                         ticker = time.NewTicker(time.Duration(config.RunPeriod))
153                 }
154                 logger.Print("starting next run")
155         }
156 }