14714: Simplifies server startup and removes globals
[arvados.git] / services / keep-balance / server.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "net/http"
9         "os"
10         "os/signal"
11         "syscall"
12         "time"
13
14         "git.curoverse.com/arvados.git/sdk/go/arvados"
15         "git.curoverse.com/arvados.git/sdk/go/auth"
16         "github.com/julienschmidt/httprouter"
17         "github.com/prometheus/client_golang/prometheus/promhttp"
18         "github.com/sirupsen/logrus"
19 )
20
// RunOptions controls runtime behavior. The flags/options that belong
// here are the ones that are useful for interactive use. For example,
// "CommitTrash" is a runtime option rather than a config item because
// it invokes a troubleshooting feature rather than expressing how
// balancing is meant to be done at a given site.
//
// RunOptions fields are controlled by command line flags.
//
// A RunOptions value is passed to each Balancer.Run call, and the
// value Run returns replaces it for the next run (see
// (*Server).runOnce).
type RunOptions struct {
	Once        bool               // do a single balancing run instead of looping forever (see (*Server).run)
	CommitPulls bool               // -commit-pulls flag; runForever warns when this and CommitTrash are both false
	CommitTrash bool               // -commit-trash flag; runForever warns when this and CommitPulls are both false
	Logger      logrus.FieldLogger // NOTE(review): not read in this file (Server.Logger is used instead) -- confirm where it is consumed
	Dumper      logrus.FieldLogger // NOTE(review): not read in this file (Server.Dumper is used instead) -- confirm where it is consumed

	// SafeRendezvousState from the most recent balance operation,
	// or "" if unknown. If this changes from one run to the next,
	// we need to watch out for races. See
	// (*Balancer)ClearTrashLists.
	SafeRendezvousState string
}
41
// Server ties together the cluster configuration, API client, run
// options and metrics needed to perform balancing runs, and serves
// the metrics endpoints over HTTP (see ServeHTTP and setup).
type Server struct {
	Cluster    *arvados.Cluster // config source: ManagementToken, Collections.BalancePeriod, Collections.BlobMissingReport
	ArvClient  *arvados.Client  // passed to Balancer.Run on every run
	RunOptions RunOptions       // passed to Balancer.Run, then replaced by the value Run returns
	Metrics    *metrics         // handed to each Balancer; its reg is served at /metrics and /metrics.json

	// httpHandler is installed by setup; ServeHTTP delegates every
	// request to it.
	httpHandler http.Handler

	Logger logrus.FieldLogger // server log output; also handed to each Balancer and used as the metrics handler's ErrorLog
	Dumper logrus.FieldLogger // handed to each Balancer as its Dumper
}
53
54 // ServeHTTP implements service.Handler.
55 func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
56         srv.httpHandler.ServeHTTP(w, r)
57 }
58
59 // CheckHealth implements service.Handler.
60 func (srv *Server) CheckHealth() error {
61         return nil
62 }
63
64 func (srv *Server) setup() {
65         if srv.Cluster.ManagementToken == "" {
66                 srv.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
67                         http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
68                 })
69         } else {
70                 mux := httprouter.New()
71                 metricsH := promhttp.HandlerFor(srv.Metrics.reg, promhttp.HandlerOpts{
72                         ErrorLog: srv.Logger,
73                 })
74                 mux.Handler("GET", "/metrics", metricsH)
75                 mux.Handler("GET", "/metrics.json", metricsH)
76                 srv.httpHandler = auth.RequireLiteralToken(srv.Cluster.ManagementToken, mux)
77         }
78 }
79
80 func (srv *Server) run() {
81         var err error
82         if srv.RunOptions.Once {
83                 _, err = srv.runOnce()
84         } else {
85                 err = srv.runForever(nil)
86         }
87         if err != nil {
88                 srv.Logger.Error(err)
89         }
90 }
91
92 func (srv *Server) runOnce() (*Balancer, error) {
93         bal := &Balancer{
94                 Logger:         srv.Logger,
95                 Dumper:         srv.Dumper,
96                 Metrics:        srv.Metrics,
97                 LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
98         }
99         var err error
100         srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
101         return bal, err
102 }
103
104 // RunForever runs forever, or (for testing purposes) until the given
105 // stop channel is ready to receive.
106 func (srv *Server) runForever(stop <-chan interface{}) error {
107         logger := srv.Logger
108
109         ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
110
111         // The unbuffered channel here means we only hear SIGUSR1 if
112         // it arrives while we're waiting in select{}.
113         sigUSR1 := make(chan os.Signal)
114         signal.Notify(sigUSR1, syscall.SIGUSR1)
115
116         logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
117
118         for {
119                 if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash {
120                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
121                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
122                 }
123
124                 _, err := srv.runOnce()
125                 if err != nil {
126                         logger.Print("run failed: ", err)
127                 } else {
128                         logger.Print("run succeeded")
129                 }
130
131                 select {
132                 case <-stop:
133                         signal.Stop(sigUSR1)
134                         return nil
135                 case <-ticker.C:
136                         logger.Print("timer went off")
137                 case <-sigUSR1:
138                         logger.Print("received SIGUSR1, resetting timer")
139                         // Reset the timer so we don't start the N+1st
140                         // run too soon after the Nth run is triggered
141                         // by SIGUSR1.
142                         ticker.Stop()
143                         ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
144                 }
145                 logger.Print("starting next run")
146         }
147 }