Merge branch '15781-multi-value-property-search'
[arvados.git] / services / keep-balance / server.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "net/http"
9         "os"
10         "os/signal"
11         "syscall"
12         "time"
13
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "github.com/sirupsen/logrus"
16 )
17
18 // RunOptions controls runtime behavior. The flags/options that belong
19 // here are the ones that are useful for interactive use. For example,
20 // "CommitTrash" is a runtime option rather than a config item because
21 // it invokes a troubleshooting feature rather than expressing how
22 // balancing is meant to be done at a given site.
23 //
24 // RunOptions fields are controlled by command line flags.
25 type RunOptions struct {
26         Once        bool
27         CommitPulls bool
28         CommitTrash bool
29         Logger      logrus.FieldLogger
30         Dumper      logrus.FieldLogger
31
32         // SafeRendezvousState from the most recent balance operation,
33         // or "" if unknown. If this changes from one run to the next,
34         // we need to watch out for races. See
35         // (*Balancer)ClearTrashLists.
36         SafeRendezvousState string
37 }
38
39 type Server struct {
40         http.Handler
41
42         Cluster    *arvados.Cluster
43         ArvClient  *arvados.Client
44         RunOptions RunOptions
45         Metrics    *metrics
46
47         Logger logrus.FieldLogger
48         Dumper logrus.FieldLogger
49 }
50
51 // CheckHealth implements service.Handler.
52 func (srv *Server) CheckHealth() error {
53         return nil
54 }
55
56 func (srv *Server) run() {
57         var err error
58         if srv.RunOptions.Once {
59                 _, err = srv.runOnce()
60         } else {
61                 err = srv.runForever(nil)
62         }
63         if err != nil {
64                 srv.Logger.Error(err)
65                 os.Exit(1)
66         } else {
67                 os.Exit(0)
68         }
69 }
70
71 func (srv *Server) runOnce() (*Balancer, error) {
72         bal := &Balancer{
73                 Logger:         srv.Logger,
74                 Dumper:         srv.Dumper,
75                 Metrics:        srv.Metrics,
76                 LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
77         }
78         var err error
79         srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
80         return bal, err
81 }
82
83 // RunForever runs forever, or (for testing purposes) until the given
84 // stop channel is ready to receive.
85 func (srv *Server) runForever(stop <-chan interface{}) error {
86         logger := srv.Logger
87
88         ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
89
90         // The unbuffered channel here means we only hear SIGUSR1 if
91         // it arrives while we're waiting in select{}.
92         sigUSR1 := make(chan os.Signal)
93         signal.Notify(sigUSR1, syscall.SIGUSR1)
94
95         logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
96
97         for {
98                 if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash {
99                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
100                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
101                 }
102
103                 _, err := srv.runOnce()
104                 if err != nil {
105                         logger.Print("run failed: ", err)
106                 } else {
107                         logger.Print("run succeeded")
108                 }
109
110                 select {
111                 case <-stop:
112                         signal.Stop(sigUSR1)
113                         return nil
114                 case <-ticker.C:
115                         logger.Print("timer went off")
116                 case <-sigUSR1:
117                         logger.Print("received SIGUSR1, resetting timer")
118                         // Reset the timer so we don't start the N+1st
119                         // run too soon after the Nth run is triggered
120                         // by SIGUSR1.
121                         ticker.Stop()
122                         ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
123                 }
124                 logger.Print("starting next run")
125         }
126 }