17948: Adds arguments to allow specifying how many subdirs to create.
[arvados.git] / services / keep-balance / main.go
index cf844ab050043c1662c3cf5cd62ef9ef238a6789..e1573e7f733935028d164d6d5dd69d383fdf338f 100644 (file)
@@ -9,12 +9,17 @@ import (
        "flag"
        "fmt"
        "io"
+       "net/http"
+       _ "net/http/pprof"
        "os"
 
-       "git.curoverse.com/arvados.git/lib/config"
-       "git.curoverse.com/arvados.git/lib/service"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/lib/service"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/sdk/go/health"
+       "github.com/jmoiron/sqlx"
+       _ "github.com/lib/pq"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
@@ -34,8 +39,17 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
                "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
        flags.BoolVar(&options.CommitTrash, "commit-trash", false,
                "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
+       flags.BoolVar(&options.CommitConfirmedFields, "commit-confirmed-fields", true,
+               "update collection fields (replicas_confirmed, storage_classes_confirmed, etc.)")
        flags.Bool("version", false, "Write version information to stdout and exit 0")
        dumpFlag := flags.Bool("dump", false, "dump details for each block to stdout")
+       pprofAddr := flags.String("pprof", "", "serve Go profile data at `[addr]:port`")
+
+       if *pprofAddr != "" {
+               go func() {
+                       logrus.Println(http.ListenAndServe(*pprofAddr, nil))
+               }()
+       }
 
        loader := config.NewLoader(os.Stdin, logger)
        loader.SetupFlags(flags)
@@ -50,10 +64,18 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
                options.Dumper = dumper
        }
 
-       // Only pass along the version flag, which gets handled in RunCommand
+       // Drop our custom args that would be rejected by the generic
+       // service.Command
        args = nil
+       dropFlag := map[string]bool{
+               "once":                    true,
+               "commit-pulls":            true,
+               "commit-trash":            true,
+               "commit-confirmed-fields": true,
+               "dump":                    true,
+       }
        flags.Visit(func(f *flag.Flag) {
-               if f.Name == "version" {
+               if !dropFlag[f.Name] {
                        args = append(args, "-"+f.Name, f.Value.String())
                }
        })
@@ -70,6 +92,18 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
                                return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
                        }
 
+                       db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection failed: %s", err))
+                       }
+                       if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
+                               db.SetMaxOpenConns(p)
+                       }
+                       err = db.Ping()
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection succeeded but ping failed: %s", err))
+                       }
+
                        if options.Logger == nil {
                                options.Logger = ctxlog.FromContext(ctx)
                        }
@@ -81,6 +115,12 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
                                Metrics:    newMetrics(registry),
                                Logger:     options.Logger,
                                Dumper:     options.Dumper,
+                               DB:         db,
+                       }
+                       srv.Handler = &health.Handler{
+                               Token:  cluster.ManagementToken,
+                               Prefix: "/_health/",
+                               Routes: health.Routes{"ping": srv.CheckHealth},
                        }
 
                        go srv.run()