14714: Tests use cluster config
[arvados.git] / services / keep-balance / server.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
import (
	"context"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"git.curoverse.com/arvados.git/sdk/go/auth"
	"git.curoverse.com/arvados.git/sdk/go/ctxlog"
	"github.com/julienschmidt/httprouter"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/sirupsen/logrus"
)
22
23 // RunOptions controls runtime behavior. The flags/options that belong
24 // here are the ones that are useful for interactive use. For example,
25 // "CommitTrash" is a runtime option rather than a config item because
26 // it invokes a troubleshooting feature rather than expressing how
27 // balancing is meant to be done at a given site.
28 //
29 // RunOptions fields are controlled by command line flags.
30 type RunOptions struct {
31         Once        bool
32         CommitPulls bool
33         CommitTrash bool
34         Logger      logrus.FieldLogger
35         Dumper      logrus.FieldLogger
36
37         // SafeRendezvousState from the most recent balance operation,
38         // or "" if unknown. If this changes from one run to the next,
39         // we need to watch out for races. See
40         // (*Balancer)ClearTrashLists.
41         SafeRendezvousState string
42 }
43
44 type Server struct {
45         Cluster    *arvados.Cluster
46         ArvClient  *arvados.Client
47         RunOptions RunOptions
48         Metrics    *metrics
49
50         httpHandler http.Handler
51
52         Logger logrus.FieldLogger
53         Dumper logrus.FieldLogger
54 }
55
56 // ServeHTTP implements service.Handler.
57 func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
58         srv.httpHandler.ServeHTTP(w, r)
59 }
60
61 // CheckHealth implements service.Handler.
62 func (srv *Server) CheckHealth() error {
63         return nil
64 }
65
66 // Start sets up and runs the balancer.
67 func (srv *Server) Start(ctx context.Context) {
68         srv.init(ctx)
69
70         var err error
71         if srv.RunOptions.Once {
72                 _, err = srv.runOnce()
73         } else {
74                 err = srv.runForever(nil)
75         }
76         if err != nil {
77                 srv.Logger.Error(err)
78         }
79 }
80
81 func (srv *Server) init(ctx context.Context) {
82         if srv.RunOptions.Logger == nil {
83                 srv.RunOptions.Logger = ctxlog.FromContext(ctx)
84         }
85
86         srv.Logger = srv.RunOptions.Logger
87         srv.Dumper = srv.RunOptions.Dumper
88
89         if srv.Cluster.ManagementToken == "" {
90                 srv.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
91                         http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
92                 })
93         } else {
94                 mux := httprouter.New()
95                 metricsH := promhttp.HandlerFor(srv.Metrics.reg, promhttp.HandlerOpts{
96                         ErrorLog: srv.Logger,
97                 })
98                 mux.Handler("GET", "/metrics", metricsH)
99                 mux.Handler("GET", "/metrics.json", metricsH)
100                 srv.httpHandler = auth.RequireLiteralToken(srv.Cluster.ManagementToken, mux)
101         }
102 }
103
104 func (srv *Server) runOnce() (*Balancer, error) {
105         bal := &Balancer{
106                 Logger:         srv.Logger,
107                 Dumper:         srv.Dumper,
108                 Metrics:        srv.Metrics,
109                 LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
110         }
111         var err error
112         srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
113         return bal, err
114 }
115
116 // RunForever runs forever, or (for testing purposes) until the given
117 // stop channel is ready to receive.
118 func (srv *Server) runForever(stop <-chan interface{}) error {
119         logger := srv.Logger
120
121         ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
122
123         // The unbuffered channel here means we only hear SIGUSR1 if
124         // it arrives while we're waiting in select{}.
125         sigUSR1 := make(chan os.Signal)
126         signal.Notify(sigUSR1, syscall.SIGUSR1)
127
128         logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
129
130         for {
131                 if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash {
132                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
133                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
134                 }
135
136                 _, err := srv.runOnce()
137                 if err != nil {
138                         logger.Print("run failed: ", err)
139                 } else {
140                         logger.Print("run succeeded")
141                 }
142
143                 select {
144                 case <-stop:
145                         signal.Stop(sigUSR1)
146                         return nil
147                 case <-ticker.C:
148                         logger.Print("timer went off")
149                 case <-sigUSR1:
150                         logger.Print("received SIGUSR1, resetting timer")
151                         // Reset the timer so we don't start the N+1st
152                         // run too soon after the Nth run is triggered
153                         // by SIGUSR1.
154                         ticker.Stop()
155                         ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
156                 }
157                 logger.Print("starting next run")
158         }
159 }