Merge branch '19678-job-loader' refs #19678
[arvados.git] / services / keep-balance / server.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
7 import (
8         "context"
9         "net/http"
10         "os"
11         "os/signal"
12         "syscall"
13         "time"
14
15         "git.arvados.org/arvados.git/lib/controller/dblock"
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         "github.com/jmoiron/sqlx"
18         "github.com/sirupsen/logrus"
19 )
20
21 // RunOptions controls runtime behavior. The flags/options that belong
22 // here are the ones that are useful for interactive use. For example,
23 // "CommitTrash" is a runtime option rather than a config item because
24 // it invokes a troubleshooting feature rather than expressing how
25 // balancing is meant to be done at a given site.
26 //
27 // RunOptions fields are controlled by command line flags.
28 type RunOptions struct {
29         Once                  bool
30         CommitPulls           bool
31         CommitTrash           bool
32         CommitConfirmedFields bool
33         Logger                logrus.FieldLogger
34         Dumper                logrus.FieldLogger
35
36         // SafeRendezvousState from the most recent balance operation,
37         // or "" if unknown. If this changes from one run to the next,
38         // we need to watch out for races. See
39         // (*Balancer)ClearTrashLists.
40         SafeRendezvousState string
41 }
42
43 type Server struct {
44         http.Handler
45
46         Cluster    *arvados.Cluster
47         ArvClient  *arvados.Client
48         RunOptions RunOptions
49         Metrics    *metrics
50
51         Logger logrus.FieldLogger
52         Dumper logrus.FieldLogger
53
54         DB *sqlx.DB
55 }
56
57 // CheckHealth implements service.Handler.
58 func (srv *Server) CheckHealth() error {
59         return srv.DB.Ping()
60 }
61
62 // Done implements service.Handler.
63 func (srv *Server) Done() <-chan struct{} {
64         return nil
65 }
66
67 func (srv *Server) run(ctx context.Context) {
68         var err error
69         if srv.RunOptions.Once {
70                 _, err = srv.runOnce(ctx)
71         } else {
72                 err = srv.runForever(ctx)
73         }
74         if err != nil {
75                 srv.Logger.Error(err)
76                 os.Exit(1)
77         } else {
78                 os.Exit(0)
79         }
80 }
81
82 func (srv *Server) runOnce(ctx context.Context) (*Balancer, error) {
83         bal := &Balancer{
84                 DB:             srv.DB,
85                 Logger:         srv.Logger,
86                 Dumper:         srv.Dumper,
87                 Metrics:        srv.Metrics,
88                 LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
89         }
90         var err error
91         srv.RunOptions, err = bal.Run(ctx, srv.ArvClient, srv.Cluster, srv.RunOptions)
92         return bal, err
93 }
94
95 // RunForever runs forever, or until ctx is cancelled.
96 func (srv *Server) runForever(ctx context.Context) error {
97         logger := srv.Logger
98
99         ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
100
101         // The unbuffered channel here means we only hear SIGUSR1 if
102         // it arrives while we're waiting in select{}.
103         sigUSR1 := make(chan os.Signal)
104         signal.Notify(sigUSR1, syscall.SIGUSR1)
105
106         logger.Info("acquiring service lock")
107         dblock.KeepBalanceService.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return srv.DB, nil })
108         defer dblock.KeepBalanceService.Unlock()
109
110         logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
111
112         for {
113                 if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash {
114                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
115                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
116                 }
117
118                 if !dblock.KeepBalanceService.Check() {
119                         // context canceled
120                         return nil
121                 }
122                 _, err := srv.runOnce(ctx)
123                 if err != nil {
124                         logger.Print("run failed: ", err)
125                 } else {
126                         logger.Print("run succeeded")
127                 }
128
129                 select {
130                 case <-ctx.Done():
131                         signal.Stop(sigUSR1)
132                         return nil
133                 case <-ticker.C:
134                         logger.Print("timer went off")
135                 case <-sigUSR1:
136                         logger.Print("received SIGUSR1, resetting timer")
137                         // Reset the timer so we don't start the N+1st
138                         // run too soon after the Nth run is triggered
139                         // by SIGUSR1.
140                         ticker.Stop()
141                         ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
142                 }
143                 logger.Print("starting next run")
144         }
145 }