21227: Fix unbuffered channels used for signal.Notify.
[arvados.git] / services / keep-balance / server.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
7 import (
8         "context"
9         "net/http"
10         "os"
11         "os/signal"
12         "syscall"
13         "time"
14
15         "git.arvados.org/arvados.git/lib/controller/dblock"
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         "github.com/jmoiron/sqlx"
18         "github.com/sirupsen/logrus"
19 )
20
// RunOptions controls runtime behavior. The flags/options that belong
// here are the ones that are useful for interactive use. For example,
// "CommitTrash" is a runtime option rather than a config item because
// it invokes a troubleshooting feature rather than expressing how
// balancing is meant to be done at a given site.
//
// NOTE(review): no CommitTrash field is declared in this struct, so
// the example above may be stale -- confirm and update.
//
// RunOptions fields are controlled by command line flags.
type RunOptions struct {
        // Once selects a single balancing run: when it is true,
        // (*Server)run calls runOnce instead of runForever.
        Once                  bool
        // CommitConfirmedFields is not read in this file.
        // NOTE(review): presumably consumed by the Balancer --
        // confirm and document its effect.
        CommitConfirmedFields bool
        // ChunkPrefix is copied to Balancer.ChunkPrefix by
        // (*Server)runOnce.
        ChunkPrefix           string
        // Logger and Dumper are not read in this file; the Server
        // methods here use the Server's own Logger and Dumper fields.
        // NOTE(review): confirm whether these are still used.
        Logger                logrus.FieldLogger
        Dumper                logrus.FieldLogger

        // SafeRendezvousState from the most recent balance operation,
        // or "" if unknown. If this changes from one run to the next,
        // we need to watch out for races. See
        // (*Balancer)ClearTrashLists.
        SafeRendezvousState string
}
41
// Server bundles the configuration, clients and database handle used
// by the keep-balance run loop (run, runOnce, runForever) and by the
// service.Handler methods CheckHealth and Done.
type Server struct {
        // Embedded HTTP handler; its methods are promoted to Server.
        http.Handler

        // Cluster is the cluster configuration. This file reads its
        // Collections.BalancePeriod, BalancePullLimit,
        // BalanceTrashLimit and BlobMissingReport settings, and
        // passes it to (*Balancer)Run.
        Cluster    *arvados.Cluster
        // ArvClient is passed to (*Balancer)Run by runOnce.
        ArvClient  *arvados.Client
        // RunOptions is passed to each (*Balancer)Run call and then
        // replaced with the value that call returns.
        RunOptions RunOptions
        // Metrics is handed to every Balancer created by runOnce.
        Metrics    *metrics

        // Logger receives the messages logged by the run loop and is
        // handed to every Balancer created by runOnce.
        Logger logrus.FieldLogger
        // Dumper is handed to every Balancer created by runOnce; it
        // is not otherwise used in this file.
        Dumper logrus.FieldLogger

        // DB is the database handle: pinged by CheckHealth, handed to
        // every Balancer, and supplied to
        // dblock.KeepBalanceService.Lock by runForever.
        DB *sqlx.DB
}
55
56 // CheckHealth implements service.Handler.
57 func (srv *Server) CheckHealth() error {
58         return srv.DB.Ping()
59 }
60
61 // Done implements service.Handler.
62 func (srv *Server) Done() <-chan struct{} {
63         return nil
64 }
65
66 func (srv *Server) run(ctx context.Context) {
67         var err error
68         if srv.RunOptions.Once {
69                 _, err = srv.runOnce(ctx)
70         } else {
71                 err = srv.runForever(ctx)
72         }
73         if err != nil {
74                 srv.Logger.Error(err)
75                 os.Exit(1)
76         } else {
77                 os.Exit(0)
78         }
79 }
80
81 func (srv *Server) runOnce(ctx context.Context) (*Balancer, error) {
82         bal := &Balancer{
83                 DB:             srv.DB,
84                 Logger:         srv.Logger,
85                 Dumper:         srv.Dumper,
86                 Metrics:        srv.Metrics,
87                 LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
88                 ChunkPrefix:    srv.RunOptions.ChunkPrefix,
89         }
90         var err error
91         srv.RunOptions, err = bal.Run(ctx, srv.ArvClient, srv.Cluster, srv.RunOptions)
92         return bal, err
93 }
94
95 // RunForever runs forever, or until ctx is cancelled.
96 func (srv *Server) runForever(ctx context.Context) error {
97         logger := srv.Logger
98
99         ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
100
101         sigUSR1 := make(chan os.Signal, 1)
102         signal.Notify(sigUSR1, syscall.SIGUSR1)
103
104         logger.Info("acquiring service lock")
105         dblock.KeepBalanceService.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return srv.DB, nil })
106         defer dblock.KeepBalanceService.Unlock()
107
108         logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
109
110         for {
111                 if srv.Cluster.Collections.BalancePullLimit < 1 && srv.Cluster.Collections.BalanceTrashLimit < 1 {
112                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
113                         logger.Print("=======  To commit changes, set BalancePullLimit and BalanceTrashLimit values greater than zero.")
114                 }
115
116                 if !dblock.KeepBalanceService.Check() {
117                         // context canceled
118                         return nil
119                 }
120                 _, err := srv.runOnce(ctx)
121                 if err != nil {
122                         logger.Print("run failed: ", err)
123                 } else {
124                         logger.Print("run succeeded")
125                 }
126
127                 select {
128                 case <-ctx.Done():
129                         signal.Stop(sigUSR1)
130                         return nil
131                 case <-ticker.C:
132                         logger.Print("timer went off")
133                 case <-sigUSR1:
134                         logger.Print("received SIGUSR1, resetting timer")
135                         // Reset the timer so we don't start the N+1st
136                         // run too soon after the Nth run is triggered
137                         // by SIGUSR1.
138                         ticker.Stop()
139                         ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
140                 }
141                 logger.Print("starting next run")
142         }
143 }