20470: Fix discovery document generation to drop unpublished fields
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 6e7d70d4e842a66323597cee51f75b4114117273..6bc998958979fc41288cd051bb40eaae4a10de4e 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepbalance
 
 import (
-       "encoding/json"
+       "bytes"
+       "context"
        "flag"
        "flag"
-       "io/ioutil"
-       "log"
-       "os"
-       "os/signal"
-       "syscall"
-       "time"
-
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "fmt"
+       "io"
+       _ "net/http/pprof"
+
+       "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/lib/service"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/sdk/go/health"
+       "github.com/jmoiron/sqlx"
+       _ "github.com/lib/pq"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
-// Config specifies site configuration, like API credentials and the
-// choice of which servers are to be balanced.
-//
-// Config is loaded from a JSON config file (see usage()).
-type Config struct {
-       // Arvados API endpoint and credentials.
-       Client arvados.Client
-
-       // List of service types (e.g., "disk") to balance.
-       KeepServiceTypes []string
+type command struct{}
 
-       KeepServiceList arvados.KeepServiceList
+var Command = command{}
 
-       // How often to check
-       RunPeriod arvados.Duration
-
-       // Number of collections to request in each API call
-       CollectionBatchSize int
-}
-
-// RunOptions controls runtime behavior. The flags/options that belong
-// here are the ones that are useful for interactive use. For example,
-// "CommitTrash" is a runtime option rather than a config item because
-// it invokes a troubleshooting feature rather than expressing how
-// balancing is meant to be done at a given site.
-//
-// RunOptions fields are controlled by command line flags.
-type RunOptions struct {
-       Once        bool
-       CommitPulls bool
-       CommitTrash bool
-       Logger      *log.Logger
-       Dumper      *log.Logger
-}
-
-var debugf = func(string, ...interface{}) {}
-
-func main() {
-       var config Config
-       var runOptions RunOptions
-
-       configPath := flag.String("config", "",
-               "`path` of json configuration file")
-       serviceListPath := flag.String("config.KeepServiceList", "",
-               "`path` of json file with list of keep services to balance, as given by \"arv keep_service list\" "+
-                       "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
-       flag.BoolVar(&runOptions.Once, "once", false,
+func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       var options RunOptions
+       flags := flag.NewFlagSet(prog, flag.ContinueOnError)
+       flags.BoolVar(&options.Once, "once", false,
                "balance once and then exit")
                "balance once and then exit")
-       flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
+       flags.BoolVar(&options.CommitPulls, "commit-pulls", false,
                "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
                "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
-       flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
+       flags.BoolVar(&options.CommitTrash, "commit-trash", false,
                "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
                "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
-       dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
-       debugFlag := flag.Bool("debug", false, "enable debug messages")
-       flag.Usage = usage
-       flag.Parse()
-
-       if *configPath == "" {
-               log.Fatal("You must specify a config file (see `keep-balance -help`)")
-       }
-       mustReadJSON(&config, *configPath)
-       if *serviceListPath != "" {
-               mustReadJSON(&config.KeepServiceList, *serviceListPath)
-       }
-
-       if *debugFlag {
-               debugf = log.Printf
-               if j, err := json.Marshal(config); err != nil {
-                       log.Fatal(err)
-               } else {
-                       log.Printf("config is %s", j)
-               }
-       }
-       if *dumpFlag {
-               runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
-       }
-       err := CheckConfig(config, runOptions)
-       if err != nil {
-               // (don't run)
-       } else if runOptions.Once {
-               err = (&Balancer{}).Run(config, runOptions)
-       } else {
-               err = RunForever(config, runOptions, nil)
+       flags.BoolVar(&options.CommitConfirmedFields, "commit-confirmed-fields", true,
+               "update collection fields (replicas_confirmed, storage_classes_confirmed, etc.)")
+       flags.StringVar(&options.ChunkPrefix, "chunk-prefix", "",
+               "operate only on blocks with the given prefix (experimental, see https://dev.arvados.org/issues/19923)")
+       // These options are implemented by service.Command, so we
+       // don't need the vars here -- we only define the flags so
+       // they get past flags.Parse().
+       flags.Bool("dump", false, "dump details for each block to stdout")
+       flags.String("pprof", "", "serve Go profile data at `[addr]:port`")
+       flags.Bool("version", false, "Write version information to stdout and exit 0")
+
+       logger := ctxlog.New(stderr, "json", "info")
+       loader := config.NewLoader(&bytes.Buffer{}, logger)
+       loader.SetupFlags(flags)
+       munged := loader.MungeLegacyConfigArgs(logger, args, "-legacy-keepbalance-config")
+       if ok, code := cmd.ParseFlags(flags, prog, munged, "", stderr); !ok {
+               return code
        }
-       if err != nil {
-               log.Fatal(err)
-       }
-}
 
-func mustReadJSON(dst interface{}, path string) {
-       if buf, err := ioutil.ReadFile(path); err != nil {
-               log.Fatalf("Reading %q: %v", path, err)
-       } else if err = json.Unmarshal(buf, dst); err != nil {
-               log.Fatalf("Decoding %q: %v", path, err)
+       // Drop our custom flags, which would be rejected by the
+       // generic service.Command.
+       args = nil
+       dropFlag := map[string]bool{
+               "once":                    true,
+               "commit-pulls":            true,
+               "commit-trash":            true,
+               "commit-confirmed-fields": true,
+               "dump":                    true,
        }
-}
-
-// RunForever runs forever, or (for testing purposes) until the given
-// stop channel is ready to receive.
-func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
-       if runOptions.Logger == nil {
-               runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
-       }
-       logger := runOptions.Logger
-
-       ticker := time.NewTicker(time.Duration(config.RunPeriod))
-
-       // The unbuffered channel here means we only hear SIGUSR1 if
-       // it arrives while we're waiting in select{}.
-       sigUSR1 := make(chan os.Signal)
-       signal.Notify(sigUSR1, syscall.SIGUSR1)
-
-       logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)
-
-       for {
-               if !runOptions.CommitPulls && !runOptions.CommitTrash {
-                       logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
-                       logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
-               }
-
-               err := (&Balancer{}).Run(config, runOptions)
-               if err != nil {
-                       logger.Print("run failed: ", err)
-               } else {
-                       logger.Print("run succeeded")
-               }
-
-               select {
-               case <-stop:
-                       signal.Stop(sigUSR1)
-                       return nil
-               case <-ticker.C:
-                       logger.Print("timer went off")
-               case <-sigUSR1:
-                       logger.Print("received SIGUSR1, resetting timer")
-                       // Reset the timer so we don't start the N+1st
-                       // run too soon after the Nth run is triggered
-                       // by SIGUSR1.
-                       ticker.Stop()
-                       ticker = time.NewTicker(time.Duration(config.RunPeriod))
+       flags.Visit(func(f *flag.Flag) {
+               if !dropFlag[f.Name] {
+                       args = append(args, "-"+f.Name+"="+f.Value.String())
                }
-               logger.Print("starting next run")
-       }
+       })
+
+       return service.Command(arvados.ServiceNameKeepbalance,
+               func(ctx context.Context, cluster *arvados.Cluster, token string, registry *prometheus.Registry) service.Handler {
+                       if !options.Once && cluster.Collections.BalancePeriod == arvados.Duration(0) {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("cannot start service: Collections.BalancePeriod is zero (if you want to run once and then exit, use the -once flag)"))
+                       }
+
+                       ac, err := arvados.NewClientFromConfig(cluster)
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
+                       }
+                       // Set the token only after the error check: on
+                       // failure the returned client is nil.
+                       ac.AuthToken = token
+
+                       db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection failed: %s", err))
+                       }
+                       if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
+                               db.SetMaxOpenConns(p)
+                       }
+                       err = db.Ping()
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection succeeded but ping failed: %s", err))
+                       }
+
+                       if options.Logger == nil {
+                               options.Logger = ctxlog.FromContext(ctx)
+                       }
+
+                       srv := &Server{
+                               Cluster:    cluster,
+                               ArvClient:  ac,
+                               RunOptions: options,
+                               Metrics:    newMetrics(registry),
+                               Logger:     options.Logger,
+                               Dumper:     options.Dumper,
+                               DB:         db,
+                       }
+                       srv.Handler = &health.Handler{
+                               Token:  cluster.ManagementToken,
+                               Prefix: "/_health/",
+                               Routes: health.Routes{"ping": srv.CheckHealth},
+                       }
+
+                       go srv.run(ctx)
+                       return srv
+               }).RunCommand(prog, args, stdin, stdout, stderr)
 }
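
Note on wiring: with main() removed, this package no longer builds a
standalone binary. The exported Command implements the
RunCommand(prog, args, stdin, stdout, stderr) handler shape used by
lib/cmd, and is meant to be invoked by a wrapper binary (in Arvados,
typically the arvados-server command multiplexer). A minimal sketch of
such a wrapper, shown only as an illustration of how the entry point
is called, not as part of this commit:

package main

import (
	"os"

	keepbalance "git.arvados.org/arvados.git/services/keep-balance"
)

func main() {
	// Command.RunCommand parses flags, loads the cluster config, and
	// returns the process exit code; pass the real stdio through so
	// logs and flag errors reach the terminal.
	os.Exit(keepbalance.Command.RunCommand(os.Args[0], os.Args[1:], os.Stdin, os.Stdout, os.Stderr))
}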