16727: Refresh block permission signatures on Sync and Read.
[arvados.git] / services / keep-balance / main.go
index e3e90d3581517c0ae8831f76be2881aaa0f1a44c..8a95d389c8292f04c8479f0c21abdcf5c84932a3 100644 (file)
 package main
 
 import (
 package main
 
 import (
-       "encoding/json"
+       "context"
        "flag"
        "fmt"
        "flag"
        "fmt"
-       "log"
+       "io"
        "net/http"
        "net/http"
+       _ "net/http/pprof"
        "os"
        "os"
-       "time"
 
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/config"
-       "github.com/Sirupsen/logrus"
+       "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/lib/service"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/sdk/go/health"
+       "github.com/jmoiron/sqlx"
+       _ "github.com/lib/pq"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
 )
 
 )
 
-var debugf = func(string, ...interface{}) {}
-
 func main() {
 func main() {
-       var cfg Config
-       var runOptions RunOptions
-
-       configPath := flag.String("config", defaultConfigPath,
-               "`path` of JSON or YAML configuration file")
-       serviceListPath := flag.String("config.KeepServiceList", "",
-               "`path` of JSON or YAML file with list of keep services to balance, as given by \"arv keep_service list\" "+
-                       "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
-       flag.BoolVar(&runOptions.Once, "once", false,
+       os.Exit(runCommand(os.Args[0], os.Args[1:], os.Stdin, os.Stdout, os.Stderr))
+}
+
+func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       logger := ctxlog.FromContext(context.Background())
+
+       var options RunOptions
+       flags := flag.NewFlagSet(prog, flag.ContinueOnError)
+       flags.BoolVar(&options.Once, "once", false,
                "balance once and then exit")
                "balance once and then exit")
-       flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
+       flags.BoolVar(&options.CommitPulls, "commit-pulls", false,
                "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
                "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
-       flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
+       flags.BoolVar(&options.CommitTrash, "commit-trash", false,
                "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
                "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
-       dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit")
-       dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
-       debugFlag := flag.Bool("debug", false, "enable debug messages")
-       getVersion := flag.Bool("version", false, "Print version information and exit.")
-       flag.Usage = usage
-       flag.Parse()
-
-       // Print version information if requested
-       if *getVersion {
-               fmt.Printf("keep-balance %s\n", version)
-               return
-       }
+       flags.BoolVar(&options.CommitConfirmedFields, "commit-confirmed-fields", true,
+               "update collection fields (replicas_confirmed, storage_classes_confirmed, etc.)")
+       dumpFlag := flags.Bool("dump", false, "dump details for each block to stdout")
+       pprofAddr := flags.String("pprof", "", "serve Go profile data at `[addr]:port`")
+       // "show version" is implemented by service.Command, so we
+       // don't need the var here -- we just need the -version flag
+       // to pass flags.Parse().
+       flags.Bool("version", false, "Write version information to stdout and exit 0")
 
 
-       mustReadConfig(&cfg, *configPath)
-       if *serviceListPath != "" {
-               mustReadConfig(&cfg.KeepServiceList, *serviceListPath)
+       if *pprofAddr != "" {
+               go func() {
+                       logrus.Println(http.ListenAndServe(*pprofAddr, nil))
+               }()
        }
 
        }
 
-       if *dumpConfig {
-               log.Fatal(config.DumpAndExit(cfg))
-       }
+       loader := config.NewLoader(os.Stdin, logger)
+       loader.SetupFlags(flags)
 
 
-       to := time.Duration(cfg.RequestTimeout)
-       if to == 0 {
-               to = 30 * time.Minute
-       }
-       arvados.DefaultSecureClient.Timeout = to
-       arvados.InsecureHTTPClient.Timeout = to
-       http.DefaultClient.Timeout = to
-
-       log.Printf("keep-balance %s started", version)
-
-       if *debugFlag {
-               debugf = log.Printf
-               if j, err := json.Marshal(cfg); err != nil {
-                       log.Fatal(err)
-               } else {
-                       log.Printf("config is %s", j)
-               }
+       munged := loader.MungeLegacyConfigArgs(logger, args, "-legacy-keepbalance-config")
+       if ok, code := cmd.ParseFlags(flags, prog, munged, "", stderr); !ok {
+               return code
        }
        }
+
        if *dumpFlag {
        if *dumpFlag {
-               runOptions.Dumper = logrus.New()
-               runOptions.Dumper.Out = os.Stdout
-               runOptions.Dumper.Formatter = &logrus.TextFormatter{}
-       }
-       srv, err := NewServer(cfg, runOptions)
-       if err != nil {
-               // (don't run)
-       } else if runOptions.Once {
-               _, err = srv.Run()
-       } else {
-               err = srv.RunForever(nil)
+               dumper := logrus.New()
+               dumper.Out = os.Stdout
+               dumper.Formatter = &logrus.TextFormatter{}
+               options.Dumper = dumper
        }
        }
-       if err != nil {
-               log.Fatal(err)
-       }
-}
 
 
-func mustReadConfig(dst interface{}, path string) {
-       if err := config.LoadFile(dst, path); err != nil {
-               log.Fatal(err)
+       // Drop our custom args that would be rejected by the generic
+       // service.Command
+       args = nil
+       dropFlag := map[string]bool{
+               "once":                    true,
+               "commit-pulls":            true,
+               "commit-trash":            true,
+               "commit-confirmed-fields": true,
+               "dump":                    true,
        }
        }
+       flags.Visit(func(f *flag.Flag) {
+               if !dropFlag[f.Name] {
+                       args = append(args, "-"+f.Name+"="+f.Value.String())
+               }
+       })
+
+       return service.Command(arvados.ServiceNameKeepbalance,
+               func(ctx context.Context, cluster *arvados.Cluster, token string, registry *prometheus.Registry) service.Handler {
+                       if !options.Once && cluster.Collections.BalancePeriod == arvados.Duration(0) {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("cannot start service: Collections.BalancePeriod is zero (if you want to run once and then exit, use the -once flag)"))
+                       }
+
+                       ac, err := arvados.NewClientFromConfig(cluster)
+                       ac.AuthToken = token
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
+                       }
+
+                       db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection failed: %s", err))
+                       }
+                       if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
+                               db.SetMaxOpenConns(p)
+                       }
+                       err = db.Ping()
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection succeeded but ping failed: %s", err))
+                       }
+
+                       if options.Logger == nil {
+                               options.Logger = ctxlog.FromContext(ctx)
+                       }
+
+                       srv := &Server{
+                               Cluster:    cluster,
+                               ArvClient:  ac,
+                               RunOptions: options,
+                               Metrics:    newMetrics(registry),
+                               Logger:     options.Logger,
+                               Dumper:     options.Dumper,
+                               DB:         db,
+                       }
+                       srv.Handler = &health.Handler{
+                               Token:  cluster.ManagementToken,
+                               Prefix: "/_health/",
+                               Routes: health.Routes{"ping": srv.CheckHealth},
+                       }
+
+                       go srv.run()
+                       return srv
+               }).RunCommand(prog, args, stdin, stdout, stderr)
 }
 }