17574: Get collections directly from DB instead of controller.
[arvados.git] / services / keep-balance / server.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "net/http"
9         "os"
10         "os/signal"
11         "syscall"
12         "time"
13
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "github.com/jmoiron/sqlx"
16         "github.com/sirupsen/logrus"
17 )
18
19 // RunOptions controls runtime behavior. The flags/options that belong
20 // here are the ones that are useful for interactive use. For example,
21 // "CommitTrash" is a runtime option rather than a config item because
22 // it invokes a troubleshooting feature rather than expressing how
23 // balancing is meant to be done at a given site.
24 //
25 // RunOptions fields are controlled by command line flags.
26 type RunOptions struct {
27         Once        bool
28         CommitPulls bool
29         CommitTrash bool
30         Logger      logrus.FieldLogger
31         Dumper      logrus.FieldLogger
32
33         // SafeRendezvousState from the most recent balance operation,
34         // or "" if unknown. If this changes from one run to the next,
35         // we need to watch out for races. See
36         // (*Balancer)ClearTrashLists.
37         SafeRendezvousState string
38 }
39
40 type Server struct {
41         http.Handler
42
43         Cluster    *arvados.Cluster
44         ArvClient  *arvados.Client
45         RunOptions RunOptions
46         Metrics    *metrics
47
48         Logger logrus.FieldLogger
49         Dumper logrus.FieldLogger
50
51         DB *sqlx.DB
52 }
53
54 // CheckHealth implements service.Handler.
55 func (srv *Server) CheckHealth() error {
56         return srv.DB.Ping()
57 }
58
59 // Done implements service.Handler.
60 func (srv *Server) Done() <-chan struct{} {
61         return nil
62 }
63
64 func (srv *Server) run() {
65         var err error
66         if srv.RunOptions.Once {
67                 _, err = srv.runOnce()
68         } else {
69                 err = srv.runForever(nil)
70         }
71         if err != nil {
72                 srv.Logger.Error(err)
73                 os.Exit(1)
74         } else {
75                 os.Exit(0)
76         }
77 }
78
79 func (srv *Server) runOnce() (*Balancer, error) {
80         bal := &Balancer{
81                 DB:             srv.DB,
82                 Logger:         srv.Logger,
83                 Dumper:         srv.Dumper,
84                 Metrics:        srv.Metrics,
85                 LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
86         }
87         var err error
88         srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
89         return bal, err
90 }
91
92 // RunForever runs forever, or (for testing purposes) until the given
93 // stop channel is ready to receive.
94 func (srv *Server) runForever(stop <-chan interface{}) error {
95         logger := srv.Logger
96
97         ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
98
99         // The unbuffered channel here means we only hear SIGUSR1 if
100         // it arrives while we're waiting in select{}.
101         sigUSR1 := make(chan os.Signal)
102         signal.Notify(sigUSR1, syscall.SIGUSR1)
103
104         logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
105
106         for {
107                 if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash {
108                         logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
109                         logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
110                 }
111
112                 _, err := srv.runOnce()
113                 if err != nil {
114                         logger.Print("run failed: ", err)
115                 } else {
116                         logger.Print("run succeeded")
117                 }
118
119                 select {
120                 case <-stop:
121                         signal.Stop(sigUSR1)
122                         return nil
123                 case <-ticker.C:
124                         logger.Print("timer went off")
125                 case <-sigUSR1:
126                         logger.Print("received SIGUSR1, resetting timer")
127                         // Reset the timer so we don't start the N+1st
128                         // run too soon after the Nth run is triggered
129                         // by SIGUSR1.
130                         ticker.Stop()
131                         ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
132                 }
133                 logger.Print("starting next run")
134         }
135 }