Merge branch '13168-cr-modified-user'
[arvados.git] / services / keep-balance / main.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "encoding/json"
9         "flag"
10         "fmt"
11         "log"
12         "os"
13         "os/signal"
14         "syscall"
15         "time"
16
17         "git.curoverse.com/arvados.git/sdk/go/arvados"
18         "git.curoverse.com/arvados.git/sdk/go/config"
19 )
20
21 var version = "dev"
22
23 const defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
24
25 // Config specifies site configuration, like API credentials and the
26 // choice of which servers are to be balanced.
27 //
28 // Config is loaded from a JSON config file (see usage()).
29 type Config struct {
30         // Arvados API endpoint and credentials.
31         Client arvados.Client
32
33         // List of service types (e.g., "disk") to balance.
34         KeepServiceTypes []string
35
36         KeepServiceList arvados.KeepServiceList
37
38         // How often to check
39         RunPeriod arvados.Duration
40
41         // Number of collections to request in each API call
42         CollectionBatchSize int
43
44         // Max collections to buffer in memory (bigger values consume
45         // more memory, but can reduce store-and-forward latency when
46         // fetching pages)
47         CollectionBuffers int
48 }
49
50 // RunOptions controls runtime behavior. The flags/options that belong
51 // here are the ones that are useful for interactive use. For example,
52 // "CommitTrash" is a runtime option rather than a config item because
53 // it invokes a troubleshooting feature rather than expressing how
54 // balancing is meant to be done at a given site.
55 //
56 // RunOptions fields are controlled by command line flags.
57 type RunOptions struct {
58         Once        bool
59         CommitPulls bool
60         CommitTrash bool
61         Logger      *log.Logger
62         Dumper      *log.Logger
63
64         // SafeRendezvousState from the most recent balance operation,
65         // or "" if unknown. If this changes from one run to the next,
66         // we need to watch out for races. See
67         // (*Balancer)ClearTrashLists.
68         SafeRendezvousState string
69 }
70
71 var debugf = func(string, ...interface{}) {}
72
73 func main() {
74         var cfg Config
75         var runOptions RunOptions
76
77         configPath := flag.String("config", defaultConfigPath,
78                 "`path` of JSON or YAML configuration file")
79         serviceListPath := flag.String("config.KeepServiceList", "",
80                 "`path` of JSON or YAML file with list of keep services to balance, as given by \"arv keep_service list\" "+
81                         "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
82         flag.BoolVar(&runOptions.Once, "once", false,
83                 "balance once and then exit")
84         flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
85                 "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
86         flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
87                 "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
88         dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit")
89         dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
90         debugFlag := flag.Bool("debug", false, "enable debug messages")
91         getVersion := flag.Bool("version", false, "Print version information and exit.")
92         flag.Usage = usage
93         flag.Parse()
94
95         // Print version information if requested
96         if *getVersion {
97                 fmt.Printf("keep-balance %s\n", version)
98                 return
99         }
100
101         mustReadConfig(&cfg, *configPath)
102         if *serviceListPath != "" {
103                 mustReadConfig(&cfg.KeepServiceList, *serviceListPath)
104         }
105
106         if *dumpConfig {
107                 log.Fatal(config.DumpAndExit(cfg))
108         }
109
110         log.Printf("keep-balance %s started", version)
111
112         if *debugFlag {
113                 debugf = log.Printf
114                 if j, err := json.Marshal(cfg); err != nil {
115                         log.Fatal(err)
116                 } else {
117                         log.Printf("config is %s", j)
118                 }
119         }
120         if *dumpFlag {
121                 runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
122         }
123         err := CheckConfig(cfg, runOptions)
124         if err != nil {
125                 // (don't run)
126         } else if runOptions.Once {
127                 _, err = (&Balancer{}).Run(cfg, runOptions)
128         } else {
129                 err = RunForever(cfg, runOptions, nil)
130         }
131         if err != nil {
132                 log.Fatal(err)
133         }
134 }
135
136 func mustReadConfig(dst interface{}, path string) {
137         if err := config.LoadFile(dst, path); err != nil {
138                 log.Fatal(err)
139         }
140 }
141
142 // RunForever runs forever, or (for testing purposes) until the given
143 // stop channel is ready to receive.
144 func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
145         if runOptions.Logger == nil {
146                 runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
147         }
148         logger := runOptions.Logger
149
150         ticker := time.NewTicker(time.Duration(config.RunPeriod))
151
152         // The unbuffered channel here means we only hear SIGUSR1 if
153         // it arrives while we're waiting in select{}.
154         sigUSR1 := make(chan os.Signal)
155         signal.Notify(sigUSR1, syscall.SIGUSR1)
156
157         logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)
158
159         for {
160                 if !runOptions.CommitPulls && !runOptions.CommitTrash {
161                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
162                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
163                 }
164
165                 bal := &Balancer{}
166                 var err error
167                 runOptions, err = bal.Run(config, runOptions)
168                 if err != nil {
169                         logger.Print("run failed: ", err)
170                 } else {
171                         logger.Print("run succeeded")
172                 }
173
174                 select {
175                 case <-stop:
176                         signal.Stop(sigUSR1)
177                         return nil
178                 case <-ticker.C:
179                         logger.Print("timer went off")
180                 case <-sigUSR1:
181                         logger.Print("received SIGUSR1, resetting timer")
182                         // Reset the timer so we don't start the N+1st
183                         // run too soon after the Nth run is triggered
184                         // by SIGUSR1.
185                         ticker.Stop()
186                         ticker = time.NewTicker(time.Duration(config.RunPeriod))
187                 }
188                 logger.Print("starting next run")
189         }
190 }