14714: keep-balance uses cluster config
[arvados.git] / services / keep-balance / server.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "context"
9         "net/http"
10         "os"
11         "os/signal"
12         "syscall"
13         "time"
14
15         "git.curoverse.com/arvados.git/sdk/go/arvados"
16         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
17         "github.com/sirupsen/logrus"
18 )
19
20 // RunOptions controls runtime behavior. The flags/options that belong
21 // here are the ones that are useful for interactive use. For example,
22 // "CommitTrash" is a runtime option rather than a config item because
23 // it invokes a troubleshooting feature rather than expressing how
24 // balancing is meant to be done at a given site.
25 //
26 // RunOptions fields are controlled by command line flags.
27 type RunOptions struct {
28         Once        bool
29         CommitPulls bool
30         CommitTrash bool
31         Logger      logrus.FieldLogger
32         Dumper      logrus.FieldLogger
33
34         // SafeRendezvousState from the most recent balance operation,
35         // or "" if unknown. If this changes from one run to the next,
36         // we need to watch out for races. See
37         // (*Balancer)ClearTrashLists.
38         SafeRendezvousState string
39 }
40
41 type Server struct {
42         http.Handler
43         Cluster    *arvados.Cluster
44         ArvClient  *arvados.Client
45         RunOptions RunOptions
46         Metrics    *metrics
47
48         Logger logrus.FieldLogger
49         Dumper logrus.FieldLogger
50 }
51
52 // CheckHealth implements service.Handler.
53 func (srv *Server) CheckHealth() error {
54         return nil
55 }
56
57 // Start sets up and runs the balancer.
58 func (srv *Server) Start(ctx context.Context) {
59         if srv.RunOptions.Logger == nil {
60                 srv.RunOptions.Logger = ctxlog.FromContext(ctx)
61         }
62
63         srv.Logger = srv.RunOptions.Logger
64         srv.Dumper = srv.RunOptions.Dumper
65
66         var err error
67         if srv.RunOptions.Once {
68                 _, err = srv.run()
69         } else {
70                 err = srv.runForever(nil)
71         }
72         if err != nil {
73                 srv.Logger.Error(err)
74         }
75 }
76
77 func (srv *Server) run() (*Balancer, error) {
78         bal := &Balancer{
79                 Logger:         srv.Logger,
80                 Dumper:         srv.Dumper,
81                 Metrics:        srv.Metrics,
82                 LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
83         }
84         var err error
85         srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
86         return bal, err
87 }
88
89 // RunForever runs forever, or (for testing purposes) until the given
90 // stop channel is ready to receive.
91 func (srv *Server) runForever(stop <-chan interface{}) error {
92         logger := srv.Logger
93
94         ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
95
96         // The unbuffered channel here means we only hear SIGUSR1 if
97         // it arrives while we're waiting in select{}.
98         sigUSR1 := make(chan os.Signal)
99         signal.Notify(sigUSR1, syscall.SIGUSR1)
100
101         logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
102
103         for {
104                 if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash {
105                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
106                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
107                 }
108
109                 _, err := srv.run()
110                 if err != nil {
111                         logger.Print("run failed: ", err)
112                 } else {
113                         logger.Print("run succeeded")
114                 }
115
116                 select {
117                 case <-stop:
118                         signal.Stop(sigUSR1)
119                         return nil
120                 case <-ticker.C:
121                         logger.Print("timer went off")
122                 case <-sigUSR1:
123                         logger.Print("received SIGUSR1, resetting timer")
124                         // Reset the timer so we don't start the N+1st
125                         // run too soon after the Nth run is triggered
126                         // by SIGUSR1.
127                         ticker.Stop()
128                         ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
129                 }
130                 logger.Print("starting next run")
131         }
132 }