16718: Merge branch 'master' into 16718-group-contents-collection-versions
[arvados.git] / services / keep-balance / server.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "net/http"
9         "os"
10         "os/signal"
11         "syscall"
12         "time"
13
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "github.com/sirupsen/logrus"
16 )
17
18 // RunOptions controls runtime behavior. The flags/options that belong
19 // here are the ones that are useful for interactive use. For example,
20 // "CommitTrash" is a runtime option rather than a config item because
21 // it invokes a troubleshooting feature rather than expressing how
22 // balancing is meant to be done at a given site.
23 //
24 // RunOptions fields are controlled by command line flags.
25 type RunOptions struct {
26         Once        bool
27         CommitPulls bool
28         CommitTrash bool
29         Logger      logrus.FieldLogger
30         Dumper      logrus.FieldLogger
31
32         // SafeRendezvousState from the most recent balance operation,
33         // or "" if unknown. If this changes from one run to the next,
34         // we need to watch out for races. See
35         // (*Balancer)ClearTrashLists.
36         SafeRendezvousState string
37 }
38
39 type Server struct {
40         http.Handler
41
42         Cluster    *arvados.Cluster
43         ArvClient  *arvados.Client
44         RunOptions RunOptions
45         Metrics    *metrics
46
47         Logger logrus.FieldLogger
48         Dumper logrus.FieldLogger
49 }
50
51 // CheckHealth implements service.Handler.
52 func (srv *Server) CheckHealth() error {
53         return nil
54 }
55
56 // Done implements service.Handler.
57 func (srv *Server) Done() <-chan struct{} {
58         return nil
59 }
60
61 func (srv *Server) run() {
62         var err error
63         if srv.RunOptions.Once {
64                 _, err = srv.runOnce()
65         } else {
66                 err = srv.runForever(nil)
67         }
68         if err != nil {
69                 srv.Logger.Error(err)
70                 os.Exit(1)
71         } else {
72                 os.Exit(0)
73         }
74 }
75
76 func (srv *Server) runOnce() (*Balancer, error) {
77         bal := &Balancer{
78                 Logger:         srv.Logger,
79                 Dumper:         srv.Dumper,
80                 Metrics:        srv.Metrics,
81                 LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
82         }
83         var err error
84         srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
85         return bal, err
86 }
87
88 // RunForever runs forever, or (for testing purposes) until the given
89 // stop channel is ready to receive.
90 func (srv *Server) runForever(stop <-chan interface{}) error {
91         logger := srv.Logger
92
93         ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
94
95         // The unbuffered channel here means we only hear SIGUSR1 if
96         // it arrives while we're waiting in select{}.
97         sigUSR1 := make(chan os.Signal)
98         signal.Notify(sigUSR1, syscall.SIGUSR1)
99
100         logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
101
102         for {
103                 if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash {
104                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
105                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
106                 }
107
108                 _, err := srv.runOnce()
109                 if err != nil {
110                         logger.Print("run failed: ", err)
111                 } else {
112                         logger.Print("run succeeded")
113                 }
114
115                 select {
116                 case <-stop:
117                         signal.Stop(sigUSR1)
118                         return nil
119                 case <-ticker.C:
120                         logger.Print("timer went off")
121                 case <-sigUSR1:
122                         logger.Print("received SIGUSR1, resetting timer")
123                         // Reset the timer so we don't start the N+1st
124                         // run too soon after the Nth run is triggered
125                         // by SIGUSR1.
126                         ticker.Stop()
127                         ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
128                 }
129                 logger.Print("starting next run")
130         }
131 }