17948: Adds arguments to allow specifying how many subdirs to create.
[arvados.git] / services / keep-balance / main.go
index 6e89df9a5552cc34bac4d13e3bae4356d6acf48a..e1573e7f733935028d164d6d5dd69d383fdf338f 100644 (file)
@@ -9,12 +9,17 @@ import (
        "flag"
        "fmt"
        "io"
        "flag"
        "fmt"
        "io"
+       "net/http"
+       _ "net/http/pprof"
        "os"
 
        "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/lib/service"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "os"
 
        "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/lib/service"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/sdk/go/health"
+       "github.com/jmoiron/sqlx"
+       _ "github.com/lib/pq"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
@@ -34,8 +39,17 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
                "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
        flags.BoolVar(&options.CommitTrash, "commit-trash", false,
                "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
                "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
        flags.BoolVar(&options.CommitTrash, "commit-trash", false,
                "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
+       flags.BoolVar(&options.CommitConfirmedFields, "commit-confirmed-fields", true,
+               "update collection fields (replicas_confirmed, storage_classes_confirmed, etc.)")
        flags.Bool("version", false, "Write version information to stdout and exit 0")
        dumpFlag := flags.Bool("dump", false, "dump details for each block to stdout")
        flags.Bool("version", false, "Write version information to stdout and exit 0")
        dumpFlag := flags.Bool("dump", false, "dump details for each block to stdout")
+       pprofAddr := flags.String("pprof", "", "serve Go profile data at `[addr]:port`")
+
+       if *pprofAddr != "" {
+               go func() {
+                       logrus.Println(http.ListenAndServe(*pprofAddr, nil))
+               }()
+       }
 
        loader := config.NewLoader(os.Stdin, logger)
        loader.SetupFlags(flags)
 
        loader := config.NewLoader(os.Stdin, logger)
        loader.SetupFlags(flags)
@@ -50,10 +64,18 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
                options.Dumper = dumper
        }
 
                options.Dumper = dumper
        }
 
-       // Only pass along the version flag, which gets handled in RunCommand
+       // Drop our custom flags, which the generic service.Command would
+       // reject; pass along every other flag that was explicitly set.
        args = nil
        args = nil
+       dropFlag := map[string]bool{
+               "once":                    true,
+               "commit-pulls":            true,
+               "commit-trash":            true,
+               "commit-confirmed-fields": true,
+               "dump":                    true,
+       }
        flags.Visit(func(f *flag.Flag) {
        flags.Visit(func(f *flag.Flag) {
-               if f.Name == "version" {
+               if !dropFlag[f.Name] {
                        args = append(args, "-"+f.Name, f.Value.String())
                }
        })
                        args = append(args, "-"+f.Name, f.Value.String())
                }
        })
@@ -70,6 +92,18 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
                                return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
                        }
 
                                return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
                        }
 
+                       db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection failed: %s", err))
+                       }
+                       if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
+                               db.SetMaxOpenConns(p)
+                       }
+                       err = db.Ping()
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection succeeded but ping failed: %s", err))
+                       }
+
                        if options.Logger == nil {
                                options.Logger = ctxlog.FromContext(ctx)
                        }
                        if options.Logger == nil {
                                options.Logger = ctxlog.FromContext(ctx)
                        }
@@ -81,6 +115,12 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
                                Metrics:    newMetrics(registry),
                                Logger:     options.Logger,
                                Dumper:     options.Dumper,
                                Metrics:    newMetrics(registry),
                                Logger:     options.Logger,
                                Dumper:     options.Dumper,
+                               DB:         db,
+                       }
+                       srv.Handler = &health.Handler{
+                               Token:  cluster.ManagementToken,
+                               Prefix: "/_health/",
+                               Routes: health.Routes{"ping": srv.CheckHealth},
                        }
 
                        go srv.run()
                        }
 
                        go srv.run()