14714: Removes context from Server functions. Adds sync.Once to setup
[arvados.git] / services / keep-balance / server.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "net/http"
9         "os"
10         "os/signal"
11         "sync"
12         "syscall"
13         "time"
14
15         "git.curoverse.com/arvados.git/sdk/go/arvados"
16         "git.curoverse.com/arvados.git/sdk/go/auth"
17         "github.com/julienschmidt/httprouter"
18         "github.com/prometheus/client_golang/prometheus/promhttp"
19         "github.com/sirupsen/logrus"
20 )
21
22 // RunOptions controls runtime behavior. The flags/options that belong
23 // here are the ones that are useful for interactive use. For example,
24 // "CommitTrash" is a runtime option rather than a config item because
25 // it invokes a troubleshooting feature rather than expressing how
26 // balancing is meant to be done at a given site.
27 //
28 // RunOptions fields are controlled by command line flags.
29 type RunOptions struct {
30         Once        bool
31         CommitPulls bool
32         CommitTrash bool
33         Logger      logrus.FieldLogger
34         Dumper      logrus.FieldLogger
35
36         // SafeRendezvousState from the most recent balance operation,
37         // or "" if unknown. If this changes from one run to the next,
38         // we need to watch out for races. See
39         // (*Balancer)ClearTrashLists.
40         SafeRendezvousState string
41 }
42
43 type Server struct {
44         Cluster    *arvados.Cluster
45         ArvClient  *arvados.Client
46         RunOptions RunOptions
47         Metrics    *metrics
48
49         httpHandler http.Handler
50         setupOnce   sync.Once
51
52         Logger logrus.FieldLogger
53         Dumper logrus.FieldLogger
54 }
55
56 // ServeHTTP implements service.Handler.
57 func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
58         srv.httpHandler.ServeHTTP(w, r)
59 }
60
61 // CheckHealth implements service.Handler.
62 func (srv *Server) CheckHealth() error {
63         return nil
64 }
65
66 // Start sets up and runs the balancer.
67 func (srv *Server) Start() {
68         srv.setupOnce.Do(srv.setup)
69         go srv.run()
70 }
71
72 func (srv *Server) setup() {
73         if srv.Cluster.ManagementToken == "" {
74                 srv.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
75                         http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
76                 })
77         } else {
78                 mux := httprouter.New()
79                 metricsH := promhttp.HandlerFor(srv.Metrics.reg, promhttp.HandlerOpts{
80                         ErrorLog: srv.Logger,
81                 })
82                 mux.Handler("GET", "/metrics", metricsH)
83                 mux.Handler("GET", "/metrics.json", metricsH)
84                 srv.httpHandler = auth.RequireLiteralToken(srv.Cluster.ManagementToken, mux)
85         }
86 }
87
88 func (srv *Server) run() {
89         var err error
90         if srv.RunOptions.Once {
91                 _, err = srv.runOnce()
92         } else {
93                 err = srv.runForever(nil)
94         }
95         if err != nil {
96                 srv.Logger.Error(err)
97         }
98 }
99
100 func (srv *Server) runOnce() (*Balancer, error) {
101         bal := &Balancer{
102                 Logger:         srv.Logger,
103                 Dumper:         srv.Dumper,
104                 Metrics:        srv.Metrics,
105                 LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
106         }
107         var err error
108         srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
109         return bal, err
110 }
111
112 // RunForever runs forever, or (for testing purposes) until the given
113 // stop channel is ready to receive.
114 func (srv *Server) runForever(stop <-chan interface{}) error {
115         logger := srv.Logger
116
117         ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
118
119         // The unbuffered channel here means we only hear SIGUSR1 if
120         // it arrives while we're waiting in select{}.
121         sigUSR1 := make(chan os.Signal)
122         signal.Notify(sigUSR1, syscall.SIGUSR1)
123
124         logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
125
126         for {
127                 if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash {
128                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
129                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
130                 }
131
132                 _, err := srv.runOnce()
133                 if err != nil {
134                         logger.Print("run failed: ", err)
135                 } else {
136                         logger.Print("run succeeded")
137                 }
138
139                 select {
140                 case <-stop:
141                         signal.Stop(sigUSR1)
142                         return nil
143                 case <-ticker.C:
144                         logger.Print("timer went off")
145                 case <-sigUSR1:
146                         logger.Print("received SIGUSR1, resetting timer")
147                         // Reset the timer so we don't start the N+1st
148                         // run too soon after the Nth run is triggered
149                         // by SIGUSR1.
150                         ticker.Stop()
151                         ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
152                 }
153                 logger.Print("starting next run")
154         }
155 }