10516: set finished_at to updated_at on pipeline_instances if the pipeline is finishe...
[arvados.git] / services / keep-balance / main.go
1 package main
2
3 import (
4         "encoding/json"
5         "flag"
6         "log"
7         "os"
8         "os/signal"
9         "syscall"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/arvados"
13         "git.curoverse.com/arvados.git/sdk/go/config"
14 )
15
16 const defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
17
18 // Config specifies site configuration, like API credentials and the
19 // choice of which servers are to be balanced.
20 //
21 // Config is loaded from a JSON config file (see usage()).
22 type Config struct {
23         // Arvados API endpoint and credentials.
24         Client arvados.Client
25
26         // List of service types (e.g., "disk") to balance.
27         KeepServiceTypes []string
28
29         KeepServiceList arvados.KeepServiceList
30
31         // How often to check
32         RunPeriod arvados.Duration
33
34         // Number of collections to request in each API call
35         CollectionBatchSize int
36
37         // Max collections to buffer in memory (bigger values consume
38         // more memory, but can reduce store-and-forward latency when
39         // fetching pages)
40         CollectionBuffers int
41 }
42
43 // RunOptions controls runtime behavior. The flags/options that belong
44 // here are the ones that are useful for interactive use. For example,
45 // "CommitTrash" is a runtime option rather than a config item because
46 // it invokes a troubleshooting feature rather than expressing how
47 // balancing is meant to be done at a given site.
48 //
49 // RunOptions fields are controlled by command line flags.
50 type RunOptions struct {
51         Once        bool
52         CommitPulls bool
53         CommitTrash bool
54         Logger      *log.Logger
55         Dumper      *log.Logger
56
57         // SafeRendezvousState from the most recent balance operation,
58         // or "" if unknown. If this changes from one run to the next,
59         // we need to watch out for races. See
60         // (*Balancer)ClearTrashLists.
61         SafeRendezvousState string
62 }
63
64 var debugf = func(string, ...interface{}) {}
65
66 func main() {
67         var config Config
68         var runOptions RunOptions
69
70         configPath := flag.String("config", defaultConfigPath,
71                 "`path` of JSON or YAML configuration file")
72         serviceListPath := flag.String("config.KeepServiceList", "",
73                 "`path` of JSON or YAML file with list of keep services to balance, as given by \"arv keep_service list\" "+
74                         "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
75         flag.BoolVar(&runOptions.Once, "once", false,
76                 "balance once and then exit")
77         flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
78                 "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
79         flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
80                 "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
81         dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
82         debugFlag := flag.Bool("debug", false, "enable debug messages")
83         flag.Usage = usage
84         flag.Parse()
85
86         mustReadConfig(&config, *configPath)
87         if *serviceListPath != "" {
88                 mustReadConfig(&config.KeepServiceList, *serviceListPath)
89         }
90
91         if *debugFlag {
92                 debugf = log.Printf
93                 if j, err := json.Marshal(config); err != nil {
94                         log.Fatal(err)
95                 } else {
96                         log.Printf("config is %s", j)
97                 }
98         }
99         if *dumpFlag {
100                 runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
101         }
102         err := CheckConfig(config, runOptions)
103         if err != nil {
104                 // (don't run)
105         } else if runOptions.Once {
106                 _, err = (&Balancer{}).Run(config, runOptions)
107         } else {
108                 err = RunForever(config, runOptions, nil)
109         }
110         if err != nil {
111                 log.Fatal(err)
112         }
113 }
114
115 func mustReadConfig(dst interface{}, path string) {
116         if err := config.LoadFile(dst, path); err != nil {
117                 log.Fatal(err)
118         }
119 }
120
121 // RunForever runs forever, or (for testing purposes) until the given
122 // stop channel is ready to receive.
123 func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
124         if runOptions.Logger == nil {
125                 runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
126         }
127         logger := runOptions.Logger
128
129         ticker := time.NewTicker(time.Duration(config.RunPeriod))
130
131         // The unbuffered channel here means we only hear SIGUSR1 if
132         // it arrives while we're waiting in select{}.
133         sigUSR1 := make(chan os.Signal)
134         signal.Notify(sigUSR1, syscall.SIGUSR1)
135
136         logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)
137
138         for {
139                 if !runOptions.CommitPulls && !runOptions.CommitTrash {
140                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
141                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
142                 }
143
144                 bal := &Balancer{}
145                 var err error
146                 runOptions, err = bal.Run(config, runOptions)
147                 if err != nil {
148                         logger.Print("run failed: ", err)
149                 } else {
150                         logger.Print("run succeeded")
151                 }
152
153                 select {
154                 case <-stop:
155                         signal.Stop(sigUSR1)
156                         return nil
157                 case <-ticker.C:
158                         logger.Print("timer went off")
159                 case <-sigUSR1:
160                         logger.Print("received SIGUSR1, resetting timer")
161                         // Reset the timer so we don't start the N+1st
162                         // run too soon after the Nth run is triggered
163                         // by SIGUSR1.
164                         ticker.Stop()
165                         ticker = time.NewTicker(time.Duration(config.RunPeriod))
166                 }
167                 logger.Print("starting next run")
168         }
169 }