8a938ccf5308393311a121835c1eaef631f194ca
[arvados.git] / services / keep-balance / main.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "encoding/json"
9         "flag"
10         "log"
11         "os"
12         "os/signal"
13         "syscall"
14         "time"
15
16         "git.curoverse.com/arvados.git/sdk/go/arvados"
17         "git.curoverse.com/arvados.git/sdk/go/config"
18 )
19
20 const defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
21
22 // Config specifies site configuration, like API credentials and the
23 // choice of which servers are to be balanced.
24 //
25 // Config is loaded from a JSON config file (see usage()).
26 type Config struct {
27         // Arvados API endpoint and credentials.
28         Client arvados.Client
29
30         // List of service types (e.g., "disk") to balance.
31         KeepServiceTypes []string
32
33         KeepServiceList arvados.KeepServiceList
34
35         // How often to check
36         RunPeriod arvados.Duration
37
38         // Number of collections to request in each API call
39         CollectionBatchSize int
40
41         // Max collections to buffer in memory (bigger values consume
42         // more memory, but can reduce store-and-forward latency when
43         // fetching pages)
44         CollectionBuffers int
45 }
46
47 // RunOptions controls runtime behavior. The flags/options that belong
48 // here are the ones that are useful for interactive use. For example,
49 // "CommitTrash" is a runtime option rather than a config item because
50 // it invokes a troubleshooting feature rather than expressing how
51 // balancing is meant to be done at a given site.
52 //
53 // RunOptions fields are controlled by command line flags.
54 type RunOptions struct {
55         Once        bool
56         CommitPulls bool
57         CommitTrash bool
58         Logger      *log.Logger
59         Dumper      *log.Logger
60
61         // SafeRendezvousState from the most recent balance operation,
62         // or "" if unknown. If this changes from one run to the next,
63         // we need to watch out for races. See
64         // (*Balancer)ClearTrashLists.
65         SafeRendezvousState string
66 }
67
68 var debugf = func(string, ...interface{}) {}
69
70 func main() {
71         var cfg Config
72         var runOptions RunOptions
73
74         configPath := flag.String("config", defaultConfigPath,
75                 "`path` of JSON or YAML configuration file")
76         serviceListPath := flag.String("config.KeepServiceList", "",
77                 "`path` of JSON or YAML file with list of keep services to balance, as given by \"arv keep_service list\" "+
78                         "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
79         flag.BoolVar(&runOptions.Once, "once", false,
80                 "balance once and then exit")
81         flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
82                 "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
83         flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
84                 "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
85         dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit")
86         dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
87         debugFlag := flag.Bool("debug", false, "enable debug messages")
88         flag.Usage = usage
89         flag.Parse()
90
91         mustReadConfig(&cfg, *configPath)
92         if *serviceListPath != "" {
93                 mustReadConfig(&cfg.KeepServiceList, *serviceListPath)
94         }
95
96         if *dumpConfig {
97                 log.Fatal(config.DumpAndExit(cfg))
98         }
99
100         if *debugFlag {
101                 debugf = log.Printf
102                 if j, err := json.Marshal(cfg); err != nil {
103                         log.Fatal(err)
104                 } else {
105                         log.Printf("config is %s", j)
106                 }
107         }
108         if *dumpFlag {
109                 runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
110         }
111         err := CheckConfig(cfg, runOptions)
112         if err != nil {
113                 // (don't run)
114         } else if runOptions.Once {
115                 _, err = (&Balancer{}).Run(cfg, runOptions)
116         } else {
117                 err = RunForever(cfg, runOptions, nil)
118         }
119         if err != nil {
120                 log.Fatal(err)
121         }
122 }
123
124 func mustReadConfig(dst interface{}, path string) {
125         if err := config.LoadFile(dst, path); err != nil {
126                 log.Fatal(err)
127         }
128 }
129
130 // RunForever runs forever, or (for testing purposes) until the given
131 // stop channel is ready to receive.
132 func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
133         if runOptions.Logger == nil {
134                 runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
135         }
136         logger := runOptions.Logger
137
138         ticker := time.NewTicker(time.Duration(config.RunPeriod))
139
140         // The unbuffered channel here means we only hear SIGUSR1 if
141         // it arrives while we're waiting in select{}.
142         sigUSR1 := make(chan os.Signal)
143         signal.Notify(sigUSR1, syscall.SIGUSR1)
144
145         logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)
146
147         for {
148                 if !runOptions.CommitPulls && !runOptions.CommitTrash {
149                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
150                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
151                 }
152
153                 bal := &Balancer{}
154                 var err error
155                 runOptions, err = bal.Run(config, runOptions)
156                 if err != nil {
157                         logger.Print("run failed: ", err)
158                 } else {
159                         logger.Print("run succeeded")
160                 }
161
162                 select {
163                 case <-stop:
164                         signal.Stop(sigUSR1)
165                         return nil
166                 case <-ticker.C:
167                         logger.Print("timer went off")
168                 case <-sigUSR1:
169                         logger.Print("received SIGUSR1, resetting timer")
170                         // Reset the timer so we don't start the N+1st
171                         // run too soon after the Nth run is triggered
172                         // by SIGUSR1.
173                         ticker.Stop()
174                         ticker = time.NewTicker(time.Duration(config.RunPeriod))
175                 }
176                 logger.Print("starting next run")
177         }
178 }