Merge branch '17574-storage-classes-confirmed' into main
[arvados.git] / services / keep-balance / main.go
index 8b4ee84c716e4596987b1371b38035610f9ffa2f..e1573e7f733935028d164d6d5dd69d383fdf338f 100644 (file)
@@ -9,6 +9,8 @@ import (
        "flag"
        "fmt"
        "io"
        "flag"
        "fmt"
        "io"
+       "net/http"
+       _ "net/http/pprof"
        "os"
 
        "git.arvados.org/arvados.git/lib/config"
        "os"
 
        "git.arvados.org/arvados.git/lib/config"
@@ -16,6 +18,8 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/health"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/health"
+       "github.com/jmoiron/sqlx"
+       _ "github.com/lib/pq"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
@@ -35,8 +39,17 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
                "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
        flags.BoolVar(&options.CommitTrash, "commit-trash", false,
                "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
                "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
        flags.BoolVar(&options.CommitTrash, "commit-trash", false,
                "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
+       flags.BoolVar(&options.CommitConfirmedFields, "commit-confirmed-fields", true,
+               "update collection fields (replicas_confirmed, storage_classes_confirmed, etc.)")
        flags.Bool("version", false, "Write version information to stdout and exit 0")
        dumpFlag := flags.Bool("dump", false, "dump details for each block to stdout")
        flags.Bool("version", false, "Write version information to stdout and exit 0")
        dumpFlag := flags.Bool("dump", false, "dump details for each block to stdout")
+       pprofAddr := flags.String("pprof", "", "serve Go profile data at `[addr]:port`")
+
+       if *pprofAddr != "" {
+               go func() {
+                       logrus.Println(http.ListenAndServe(*pprofAddr, nil))
+               }()
+       }
 
        loader := config.NewLoader(os.Stdin, logger)
        loader.SetupFlags(flags)
 
        loader := config.NewLoader(os.Stdin, logger)
        loader.SetupFlags(flags)
@@ -55,10 +68,11 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
        // service.Command
        args = nil
        dropFlag := map[string]bool{
        // service.Command
        args = nil
        dropFlag := map[string]bool{
-               "once":         true,
-               "commit-pulls": true,
-               "commit-trash": true,
-               "dump":         true,
+               "once":                    true,
+               "commit-pulls":            true,
+               "commit-trash":            true,
+               "commit-confirmed-fields": true,
+               "dump":                    true,
        }
        flags.Visit(func(f *flag.Flag) {
                if !dropFlag[f.Name] {
        }
        flags.Visit(func(f *flag.Flag) {
                if !dropFlag[f.Name] {
@@ -78,6 +92,18 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
                                return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
                        }
 
                                return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
                        }
 
+                       db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection failed: %s", err))
+                       }
+                       if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
+                               db.SetMaxOpenConns(p)
+                       }
+                       err = db.Ping()
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection succeeded but ping failed: %s", err))
+                       }
+
                        if options.Logger == nil {
                                options.Logger = ctxlog.FromContext(ctx)
                        }
                        if options.Logger == nil {
                                options.Logger = ctxlog.FromContext(ctx)
                        }
@@ -89,6 +115,7 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
                                Metrics:    newMetrics(registry),
                                Logger:     options.Logger,
                                Dumper:     options.Dumper,
                                Metrics:    newMetrics(registry),
                                Logger:     options.Logger,
                                Dumper:     options.Dumper,
+                               DB:         db,
                        }
                        srv.Handler = &health.Handler{
                                Token:  cluster.ManagementToken,
                        }
                        srv.Handler = &health.Handler{
                                Token:  cluster.ManagementToken,