Merge branch '13330-intermediates-test' of git.curoverse.com:arvados into 13330-cwl...
[arvados.git] / services / keep-balance / main.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "encoding/json"
9         "flag"
10         "fmt"
11         "log"
12         "net/http"
13         "os"
14         "os/signal"
15         "syscall"
16         "time"
17
18         "git.curoverse.com/arvados.git/sdk/go/arvados"
19         "git.curoverse.com/arvados.git/sdk/go/config"
20 )
21
22 var version = "dev"
23
24 const defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
25
26 // Config specifies site configuration, like API credentials and the
27 // choice of which servers are to be balanced.
28 //
29 // Config is loaded from a JSON config file (see usage()).
30 type Config struct {
31         // Arvados API endpoint and credentials.
32         Client arvados.Client
33
34         // List of service types (e.g., "disk") to balance.
35         KeepServiceTypes []string
36
37         KeepServiceList arvados.KeepServiceList
38
39         // How often to check
40         RunPeriod arvados.Duration
41
42         // Number of collections to request in each API call
43         CollectionBatchSize int
44
45         // Max collections to buffer in memory (bigger values consume
46         // more memory, but can reduce store-and-forward latency when
47         // fetching pages)
48         CollectionBuffers int
49
50         // Timeout for outgoing http request/response cycle.
51         RequestTimeout arvados.Duration
52 }
53
54 // RunOptions controls runtime behavior. The flags/options that belong
55 // here are the ones that are useful for interactive use. For example,
56 // "CommitTrash" is a runtime option rather than a config item because
57 // it invokes a troubleshooting feature rather than expressing how
58 // balancing is meant to be done at a given site.
59 //
60 // RunOptions fields are controlled by command line flags.
61 type RunOptions struct {
62         Once        bool
63         CommitPulls bool
64         CommitTrash bool
65         Logger      *log.Logger
66         Dumper      *log.Logger
67
68         // SafeRendezvousState from the most recent balance operation,
69         // or "" if unknown. If this changes from one run to the next,
70         // we need to watch out for races. See
71         // (*Balancer)ClearTrashLists.
72         SafeRendezvousState string
73 }
74
75 var debugf = func(string, ...interface{}) {}
76
77 func main() {
78         var cfg Config
79         var runOptions RunOptions
80
81         configPath := flag.String("config", defaultConfigPath,
82                 "`path` of JSON or YAML configuration file")
83         serviceListPath := flag.String("config.KeepServiceList", "",
84                 "`path` of JSON or YAML file with list of keep services to balance, as given by \"arv keep_service list\" "+
85                         "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
86         flag.BoolVar(&runOptions.Once, "once", false,
87                 "balance once and then exit")
88         flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
89                 "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
90         flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
91                 "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
92         dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit")
93         dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
94         debugFlag := flag.Bool("debug", false, "enable debug messages")
95         getVersion := flag.Bool("version", false, "Print version information and exit.")
96         flag.Usage = usage
97         flag.Parse()
98
99         // Print version information if requested
100         if *getVersion {
101                 fmt.Printf("keep-balance %s\n", version)
102                 return
103         }
104
105         mustReadConfig(&cfg, *configPath)
106         if *serviceListPath != "" {
107                 mustReadConfig(&cfg.KeepServiceList, *serviceListPath)
108         }
109
110         if *dumpConfig {
111                 log.Fatal(config.DumpAndExit(cfg))
112         }
113
114         to := time.Duration(cfg.RequestTimeout)
115         if to == 0 {
116                 to = 30 * time.Minute
117         }
118         arvados.DefaultSecureClient.Timeout = to
119         arvados.InsecureHTTPClient.Timeout = to
120         http.DefaultClient.Timeout = to
121
122         log.Printf("keep-balance %s started", version)
123
124         if *debugFlag {
125                 debugf = log.Printf
126                 if j, err := json.Marshal(cfg); err != nil {
127                         log.Fatal(err)
128                 } else {
129                         log.Printf("config is %s", j)
130                 }
131         }
132         if *dumpFlag {
133                 runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
134         }
135         err := CheckConfig(cfg, runOptions)
136         if err != nil {
137                 // (don't run)
138         } else if runOptions.Once {
139                 _, err = (&Balancer{}).Run(cfg, runOptions)
140         } else {
141                 err = RunForever(cfg, runOptions, nil)
142         }
143         if err != nil {
144                 log.Fatal(err)
145         }
146 }
147
148 func mustReadConfig(dst interface{}, path string) {
149         if err := config.LoadFile(dst, path); err != nil {
150                 log.Fatal(err)
151         }
152 }
153
154 // RunForever runs forever, or (for testing purposes) until the given
155 // stop channel is ready to receive.
156 func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
157         if runOptions.Logger == nil {
158                 runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
159         }
160         logger := runOptions.Logger
161
162         ticker := time.NewTicker(time.Duration(config.RunPeriod))
163
164         // The unbuffered channel here means we only hear SIGUSR1 if
165         // it arrives while we're waiting in select{}.
166         sigUSR1 := make(chan os.Signal)
167         signal.Notify(sigUSR1, syscall.SIGUSR1)
168
169         logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)
170
171         for {
172                 if !runOptions.CommitPulls && !runOptions.CommitTrash {
173                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
174                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
175                 }
176
177                 bal := &Balancer{}
178                 var err error
179                 runOptions, err = bal.Run(config, runOptions)
180                 if err != nil {
181                         logger.Print("run failed: ", err)
182                 } else {
183                         logger.Print("run succeeded")
184                 }
185
186                 select {
187                 case <-stop:
188                         signal.Stop(sigUSR1)
189                         return nil
190                 case <-ticker.C:
191                         logger.Print("timer went off")
192                 case <-sigUSR1:
193                         logger.Print("received SIGUSR1, resetting timer")
194                         // Reset the timer so we don't start the N+1st
195                         // run too soon after the Nth run is triggered
196                         // by SIGUSR1.
197                         ticker.Stop()
198                         ticker = time.NewTicker(time.Duration(config.RunPeriod))
199                 }
200                 logger.Print("starting next run")
201         }
202 }