14287: Use ctxlog for httpserver logging.
[arvados.git] / services / keep-balance / server.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "context"
9         "fmt"
10         "net/http"
11         "os"
12         "os/signal"
13         "syscall"
14         "time"
15
16         "git.curoverse.com/arvados.git/sdk/go/arvados"
17         "git.curoverse.com/arvados.git/sdk/go/auth"
18         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
19         "git.curoverse.com/arvados.git/sdk/go/httpserver"
20         "github.com/sirupsen/logrus"
21 )
22
23 var version = "dev"
24
25 const (
26         defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
27         rfc3339NanoFixed  = "2006-01-02T15:04:05.000000000Z07:00"
28 )
29
30 // Config specifies site configuration, like API credentials and the
31 // choice of which servers are to be balanced.
32 //
33 // Config is loaded from a JSON config file (see usage()).
34 type Config struct {
35         // Arvados API endpoint and credentials.
36         Client arvados.Client
37
38         // List of service types (e.g., "disk") to balance.
39         KeepServiceTypes []string
40
41         KeepServiceList arvados.KeepServiceList
42
43         // address, address:port, or :port for management interface
44         Listen string
45
46         // token for management APIs
47         ManagementToken string
48
49         // How often to check
50         RunPeriod arvados.Duration
51
52         // Number of collections to request in each API call
53         CollectionBatchSize int
54
55         // Max collections to buffer in memory (bigger values consume
56         // more memory, but can reduce store-and-forward latency when
57         // fetching pages)
58         CollectionBuffers int
59
60         // Timeout for outgoing http request/response cycle.
61         RequestTimeout arvados.Duration
62
63         // Destination filename for the list of lost block hashes, one
64         // per line. Updated atomically during each successful run.
65         LostBlocksFile string
66 }
67
68 // RunOptions controls runtime behavior. The flags/options that belong
69 // here are the ones that are useful for interactive use. For example,
70 // "CommitTrash" is a runtime option rather than a config item because
71 // it invokes a troubleshooting feature rather than expressing how
72 // balancing is meant to be done at a given site.
73 //
74 // RunOptions fields are controlled by command line flags.
75 type RunOptions struct {
76         Once        bool
77         CommitPulls bool
78         CommitTrash bool
79         Logger      logrus.FieldLogger
80         Dumper      logrus.FieldLogger
81
82         // SafeRendezvousState from the most recent balance operation,
83         // or "" if unknown. If this changes from one run to the next,
84         // we need to watch out for races. See
85         // (*Balancer)ClearTrashLists.
86         SafeRendezvousState string
87 }
88
89 type Server struct {
90         config     Config
91         runOptions RunOptions
92         metrics    *metrics
93         listening  string // for tests
94
95         Logger logrus.FieldLogger
96         Dumper logrus.FieldLogger
97 }
98
99 // NewServer returns a new Server that runs Balancers using the given
100 // config and runOptions.
101 func NewServer(config Config, runOptions RunOptions) (*Server, error) {
102         if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
103                 return nil, fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
104         }
105         if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
106                 return nil, fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
107         }
108
109         if runOptions.Logger == nil {
110                 log := logrus.New()
111                 log.Formatter = &logrus.JSONFormatter{
112                         TimestampFormat: rfc3339NanoFixed,
113                 }
114                 log.Out = os.Stderr
115                 runOptions.Logger = log
116         }
117
118         srv := &Server{
119                 config:     config,
120                 runOptions: runOptions,
121                 metrics:    newMetrics(),
122                 Logger:     runOptions.Logger,
123                 Dumper:     runOptions.Dumper,
124         }
125         return srv, srv.start()
126 }
127
128 func (srv *Server) start() error {
129         if srv.config.Listen == "" {
130                 return nil
131         }
132         ctx := ctxlog.Context(context.Background(), srv.Logger)
133         server := &httpserver.Server{
134                 Server: http.Server{
135                         Handler: httpserver.HandlerWithContext(ctx,
136                                 httpserver.LogRequests(
137                                         auth.RequireLiteralToken(srv.config.ManagementToken,
138                                                 srv.metrics.Handler(srv.Logger)))),
139                 },
140                 Addr: srv.config.Listen,
141         }
142         err := server.Start()
143         if err != nil {
144                 return err
145         }
146         srv.Logger.Printf("listening at %s", server.Addr)
147         srv.listening = server.Addr
148         return nil
149 }
150
151 func (srv *Server) Run() (*Balancer, error) {
152         bal := &Balancer{
153                 Logger:         srv.Logger,
154                 Dumper:         srv.Dumper,
155                 Metrics:        srv.metrics,
156                 LostBlocksFile: srv.config.LostBlocksFile,
157         }
158         var err error
159         srv.runOptions, err = bal.Run(srv.config, srv.runOptions)
160         return bal, err
161 }
162
163 // RunForever runs forever, or (for testing purposes) until the given
164 // stop channel is ready to receive.
165 func (srv *Server) RunForever(stop <-chan interface{}) error {
166         logger := srv.runOptions.Logger
167
168         ticker := time.NewTicker(time.Duration(srv.config.RunPeriod))
169
170         // The unbuffered channel here means we only hear SIGUSR1 if
171         // it arrives while we're waiting in select{}.
172         sigUSR1 := make(chan os.Signal)
173         signal.Notify(sigUSR1, syscall.SIGUSR1)
174
175         logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.config.RunPeriod)
176
177         for {
178                 if !srv.runOptions.CommitPulls && !srv.runOptions.CommitTrash {
179                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
180                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
181                 }
182
183                 _, err := srv.Run()
184                 if err != nil {
185                         logger.Print("run failed: ", err)
186                 } else {
187                         logger.Print("run succeeded")
188                 }
189
190                 select {
191                 case <-stop:
192                         signal.Stop(sigUSR1)
193                         return nil
194                 case <-ticker.C:
195                         logger.Print("timer went off")
196                 case <-sigUSR1:
197                         logger.Print("received SIGUSR1, resetting timer")
198                         // Reset the timer so we don't start the N+1st
199                         // run too soon after the Nth run is triggered
200                         // by SIGUSR1.
201                         ticker.Stop()
202                         ticker = time.NewTicker(time.Duration(srv.config.RunPeriod))
203                 }
204                 logger.Print("starting next run")
205         }
206 }