X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/70459cdae1f4e92789a14c1fecb66f5954a5aa7f..54781f3d3c1dea0e14542d129b1c8e061ad406fc:/services/keep-balance/main.go diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go index e3e90d3581..605ee5fc82 100644 --- a/services/keep-balance/main.go +++ b/services/keep-balance/main.go @@ -5,96 +5,133 @@ package main import ( - "encoding/json" + "context" "flag" "fmt" - "log" + "io" "net/http" + _ "net/http/pprof" "os" - "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/config" - "github.com/Sirupsen/logrus" + "git.arvados.org/arvados.git/lib/config" + "git.arvados.org/arvados.git/lib/service" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "git.arvados.org/arvados.git/sdk/go/health" + "github.com/jmoiron/sqlx" + _ "github.com/lib/pq" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" ) -var debugf = func(string, ...interface{}) {} - func main() { - var cfg Config - var runOptions RunOptions - - configPath := flag.String("config", defaultConfigPath, - "`path` of JSON or YAML configuration file") - serviceListPath := flag.String("config.KeepServiceList", "", - "`path` of JSON or YAML file with list of keep services to balance, as given by \"arv keep_service list\" "+ - "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])") - flag.BoolVar(&runOptions.Once, "once", false, + os.Exit(runCommand(os.Args[0], os.Args[1:], os.Stdin, os.Stdout, os.Stderr)) +} + +func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { + logger := ctxlog.FromContext(context.Background()) + + var options RunOptions + flags := flag.NewFlagSet(prog, flag.ExitOnError) + flags.BoolVar(&options.Once, "once", false, "balance once and then exit") - flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false, + flags.BoolVar(&options.CommitPulls, "commit-pulls", false, "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)") - flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false, + flags.BoolVar(&options.CommitTrash, "commit-trash", false, "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)") - dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit") - dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout") - debugFlag := flag.Bool("debug", false, "enable debug messages") - getVersion := flag.Bool("version", false, "Print version information and exit.") - flag.Usage = usage - flag.Parse() - - // Print version information if requested - if *getVersion { - fmt.Printf("keep-balance %s\n", version) - return - } + flags.BoolVar(&options.CommitConfirmedFields, "commit-confirmed-fields", true, + "update collection fields (replicas_confirmed, storage_classes_confirmed, etc.)") + flags.Bool("version", false, "Write version information to stdout and exit 0") + dumpFlag := flags.Bool("dump", false, "dump details for each block to stdout") + pprofAddr := flags.String("pprof", "", "serve Go profile data at `[addr]:port`") - mustReadConfig(&cfg, *configPath) - if *serviceListPath != "" { - mustReadConfig(&cfg.KeepServiceList, *serviceListPath) + if *pprofAddr != "" { + go func() { + logrus.Println(http.ListenAndServe(*pprofAddr, nil)) + }() } - if *dumpConfig { - log.Fatal(config.DumpAndExit(cfg)) - } + loader := config.NewLoader(os.Stdin, logger) + loader.SetupFlags(flags) - to := time.Duration(cfg.RequestTimeout) - if to == 0 { - to = 30 * time.Minute + munged := loader.MungeLegacyConfigArgs(logger, args, "-legacy-keepbalance-config") + err := flags.Parse(munged) + if err != nil { + logger.Errorf("error parsing command line flags: %s", err) + return 2 } - arvados.DefaultSecureClient.Timeout = to - arvados.InsecureHTTPClient.Timeout = to - http.DefaultClient.Timeout = to - - log.Printf("keep-balance %s started", version) - - if *debugFlag { - debugf = log.Printf - if j, err := json.Marshal(cfg); err != nil { - log.Fatal(err) - } else { - log.Printf("config is %s", j) - } + if flags.NArg() != 0 { + logger.Errorf("error parsing command line flags: extra arguments: %q", flags.Args()) + return 2 } + if *dumpFlag { - runOptions.Dumper = logrus.New() - runOptions.Dumper.Out = os.Stdout - runOptions.Dumper.Formatter = &logrus.TextFormatter{} - } - srv, err := NewServer(cfg, runOptions) - if err != nil { - // (don't run) - } else if runOptions.Once { - _, err = srv.Run() - } else { - err = srv.RunForever(nil) + dumper := logrus.New() + dumper.Out = os.Stdout + dumper.Formatter = &logrus.TextFormatter{} + options.Dumper = dumper } - if err != nil { - log.Fatal(err) - } -} -func mustReadConfig(dst interface{}, path string) { - if err := config.LoadFile(dst, path); err != nil { - log.Fatal(err) + // Drop our custom args that would be rejected by the generic + // service.Command + args = nil + dropFlag := map[string]bool{ + "once": true, + "commit-pulls": true, + "commit-trash": true, + "commit-confirmed-fields": true, + "dump": true, } + flags.Visit(func(f *flag.Flag) { + if !dropFlag[f.Name] { + args = append(args, "-"+f.Name, f.Value.String()) + } + }) + + return service.Command(arvados.ServiceNameKeepbalance, + func(ctx context.Context, cluster *arvados.Cluster, token string, registry *prometheus.Registry) service.Handler { + if !options.Once && cluster.Collections.BalancePeriod == arvados.Duration(0) { + return service.ErrorHandler(ctx, cluster, fmt.Errorf("cannot start service: Collections.BalancePeriod is zero (if you want to run once and then exit, use the -once flag)")) + } + + ac, err := arvados.NewClientFromConfig(cluster) + ac.AuthToken = token + if err != nil { + return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err)) + } + + db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String()) + if err != nil { + return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection failed: %s", err)) + } + if p := cluster.PostgreSQL.ConnectionPool; p > 0 { + db.SetMaxOpenConns(p) + } + err = db.Ping() + if err != nil { + return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection succeeded but ping failed: %s", err)) + } + + if options.Logger == nil { + options.Logger = ctxlog.FromContext(ctx) + } + + srv := &Server{ + Cluster: cluster, + ArvClient: ac, + RunOptions: options, + Metrics: newMetrics(registry), + Logger: options.Logger, + Dumper: options.Dumper, + DB: db, + } + srv.Handler = &health.Handler{ + Token: cluster.ManagementToken, + Prefix: "/_health/", + Routes: health.Routes{"ping": srv.CheckHealth}, + } + + go srv.run() + return srv + }).RunCommand(prog, args, stdin, stdout, stderr) }