Merge branch '18339-sweep-trash-lock'
[arvados.git] / services / keep-balance / server.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "net/http"
9         "os"
10         "os/signal"
11         "syscall"
12         "time"
13
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "github.com/jmoiron/sqlx"
16         "github.com/sirupsen/logrus"
17 )
18
19 // RunOptions controls runtime behavior. The flags/options that belong
20 // here are the ones that are useful for interactive use. For example,
21 // "CommitTrash" is a runtime option rather than a config item because
22 // it invokes a troubleshooting feature rather than expressing how
23 // balancing is meant to be done at a given site.
24 //
25 // RunOptions fields are controlled by command line flags.
26 type RunOptions struct {
27         Once                  bool
28         CommitPulls           bool
29         CommitTrash           bool
30         CommitConfirmedFields bool
31         Logger                logrus.FieldLogger
32         Dumper                logrus.FieldLogger
33
34         // SafeRendezvousState from the most recent balance operation,
35         // or "" if unknown. If this changes from one run to the next,
36         // we need to watch out for races. See
37         // (*Balancer)ClearTrashLists.
38         SafeRendezvousState string
39 }
40
41 type Server struct {
42         http.Handler
43
44         Cluster    *arvados.Cluster
45         ArvClient  *arvados.Client
46         RunOptions RunOptions
47         Metrics    *metrics
48
49         Logger logrus.FieldLogger
50         Dumper logrus.FieldLogger
51
52         DB *sqlx.DB
53 }
54
55 // CheckHealth implements service.Handler.
56 func (srv *Server) CheckHealth() error {
57         return srv.DB.Ping()
58 }
59
60 // Done implements service.Handler.
61 func (srv *Server) Done() <-chan struct{} {
62         return nil
63 }
64
65 func (srv *Server) run() {
66         var err error
67         if srv.RunOptions.Once {
68                 _, err = srv.runOnce()
69         } else {
70                 err = srv.runForever(nil)
71         }
72         if err != nil {
73                 srv.Logger.Error(err)
74                 os.Exit(1)
75         } else {
76                 os.Exit(0)
77         }
78 }
79
80 func (srv *Server) runOnce() (*Balancer, error) {
81         bal := &Balancer{
82                 DB:             srv.DB,
83                 Logger:         srv.Logger,
84                 Dumper:         srv.Dumper,
85                 Metrics:        srv.Metrics,
86                 LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
87         }
88         var err error
89         srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
90         return bal, err
91 }
92
93 // RunForever runs forever, or (for testing purposes) until the given
94 // stop channel is ready to receive.
95 func (srv *Server) runForever(stop <-chan interface{}) error {
96         logger := srv.Logger
97
98         ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
99
100         // The unbuffered channel here means we only hear SIGUSR1 if
101         // it arrives while we're waiting in select{}.
102         sigUSR1 := make(chan os.Signal)
103         signal.Notify(sigUSR1, syscall.SIGUSR1)
104
105         logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
106
107         for {
108                 if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash {
109                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
110                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
111                 }
112
113                 _, err := srv.runOnce()
114                 if err != nil {
115                         logger.Print("run failed: ", err)
116                 } else {
117                         logger.Print("run succeeded")
118                 }
119
120                 select {
121                 case <-stop:
122                         signal.Stop(sigUSR1)
123                         return nil
124                 case <-ticker.C:
125                         logger.Print("timer went off")
126                 case <-sigUSR1:
127                         logger.Print("received SIGUSR1, resetting timer")
128                         // Reset the timer so we don't start the N+1st
129                         // run too soon after the Nth run is triggered
130                         // by SIGUSR1.
131                         ticker.Stop()
132                         ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
133                 }
134                 logger.Print("starting next run")
135         }
136 }