14285: Export stats as prometheus metrics.
[arvados.git] / services / keep-balance / main.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "encoding/json"
9         "flag"
10         "fmt"
11         "log"
12         "net/http"
13         "os"
14         "os/signal"
15         "syscall"
16         "time"
17
18         "git.curoverse.com/arvados.git/sdk/go/arvados"
19         "git.curoverse.com/arvados.git/sdk/go/config"
20         "git.curoverse.com/arvados.git/sdk/go/httpserver"
21         "github.com/Sirupsen/logrus"
22 )
23
24 var version = "dev"
25
26 const (
27         defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
28         rfc3339NanoFixed  = "2006-01-02T15:04:05.000000000Z07:00"
29 )
30
31 // Config specifies site configuration, like API credentials and the
32 // choice of which servers are to be balanced.
33 //
34 // Config is loaded from a JSON config file (see usage()).
35 type Config struct {
36         // Arvados API endpoint and credentials.
37         Client arvados.Client
38
39         // List of service types (e.g., "disk") to balance.
40         KeepServiceTypes []string
41
42         KeepServiceList arvados.KeepServiceList
43
44         // address, address:port, or :port for management interface
45         Listen string
46
47         // How often to check
48         RunPeriod arvados.Duration
49
50         // Number of collections to request in each API call
51         CollectionBatchSize int
52
53         // Max collections to buffer in memory (bigger values consume
54         // more memory, but can reduce store-and-forward latency when
55         // fetching pages)
56         CollectionBuffers int
57
58         // Timeout for outgoing http request/response cycle.
59         RequestTimeout arvados.Duration
60 }
61
62 // RunOptions controls runtime behavior. The flags/options that belong
63 // here are the ones that are useful for interactive use. For example,
64 // "CommitTrash" is a runtime option rather than a config item because
65 // it invokes a troubleshooting feature rather than expressing how
66 // balancing is meant to be done at a given site.
67 //
68 // RunOptions fields are controlled by command line flags.
69 type RunOptions struct {
70         Once        bool
71         CommitPulls bool
72         CommitTrash bool
73         Logger      *logrus.Logger
74         Dumper      *logrus.Logger
75
76         // SafeRendezvousState from the most recent balance operation,
77         // or "" if unknown. If this changes from one run to the next,
78         // we need to watch out for races. See
79         // (*Balancer)ClearTrashLists.
80         SafeRendezvousState string
81 }
82
83 var debugf = func(string, ...interface{}) {}
84
85 func main() {
86         var cfg Config
87         var runOptions RunOptions
88
89         configPath := flag.String("config", defaultConfigPath,
90                 "`path` of JSON or YAML configuration file")
91         serviceListPath := flag.String("config.KeepServiceList", "",
92                 "`path` of JSON or YAML file with list of keep services to balance, as given by \"arv keep_service list\" "+
93                         "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
94         flag.BoolVar(&runOptions.Once, "once", false,
95                 "balance once and then exit")
96         flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
97                 "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
98         flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
99                 "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
100         dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit")
101         dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
102         debugFlag := flag.Bool("debug", false, "enable debug messages")
103         getVersion := flag.Bool("version", false, "Print version information and exit.")
104         flag.Usage = usage
105         flag.Parse()
106
107         // Print version information if requested
108         if *getVersion {
109                 fmt.Printf("keep-balance %s\n", version)
110                 return
111         }
112
113         mustReadConfig(&cfg, *configPath)
114         if *serviceListPath != "" {
115                 mustReadConfig(&cfg.KeepServiceList, *serviceListPath)
116         }
117
118         if *dumpConfig {
119                 log.Fatal(config.DumpAndExit(cfg))
120         }
121
122         to := time.Duration(cfg.RequestTimeout)
123         if to == 0 {
124                 to = 30 * time.Minute
125         }
126         arvados.DefaultSecureClient.Timeout = to
127         arvados.InsecureHTTPClient.Timeout = to
128         http.DefaultClient.Timeout = to
129
130         log.Printf("keep-balance %s started", version)
131
132         if *debugFlag {
133                 debugf = log.Printf
134                 if j, err := json.Marshal(cfg); err != nil {
135                         log.Fatal(err)
136                 } else {
137                         log.Printf("config is %s", j)
138                 }
139         }
140         if *dumpFlag {
141                 runOptions.Dumper = logrus.New()
142                 runOptions.Dumper.Out = os.Stdout
143                 runOptions.Dumper.Formatter = &logrus.TextFormatter{}
144         }
145         srv, err := NewServer(cfg, runOptions)
146         if err != nil {
147                 // (don't run)
148         } else if runOptions.Once {
149                 _, err = srv.Run()
150         } else {
151                 err = srv.RunForever(nil)
152         }
153         if err != nil {
154                 log.Fatal(err)
155         }
156 }
157
158 func mustReadConfig(dst interface{}, path string) {
159         if err := config.LoadFile(dst, path); err != nil {
160                 log.Fatal(err)
161         }
162 }
163
164 type Server struct {
165         config     Config
166         runOptions RunOptions
167         metrics    *metrics
168         listening  string // for tests
169
170         Logger *logrus.Logger
171         Dumper *logrus.Logger
172 }
173
174 // NewServer returns a new Server that runs Balancers using the given
175 // config and runOptions.
176 func NewServer(config Config, runOptions RunOptions) (*Server, error) {
177         if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
178                 return nil, fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
179         }
180         if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
181                 return nil, fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
182         }
183
184         if runOptions.Logger == nil {
185                 log := logrus.New()
186                 log.Formatter = &logrus.JSONFormatter{
187                         TimestampFormat: rfc3339NanoFixed,
188                 }
189                 log.Out = os.Stderr
190                 runOptions.Logger = log
191         }
192
193         srv := &Server{
194                 config:     config,
195                 runOptions: runOptions,
196                 metrics:    newMetrics(),
197                 Logger:     runOptions.Logger,
198                 Dumper:     runOptions.Dumper,
199         }
200         return srv, srv.start()
201 }
202
203 func (srv *Server) start() error {
204         if srv.config.Listen == "" {
205                 return nil
206         }
207         server := &httpserver.Server{
208                 Server: http.Server{
209                         Handler: httpserver.LogRequests(srv.Logger, srv.metrics.Handler(srv.Logger)),
210                 },
211                 Addr: srv.config.Listen,
212         }
213         err := server.Start()
214         if err != nil {
215                 return err
216         }
217         srv.Logger.Printf("listening at %s", server.Addr)
218         srv.listening = server.Addr
219         return nil
220 }
221
222 func (srv *Server) Run() (*Balancer, error) {
223         bal := &Balancer{
224                 Logger:  srv.Logger,
225                 Dumper:  srv.Dumper,
226                 Metrics: srv.metrics,
227         }
228         var err error
229         srv.runOptions, err = bal.Run(srv.config, srv.runOptions)
230         return bal, err
231 }
232
233 // RunForever runs forever, or (for testing purposes) until the given
234 // stop channel is ready to receive.
235 func (srv *Server) RunForever(stop <-chan interface{}) error {
236         logger := srv.runOptions.Logger
237
238         ticker := time.NewTicker(time.Duration(srv.config.RunPeriod))
239
240         // The unbuffered channel here means we only hear SIGUSR1 if
241         // it arrives while we're waiting in select{}.
242         sigUSR1 := make(chan os.Signal)
243         signal.Notify(sigUSR1, syscall.SIGUSR1)
244
245         logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.config.RunPeriod)
246
247         for {
248                 if !srv.runOptions.CommitPulls && !srv.runOptions.CommitTrash {
249                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
250                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
251                 }
252
253                 _, err := srv.Run()
254                 if err != nil {
255                         logger.Print("run failed: ", err)
256                 } else {
257                         logger.Print("run succeeded")
258                 }
259
260                 select {
261                 case <-stop:
262                         signal.Stop(sigUSR1)
263                         return nil
264                 case <-ticker.C:
265                         logger.Print("timer went off")
266                 case <-sigUSR1:
267                         logger.Print("received SIGUSR1, resetting timer")
268                         // Reset the timer so we don't start the N+1st
269                         // run too soon after the Nth run is triggered
270                         // by SIGUSR1.
271                         ticker.Stop()
272                         ticker = time.NewTicker(time.Duration(srv.config.RunPeriod))
273                 }
274                 logger.Print("starting next run")
275         }
276 }