20937: fix deadlock with duplicated blocks
[arvados.git] / services / keep-balance / server.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
import (
	"context"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"git.arvados.org/arvados.git/lib/controller/dblock"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/jmoiron/sqlx"
	"github.com/sirupsen/logrus"
)
20
// RunOptions controls runtime behavior. The flags/options that belong
// here are the ones that are useful for interactive use. For example,
// "CommitTrash" is a runtime option rather than a config item because
// it invokes a troubleshooting feature rather than expressing how
// balancing is meant to be done at a given site.
//
// RunOptions fields are controlled by command line flags.
type RunOptions struct {
	// Once makes (*Server).run perform a single balancing pass and
	// exit instead of looping in runForever.
	Once bool
	// CommitPulls and CommitTrash control whether pull and trash
	// changes are committed. When both are false, runForever logs a
	// warning on every pass that no changes will be committed.
	CommitPulls bool
	CommitTrash bool
	// CommitConfirmedFields is consumed by the Balancer (not visible
	// here). NOTE(review): presumably controls whether confirmed
	// replication fields are written back — confirm in Balancer.Run.
	CommitConfirmedFields bool
	// ChunkPrefix is copied into Balancer.ChunkPrefix for each pass.
	ChunkPrefix string
	Logger      logrus.FieldLogger
	Dumper      logrus.FieldLogger

	// SafeRendezvousState from the most recent balance operation,
	// or "" if unknown. If this changes from one run to the next,
	// we need to watch out for races. See
	// (*Balancer)ClearTrashLists.
	SafeRendezvousState string
}
43
// Server is the keep-balance service. It embeds an http.Handler and
// implements service.Handler through CheckHealth and Done.
type Server struct {
	http.Handler

	// Cluster supplies configuration such as
	// Collections.BalancePeriod and Collections.BlobMissingReport.
	Cluster *arvados.Cluster
	// ArvClient is passed to Balancer.Run on every pass.
	ArvClient *arvados.Client
	// RunOptions is replaced after every pass with the options
	// returned by Balancer.Run.
	RunOptions RunOptions
	// Metrics is handed to each Balancer.
	Metrics *metrics

	// Logger is used for service-level logging and is passed to each
	// Balancer; Dumper is passed to each Balancer as well.
	Logger logrus.FieldLogger
	Dumper logrus.FieldLogger

	// DB is pinged by CheckHealth, used to acquire the keep-balance
	// service lock in runForever, and passed to each Balancer.
	DB *sqlx.DB
}
57
58 // CheckHealth implements service.Handler.
59 func (srv *Server) CheckHealth() error {
60         return srv.DB.Ping()
61 }
62
63 // Done implements service.Handler.
64 func (srv *Server) Done() <-chan struct{} {
65         return nil
66 }
67
68 func (srv *Server) run(ctx context.Context) {
69         var err error
70         if srv.RunOptions.Once {
71                 _, err = srv.runOnce(ctx)
72         } else {
73                 err = srv.runForever(ctx)
74         }
75         if err != nil {
76                 srv.Logger.Error(err)
77                 os.Exit(1)
78         } else {
79                 os.Exit(0)
80         }
81 }
82
83 func (srv *Server) runOnce(ctx context.Context) (*Balancer, error) {
84         bal := &Balancer{
85                 DB:             srv.DB,
86                 Logger:         srv.Logger,
87                 Dumper:         srv.Dumper,
88                 Metrics:        srv.Metrics,
89                 LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
90                 ChunkPrefix:    srv.RunOptions.ChunkPrefix,
91         }
92         var err error
93         srv.RunOptions, err = bal.Run(ctx, srv.ArvClient, srv.Cluster, srv.RunOptions)
94         return bal, err
95 }
96
97 // RunForever runs forever, or until ctx is cancelled.
98 func (srv *Server) runForever(ctx context.Context) error {
99         logger := srv.Logger
100
101         ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
102
103         // The unbuffered channel here means we only hear SIGUSR1 if
104         // it arrives while we're waiting in select{}.
105         sigUSR1 := make(chan os.Signal)
106         signal.Notify(sigUSR1, syscall.SIGUSR1)
107
108         logger.Info("acquiring service lock")
109         dblock.KeepBalanceService.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return srv.DB, nil })
110         defer dblock.KeepBalanceService.Unlock()
111
112         logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
113
114         for {
115                 if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash {
116                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
117                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
118                 }
119
120                 if !dblock.KeepBalanceService.Check() {
121                         // context canceled
122                         return nil
123                 }
124                 _, err := srv.runOnce(ctx)
125                 if err != nil {
126                         logger.Print("run failed: ", err)
127                 } else {
128                         logger.Print("run succeeded")
129                 }
130
131                 select {
132                 case <-ctx.Done():
133                         signal.Stop(sigUSR1)
134                         return nil
135                 case <-ticker.C:
136                         logger.Print("timer went off")
137                 case <-sigUSR1:
138                         logger.Print("received SIGUSR1, resetting timer")
139                         // Reset the timer so we don't start the N+1st
140                         // run too soon after the Nth run is triggered
141                         // by SIGUSR1.
142                         ticker.Stop()
143                         ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
144                 }
145                 logger.Print("starting next run")
146         }
147 }