1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
15 "git.curoverse.com/arvados.git/sdk/go/arvados"
16 "git.curoverse.com/arvados.git/sdk/go/auth"
17 "git.curoverse.com/arvados.git/sdk/go/ctxlog"
18 "github.com/julienschmidt/httprouter"
19 "github.com/prometheus/client_golang/prometheus/promhttp"
20 "github.com/sirupsen/logrus"
23 // RunOptions controls runtime behavior. The flags/options that belong
24 // here are the ones that are useful for interactive use. For example,
25 // "CommitTrash" is a runtime option rather than a config item because
26 // it invokes a troubleshooting feature rather than expressing how
27 // balancing is meant to be done at a given site.
29 // RunOptions fields are controlled by command line flags.
30 type RunOptions struct {
34 Logger logrus.FieldLogger
35 Dumper logrus.FieldLogger
37 // SafeRendezvousState from the most recent balance operation,
38 // or "" if unknown. If this changes from one run to the next,
39 // we need to watch out for races. See
40 // (*Balancer).ClearTrashLists.
41 SafeRendezvousState string
45 Cluster *arvados.Cluster
46 ArvClient *arvados.Client
50 httpHandler http.Handler
52 Logger logrus.FieldLogger
53 Dumper logrus.FieldLogger
56 // ServeHTTP implements service.Handler.
57 func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
58 srv.httpHandler.ServeHTTP(w, r)
61 // CheckHealth implements service.Handler.
62 func (srv *Server) CheckHealth() error {
66 // Start sets up and runs the balancer.
67 func (srv *Server) Start(ctx context.Context) {
71 if srv.RunOptions.Once {
72 _, err = srv.runOnce()
74 err = srv.runForever(nil)
81 func (srv *Server) init(ctx context.Context) {
82 if srv.RunOptions.Logger == nil {
83 srv.RunOptions.Logger = ctxlog.FromContext(ctx)
86 srv.Logger = srv.RunOptions.Logger
87 srv.Dumper = srv.RunOptions.Dumper
89 if srv.Cluster.ManagementToken == "" {
90 srv.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
91 http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
94 mux := httprouter.New()
95 metricsH := promhttp.HandlerFor(srv.Metrics.reg, promhttp.HandlerOpts{
98 mux.Handler("GET", "/metrics", metricsH)
99 mux.Handler("GET", "/metrics.json", metricsH)
100 srv.httpHandler = auth.RequireLiteralToken(srv.Cluster.ManagementToken, mux)
104 func (srv *Server) runOnce() (*Balancer, error) {
108 Metrics: srv.Metrics,
109 LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
112 srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
116 // runForever runs forever, or (for testing purposes) until the given
117 // stop channel is ready to receive.
118 func (srv *Server) runForever(stop <-chan interface{}) error {
121 ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
123 // The unbuffered channel here means we only hear SIGUSR1 if
124 // it arrives while we're waiting in select{}.
125 sigUSR1 := make(chan os.Signal)
126 signal.Notify(sigUSR1, syscall.SIGUSR1)
128 logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
131 if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash {
132 logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
133 logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
136 _, err := srv.runOnce()
138 logger.Print("run failed: ", err)
140 logger.Print("run succeeded")
148 logger.Print("timer went off")
150 logger.Print("received SIGUSR1, resetting timer")
151 // Reset the timer so we don't start the N+1st
152 // run too soon after the Nth run is triggered
155 ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
157 logger.Print("starting next run")