Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / services / keep-balance / main.go
index 3316a17240cddd69c37aa804e9bd5e2dbf610def..b016db22ffe67f6316f1e4f537bfa680f135ecad 100644 (file)
 //
 // SPDX-License-Identifier: AGPL-3.0
 
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepbalance
 
 import (
 
 import (
-       "encoding/json"
+       "bytes"
+       "context"
        "flag"
        "fmt"
        "flag"
        "fmt"
-       "log"
-       "net/http"
-       "os"
-       "time"
+       "io"
+       _ "net/http/pprof"
 
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/config"
-       "github.com/sirupsen/logrus"
+       "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/lib/service"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/sdk/go/health"
+       "github.com/jmoiron/sqlx"
+       _ "github.com/lib/pq"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 )
 
-var debugf = func(string, ...interface{}) {}
+type command struct{}
 
 
-func main() {
-       var cfg Config
-       var runOptions RunOptions
+var Command = command{}
 
 
-       configPath := flag.String("config", defaultConfigPath,
-               "`path` of JSON or YAML configuration file")
-       serviceListPath := flag.String("config.KeepServiceList", "",
-               "`path` of JSON or YAML file with list of keep services to balance, as given by \"arv keep_service list\" "+
-                       "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
-       flag.BoolVar(&runOptions.Once, "once", false,
+func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       var options RunOptions
+       flags := flag.NewFlagSet(prog, flag.ContinueOnError)
+       flags.BoolVar(&options.Once, "once", false,
                "balance once and then exit")
                "balance once and then exit")
-       flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
+       flags.BoolVar(&options.CommitPulls, "commit-pulls", false,
                "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
                "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
-       flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
+       flags.BoolVar(&options.CommitTrash, "commit-trash", false,
                "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
                "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
-       dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit")
-       dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
-       debugFlag := flag.Bool("debug", false, "enable debug messages")
-       getVersion := flag.Bool("version", false, "Print version information and exit.")
-       flag.Usage = usage
-       flag.Parse()
+       flags.BoolVar(&options.CommitConfirmedFields, "commit-confirmed-fields", true,
+               "update collection fields (replicas_confirmed, storage_classes_confirmed, etc.)")
+       // These options are implemented by service.Command, so we
+       // don't need the vars here -- we just need the flags
+       // to pass flags.Parse().
+       flags.Bool("dump", false, "dump details for each block to stdout")
+       flags.String("pprof", "", "serve Go profile data at `[addr]:port`")
+       flags.Bool("version", false, "Write version information to stdout and exit 0")
 
 
-       // Print version information if requested
-       if *getVersion {
-               fmt.Printf("keep-balance %s\n", version)
-               return
+       logger := ctxlog.New(stderr, "json", "info")
+       loader := config.NewLoader(&bytes.Buffer{}, logger)
+       loader.SetupFlags(flags)
+       munged := loader.MungeLegacyConfigArgs(logger, args, "-legacy-keepbalance-config")
+       if ok, code := cmd.ParseFlags(flags, prog, munged, "", stderr); !ok {
+               return code
        }
 
        }
 
-       mustReadConfig(&cfg, *configPath)
-       if *serviceListPath != "" {
-               mustReadConfig(&cfg.KeepServiceList, *serviceListPath)
+       // Drop our custom args that would be rejected by the generic
+       // service.Command
+       args = nil
+       dropFlag := map[string]bool{
+               "once":                    true,
+               "commit-pulls":            true,
+               "commit-trash":            true,
+               "commit-confirmed-fields": true,
+               "dump":                    true,
        }
        }
+       flags.Visit(func(f *flag.Flag) {
+               if !dropFlag[f.Name] {
+                       args = append(args, "-"+f.Name+"="+f.Value.String())
+               }
+       })
 
 
-       if *dumpConfig {
-               log.Fatal(config.DumpAndExit(cfg))
-       }
+       return service.Command(arvados.ServiceNameKeepbalance,
+               func(ctx context.Context, cluster *arvados.Cluster, token string, registry *prometheus.Registry) service.Handler {
+                       if !options.Once && cluster.Collections.BalancePeriod == arvados.Duration(0) {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("cannot start service: Collections.BalancePeriod is zero (if you want to run once and then exit, use the -once flag)"))
+                       }
 
 
-       to := time.Duration(cfg.RequestTimeout)
-       if to == 0 {
-               to = 30 * time.Minute
-       }
-       arvados.DefaultSecureClient.Timeout = to
-       arvados.InsecureHTTPClient.Timeout = to
-       http.DefaultClient.Timeout = to
+                       ac, err := arvados.NewClientFromConfig(cluster)
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
+                       }
+                       // Set the token only after the error check: ac must not be dereferenced when the constructor failed.
+                       ac.AuthToken = token
 
 
-       log.Printf("keep-balance %s started", version)
+                       db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection failed: %s", err))
+                       }
+                       if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
+                               db.SetMaxOpenConns(p)
+                       }
+                       err = db.Ping()
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection succeeded but ping failed: %s", err))
+                       }
 
 
-       if *debugFlag {
-               debugf = log.Printf
-               if j, err := json.Marshal(cfg); err != nil {
-                       log.Fatal(err)
-               } else {
-                       log.Printf("config is %s", j)
-               }
-       }
-       if *dumpFlag {
-               runOptions.Dumper = logrus.New()
-               runOptions.Dumper.Out = os.Stdout
-               runOptions.Dumper.Formatter = &logrus.TextFormatter{}
-       }
-       srv, err := NewServer(cfg, runOptions)
-       if err != nil {
-               // (don't run)
-       } else if runOptions.Once {
-               _, err = srv.Run()
-       } else {
-               err = srv.RunForever(nil)
-       }
-       if err != nil {
-               log.Fatal(err)
-       }
-}
+                       if options.Logger == nil {
+                               options.Logger = ctxlog.FromContext(ctx)
+                       }
 
 
-func mustReadConfig(dst interface{}, path string) {
-       if err := config.LoadFile(dst, path); err != nil {
-               log.Fatal(err)
-       }
+                       srv := &Server{
+                               Cluster:    cluster,
+                               ArvClient:  ac,
+                               RunOptions: options,
+                               Metrics:    newMetrics(registry),
+                               Logger:     options.Logger,
+                               Dumper:     options.Dumper,
+                               DB:         db,
+                       }
+                       srv.Handler = &health.Handler{
+                               Token:  cluster.ManagementToken,
+                               Prefix: "/_health/",
+                               Routes: health.Routes{"ping": srv.CheckHealth},
+                       }
+
+                       go srv.run(ctx)
+                       return srv
+               }).RunCommand(prog, args, stdin, stdout, stderr)
 }
 }