X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/92b530ffbb5022192f00977183e591ae81240347..aa9507e1633819259794ea4d6cf391dc88621dac:/services/keep-balance/main.go diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go index 364bb3ffd3..b016db22ff 100644 --- a/services/keep-balance/main.go +++ b/services/keep-balance/main.go @@ -1,164 +1,118 @@ -package main +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package keepbalance import ( - "encoding/json" + "bytes" + "context" "flag" - "io/ioutil" - "log" - "os" - "os/signal" - "syscall" - "time" - - "git.curoverse.com/arvados.git/sdk/go/arvados" + "fmt" + "io" + _ "net/http/pprof" + + "git.arvados.org/arvados.git/lib/cmd" + "git.arvados.org/arvados.git/lib/config" + "git.arvados.org/arvados.git/lib/service" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "git.arvados.org/arvados.git/sdk/go/health" + "github.com/jmoiron/sqlx" + _ "github.com/lib/pq" + "github.com/prometheus/client_golang/prometheus" ) -// Config specifies site configuration, like API credentials and the -// choice of which servers are to be balanced. -// -// Config is loaded from a JSON config file (see usage()). -type Config struct { - // Arvados API endpoint and credentials. - Client arvados.Client - - // List of service types (e.g., "disk") to balance. - KeepServiceTypes []string - - KeepServiceList arvados.KeepServiceList - - // How often to check - RunPeriod arvados.Duration - - // Number of collections to request in each API call - CollectionBatchSize int - - // Max collections to buffer in memory (bigger values consume - // more memory, but can reduce store-and-forward latency when - // fetching pages) - CollectionBuffers int -} - -// RunOptions controls runtime behavior. The flags/options that belong -// here are the ones that are useful for interactive use. For example, -// "CommitTrash" is a runtime option rather than a config item because -// it invokes a troubleshooting feature rather than expressing how -// balancing is meant to be done at a given site. -// -// RunOptions fields are controlled by command line flags. -type RunOptions struct { - Once bool - CommitPulls bool - CommitTrash bool - Logger *log.Logger - Dumper *log.Logger -} - -var debugf = func(string, ...interface{}) {} +type command struct{} -func main() { - var config Config - var runOptions RunOptions +var Command = command{} - configPath := flag.String("config", "", - "`path` of json configuration file") - serviceListPath := flag.String("config.KeepServiceList", "", - "`path` of json file with list of keep services to balance, as given by \"arv keep_service list\" "+ - "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])") - flag.BoolVar(&runOptions.Once, "once", false, +func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { + var options RunOptions + flags := flag.NewFlagSet(prog, flag.ContinueOnError) + flags.BoolVar(&options.Once, "once", false, "balance once and then exit") - flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false, + flags.BoolVar(&options.CommitPulls, "commit-pulls", false, "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)") - flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false, + flags.BoolVar(&options.CommitTrash, "commit-trash", false, "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)") - dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout") - debugFlag := flag.Bool("debug", false, "enable debug messages") - flag.Usage = usage - flag.Parse() - - if *configPath == "" { - log.Fatal("You must specify a config file (see `keep-balance -help`)") - } - mustReadJSON(&config, *configPath) - if *serviceListPath != "" { - mustReadJSON(&config.KeepServiceList, *serviceListPath) + flags.BoolVar(&options.CommitConfirmedFields, "commit-confirmed-fields", true, + "update collection fields (replicas_confirmed, storage_classes_confirmed, etc.)") + // These options are implemented by service.Command, so we + // don't need the vars here -- we just need the flags + // to pass flags.Parse(). + flags.Bool("dump", false, "dump details for each block to stdout") + flags.String("pprof", "", "serve Go profile data at `[addr]:port`") + flags.Bool("version", false, "Write version information to stdout and exit 0") + + logger := ctxlog.New(stderr, "json", "info") + loader := config.NewLoader(&bytes.Buffer{}, logger) + loader.SetupFlags(flags) + munged := loader.MungeLegacyConfigArgs(logger, args, "-legacy-keepbalance-config") + if ok, code := cmd.ParseFlags(flags, prog, munged, "", stderr); !ok { + return code } - if *debugFlag { - debugf = log.Printf - if j, err := json.Marshal(config); err != nil { - log.Fatal(err) - } else { - log.Printf("config is %s", j) - } - } - if *dumpFlag { - runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags) + // Drop our custom args that would be rejected by the generic + // service.Command + args = nil + dropFlag := map[string]bool{ + "once": true, + "commit-pulls": true, + "commit-trash": true, + "commit-confirmed-fields": true, + "dump": true, } - err := CheckConfig(config, runOptions) - if err != nil { - // (don't run) - } else if runOptions.Once { - err = (&Balancer{}).Run(config, runOptions) - } else { - err = RunForever(config, runOptions, nil) - } - if err != nil { - log.Fatal(err) - } -} - -func mustReadJSON(dst interface{}, path string) { - if buf, err := ioutil.ReadFile(path); err != nil { - log.Fatalf("Reading %q: %v", path, err) - } else if err = json.Unmarshal(buf, dst); err != nil { - log.Fatalf("Decoding %q: %v", path, err) - } -} - -// RunForever runs forever, or (for testing purposes) until the given -// stop channel is ready to receive. -func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error { - if runOptions.Logger == nil { - runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags) - } - logger := runOptions.Logger - - ticker := time.NewTicker(time.Duration(config.RunPeriod)) - - // The unbuffered channel here means we only hear SIGUSR1 if - // it arrives while we're waiting in select{}. - sigUSR1 := make(chan os.Signal) - signal.Notify(sigUSR1, syscall.SIGUSR1) - - logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod) - - for { - if !runOptions.CommitPulls && !runOptions.CommitTrash { - logger.Print("WARNING: Will scan periodically, but no changes will be committed.") - logger.Print("======= Consider using -commit-pulls and -commit-trash flags.") - } - - err := (&Balancer{}).Run(config, runOptions) - if err != nil { - logger.Print("run failed: ", err) - } else { - logger.Print("run succeeded") - } - - select { - case <-stop: - signal.Stop(sigUSR1) - return nil - case <-ticker.C: - logger.Print("timer went off") - case <-sigUSR1: - logger.Print("received SIGUSR1, resetting timer") - // Reset the timer so we don't start the N+1st - // run too soon after the Nth run is triggered - // by SIGUSR1. - ticker.Stop() - ticker = time.NewTicker(time.Duration(config.RunPeriod)) + flags.Visit(func(f *flag.Flag) { + if !dropFlag[f.Name] { + args = append(args, "-"+f.Name+"="+f.Value.String()) } - logger.Print("starting next run") - } + }) + + return service.Command(arvados.ServiceNameKeepbalance, + func(ctx context.Context, cluster *arvados.Cluster, token string, registry *prometheus.Registry) service.Handler { + if !options.Once && cluster.Collections.BalancePeriod == arvados.Duration(0) { + return service.ErrorHandler(ctx, cluster, fmt.Errorf("cannot start service: Collections.BalancePeriod is zero (if you want to run once and then exit, use the -once flag)")) + } + + ac, err := arvados.NewClientFromConfig(cluster) + ac.AuthToken = token + if err != nil { + return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err)) + } + + db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String()) + if err != nil { + return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection failed: %s", err)) + } + if p := cluster.PostgreSQL.ConnectionPool; p > 0 { + db.SetMaxOpenConns(p) + } + err = db.Ping() + if err != nil { + return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection succeeded but ping failed: %s", err)) + } + + if options.Logger == nil { + options.Logger = ctxlog.FromContext(ctx) + } + + srv := &Server{ + Cluster: cluster, + ArvClient: ac, + RunOptions: options, + Metrics: newMetrics(registry), + Logger: options.Logger, + Dumper: options.Dumper, + DB: db, + } + srv.Handler = &health.Handler{ + Token: cluster.ManagementToken, + Prefix: "/_health/", + Routes: health.Routes{"ping": srv.CheckHealth}, + } + + go srv.run(ctx) + return srv + }).RunCommand(prog, args, stdin, stdout, stderr) }