14714: keep-balance uses cluster config
authorEric Biagiotti <ebiagiotti@veritasgenetics.com>
Tue, 24 Sep 2019 13:58:40 +0000 (09:58 -0400)
committerEric Biagiotti <ebiagiotti@veritasgenetics.com>
Thu, 26 Sep 2019 18:18:47 +0000 (14:18 -0400)
- Removes dumpconfig flag.
- Removes the options to specify a keep service list or type. Will now balance all keep services of type disk reported by the keep_services endpoint.
- Debug flag removed. Uses SystemLogs.LogLevel from cluster config instead.
- Reorganizes the Server struct to use lib/service to do generic service things.

Arvados-DCO-1.1-Signed-off-by: Eric Biagiotti <ebiagiotti@veritasgenetics.com>

services/keep-balance/balance.go
services/keep-balance/main.go
services/keep-balance/server.go
services/keep-balance/usage.go [deleted file]

index 9f814a20d3572b58ea5c65333a94a6d0136f62c2..a887c3c691e375917c94fbde51c671dbfa3ae243 100644 (file)
@@ -66,7 +66,7 @@ type Balancer struct {
 // Typical usage:
 //
 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
        nextRunOptions = runOptions
 
        defer bal.time("sweep", "wall clock time to run one full sweep")()
@@ -95,24 +95,21 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
                bal.lostBlocks = ioutil.Discard
        }
 
-       if len(config.KeepServiceList.Items) > 0 {
-               err = bal.SetKeepServices(config.KeepServiceList)
-       } else {
-               err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
-       }
+       diskService := []string{"disk"}
+       err = bal.DiscoverKeepServices(client, diskService)
        if err != nil {
                return
        }
 
        for _, srv := range bal.KeepServices {
-               err = srv.discoverMounts(&config.Client)
+               err = srv.discoverMounts(client)
                if err != nil {
                        return
                }
        }
        bal.cleanupMounts()
 
-       if err = bal.CheckSanityEarly(&config.Client); err != nil {
+       if err = bal.CheckSanityEarly(client); err != nil {
                return
        }
        rs := bal.rendezvousState()
@@ -121,7 +118,7 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
                        bal.logf("notice: KeepServices list has changed since last run")
                }
                bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
-               if err = bal.ClearTrashLists(&config.Client); err != nil {
+               if err = bal.ClearTrashLists(client); err != nil {
                        return
                }
                // The current rendezvous state becomes "safe" (i.e.,
@@ -130,7 +127,7 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
                // succeed in clearing existing trash lists.
                nextRunOptions.SafeRendezvousState = rs
        }
-       if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
+       if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
                return
        }
        bal.ComputeChangeSets()
@@ -150,14 +147,14 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
                lbFile = nil
        }
        if runOptions.CommitPulls {
-               err = bal.CommitPulls(&config.Client)
+               err = bal.CommitPulls(client)
                if err != nil {
                        // Skip trash if we can't pull. (Too cautious?)
                        return
                }
        }
        if runOptions.CommitTrash {
-               err = bal.CommitTrash(&config.Client)
+               err = bal.CommitTrash(client)
        }
        return
 }
index 84516a821060da1b795da1b40655a9a62157fd52..606fde4984537005cc3b5aa032fb208df3bd9225 100644 (file)
@@ -5,97 +5,90 @@
 package main
 
 import (
-       "encoding/json"
+       "context"
        "flag"
        "fmt"
+       "io"
        "log"
-       "net/http"
        "os"
-       "time"
 
+       "git.curoverse.com/arvados.git/lib/config"
+       "git.curoverse.com/arvados.git/lib/service"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/config"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "github.com/sirupsen/logrus"
 )
 
-var debugf = func(string, ...interface{}) {}
+var (
+       version = "dev"
+       debugf  = func(string, ...interface{}) {}
+       command = service.Command(arvados.ServiceNameKeepbalance, newHandler)
+       options RunOptions
+)
 
-func main() {
-       var cfg Config
-       var runOptions RunOptions
-
-       configPath := flag.String("config", defaultConfigPath,
-               "`path` of JSON or YAML configuration file")
-       serviceListPath := flag.String("config.KeepServiceList", "",
-               "`path` of JSON or YAML file with list of keep services to balance, as given by \"arv keep_service list\" "+
-                       "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
-       flag.BoolVar(&runOptions.Once, "once", false,
-               "balance once and then exit")
-       flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
-               "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
-       flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
-               "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
-       dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit")
-       dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
-       debugFlag := flag.Bool("debug", false, "enable debug messages")
-       getVersion := flag.Bool("version", false, "Print version information and exit.")
-       flag.Usage = usage
-       flag.Parse()
-
-       // Print version information if requested
-       if *getVersion {
-               fmt.Printf("keep-balance %s\n", version)
-               return
+func newHandler(ctx context.Context, cluster *arvados.Cluster, _ string) service.Handler {
+       if !options.Once && cluster.Collections.BalancePeriod == arvados.Duration(0) {
+               return service.ErrorHandler(ctx, cluster, fmt.Errorf("You must either run keep-balance with the -once flag, or set Collections.BalancePeriod in the config. "+
+                       "If using the legacy keep-balance.yml config, RunPeriod is the equivalant of Collections.BalancePeriod."))
        }
 
-       mustReadConfig(&cfg, *configPath)
-       if *serviceListPath != "" {
-               mustReadConfig(&cfg.KeepServiceList, *serviceListPath)
+       ac, err := arvados.NewClientFromConfig(cluster)
+       if err != nil {
+               return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
        }
 
-       if *dumpConfig {
-               log.Fatal(config.DumpAndExit(cfg))
+       if cluster.SystemLogs.LogLevel == "debug" {
+               debugf = log.Printf
        }
 
-       to := time.Duration(cfg.RequestTimeout)
-       if to == 0 {
-               to = 30 * time.Minute
+       srv := &Server{
+               Cluster:    cluster,
+               ArvClient:  ac,
+               RunOptions: options,
+               Metrics:    newMetrics(),
        }
-       arvados.DefaultSecureClient.Timeout = to
-       arvados.InsecureHTTPClient.Timeout = to
-       http.DefaultClient.Timeout = to
 
-       log.Printf("keep-balance %s started", version)
+       go srv.Start(ctx)
+       return srv
+}
+
+func main() {
+       os.Exit(runCommand(os.Args[0], os.Args[1:], os.Stdin, os.Stdout, os.Stderr))
+}
+
+func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       logger := ctxlog.FromContext(context.Background())
+
+       flags := flag.NewFlagSet(prog, flag.ExitOnError)
+       flags.BoolVar(&options.Once, "once", false,
+               "balance once and then exit")
+       flags.BoolVar(&options.CommitPulls, "commit-pulls", false,
+               "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
+       flags.BoolVar(&options.CommitTrash, "commit-trash", false,
+               "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
+       flags.Bool("version", false, "Write version information to stdout and exit 0")
+       dumpFlag := flags.Bool("dump", false, "dump details for each block to stdout")
+
+       loader := config.NewLoader(os.Stdin, logger)
+       loader.SetupFlags(flags)
+
+       munged := loader.MungeLegacyConfigArgs(logger, args, "-legacy-keepbalance-config")
+       flags.Parse(munged)
 
-       if *debugFlag {
-               debugf = log.Printf
-               if j, err := json.Marshal(cfg); err != nil {
-                       log.Fatal(err)
-               } else {
-                       log.Printf("config is %s", j)
-               }
-       }
        if *dumpFlag {
                dumper := logrus.New()
                dumper.Out = os.Stdout
                dumper.Formatter = &logrus.TextFormatter{}
-               runOptions.Dumper = dumper
-       }
-       srv, err := NewServer(cfg, runOptions)
-       if err != nil {
-               // (don't run)
-       } else if runOptions.Once {
-               _, err = srv.Run()
-       } else {
-               err = srv.RunForever(nil)
-       }
-       if err != nil {
-               log.Fatal(err)
+               options.Dumper = dumper
        }
-}
 
-func mustReadConfig(dst interface{}, path string) {
-       if err := config.LoadFile(dst, path); err != nil {
-               log.Fatal(err)
-       }
+       // Only pass along the version flag, which gets handled in RunCommand
+       args = nil
+       flags.Visit(func(f *flag.Flag) {
+               if f.Name == "version" {
+                       args = append(args, "-"+f.Name, f.Value.String())
+               }
+       })
+
+       return command.RunCommand(prog, args, stdin, stdout, stderr)
 }
index e2f13a425ed8dfabc729649d98aa7e4ed977899a..0f4bb7176ae58044027a947cb821ba8db5ff9cdc 100644 (file)
@@ -6,7 +6,6 @@ package main
 
 import (
        "context"
-       "fmt"
        "net/http"
        "os"
        "os/signal"
@@ -14,57 +13,10 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/auth"
        "git.curoverse.com/arvados.git/sdk/go/ctxlog"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "github.com/sirupsen/logrus"
 )
 
-var version = "dev"
-
-const (
-       defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
-       rfc3339NanoFixed  = "2006-01-02T15:04:05.000000000Z07:00"
-)
-
-// Config specifies site configuration, like API credentials and the
-// choice of which servers are to be balanced.
-//
-// Config is loaded from a JSON config file (see usage()).
-type Config struct {
-       // Arvados API endpoint and credentials.
-       Client arvados.Client
-
-       // List of service types (e.g., "disk") to balance.
-       KeepServiceTypes []string
-
-       KeepServiceList arvados.KeepServiceList
-
-       // address, address:port, or :port for management interface
-       Listen string
-
-       // token for management APIs
-       ManagementToken string
-
-       // How often to check
-       RunPeriod arvados.Duration
-
-       // Number of collections to request in each API call
-       CollectionBatchSize int
-
-       // Max collections to buffer in memory (bigger values consume
-       // more memory, but can reduce store-and-forward latency when
-       // fetching pages)
-       CollectionBuffers int
-
-       // Timeout for outgoing http request/response cycle.
-       RequestTimeout arvados.Duration
-
-       // Destination filename for the list of lost block hashes, one
-       // per line. Updated atomically during each successful run.
-       LostBlocksFile string
-}
-
 // RunOptions controls runtime behavior. The flags/options that belong
 // here are the ones that are useful for interactive use. For example,
 // "CommitTrash" is a runtime option rather than a config item because
@@ -87,100 +39,74 @@ type RunOptions struct {
 }
 
 type Server struct {
-       config     Config
-       runOptions RunOptions
-       metrics    *metrics
-       listening  string // for tests
+       http.Handler
+       Cluster    *arvados.Cluster
+       ArvClient  *arvados.Client
+       RunOptions RunOptions
+       Metrics    *metrics
 
        Logger logrus.FieldLogger
        Dumper logrus.FieldLogger
 }
 
-// NewServer returns a new Server that runs Balancers using the given
-// config and runOptions.
-func NewServer(config Config, runOptions RunOptions) (*Server, error) {
-       if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
-               return nil, fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
-       }
-       if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
-               return nil, fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
-       }
+// CheckHealth implements service.Handler.
+func (srv *Server) CheckHealth() error {
+       return nil
+}
 
-       if runOptions.Logger == nil {
-               log := logrus.New()
-               log.Formatter = &logrus.JSONFormatter{
-                       TimestampFormat: rfc3339NanoFixed,
-               }
-               log.Out = os.Stderr
-               runOptions.Logger = log
+// Start sets up and runs the balancer.
+func (srv *Server) Start(ctx context.Context) {
+       if srv.RunOptions.Logger == nil {
+               srv.RunOptions.Logger = ctxlog.FromContext(ctx)
        }
 
-       srv := &Server{
-               config:     config,
-               runOptions: runOptions,
-               metrics:    newMetrics(),
-               Logger:     runOptions.Logger,
-               Dumper:     runOptions.Dumper,
-       }
-       return srv, srv.start()
-}
+       srv.Logger = srv.RunOptions.Logger
+       srv.Dumper = srv.RunOptions.Dumper
 
-func (srv *Server) start() error {
-       if srv.config.Listen == "" {
-               return nil
-       }
-       ctx := ctxlog.Context(context.Background(), srv.Logger)
-       server := &httpserver.Server{
-               Server: http.Server{
-                       Handler: httpserver.HandlerWithContext(ctx,
-                               httpserver.LogRequests(
-                                       auth.RequireLiteralToken(srv.config.ManagementToken,
-                                               srv.metrics.Handler(srv.Logger)))),
-               },
-               Addr: srv.config.Listen,
+       var err error
+       if srv.RunOptions.Once {
+               _, err = srv.run()
+       } else {
+               err = srv.runForever(nil)
        }
-       err := server.Start()
        if err != nil {
-               return err
+               srv.Logger.Error(err)
        }
-       srv.Logger.Printf("listening at %s", server.Addr)
-       srv.listening = server.Addr
-       return nil
 }
 
-func (srv *Server) Run() (*Balancer, error) {
+func (srv *Server) run() (*Balancer, error) {
        bal := &Balancer{
                Logger:         srv.Logger,
                Dumper:         srv.Dumper,
-               Metrics:        srv.metrics,
-               LostBlocksFile: srv.config.LostBlocksFile,
+               Metrics:        srv.Metrics,
+               LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
        }
        var err error
-       srv.runOptions, err = bal.Run(srv.config, srv.runOptions)
+       srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
        return bal, err
 }
 
 // RunForever runs forever, or (for testing purposes) until the given
 // stop channel is ready to receive.
-func (srv *Server) RunForever(stop <-chan interface{}) error {
-       logger := srv.runOptions.Logger
+func (srv *Server) runForever(stop <-chan interface{}) error {
+       logger := srv.Logger
 
-       ticker := time.NewTicker(time.Duration(srv.config.RunPeriod))
+       ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
 
        // The unbuffered channel here means we only hear SIGUSR1 if
        // it arrives while we're waiting in select{}.
        sigUSR1 := make(chan os.Signal)
        signal.Notify(sigUSR1, syscall.SIGUSR1)
 
-       logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.config.RunPeriod)
+       logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
 
        for {
-               if !srv.runOptions.CommitPulls && !srv.runOptions.CommitTrash {
+               if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash {
                        logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
                        logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
                }
 
-               _, err := srv.Run()
+               _, err := srv.run()
                if err != nil {
                        logger.Print("run failed: ", err)
                } else {
@@ -199,7 +125,7 @@ func (srv *Server) RunForever(stop <-chan interface{}) error {
                        // run too soon after the Nth run is triggered
                        // by SIGUSR1.
                        ticker.Stop()
-                       ticker = time.NewTicker(time.Duration(srv.config.RunPeriod))
+                       ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
                }
                logger.Print("starting next run")
        }
diff --git a/services/keep-balance/usage.go b/services/keep-balance/usage.go
deleted file mode 100644 (file)
index b39e839..0000000
+++ /dev/null
@@ -1,106 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
-       "flag"
-       "fmt"
-       "os"
-)
-
-var exampleConfigFile = []byte(`
-Client:
-    APIHost: zzzzz.arvadosapi.com:443
-    AuthToken: xyzzy
-    Insecure: false
-KeepServiceTypes:
-    - disk
-Listen: ":9005"
-ManagementToken: xyzzy
-RunPeriod: 600s
-CollectionBatchSize: 100000
-CollectionBuffers: 1000
-RequestTimeout: 30m`)
-
-func usage() {
-       fmt.Fprintf(os.Stderr, `
-
-keep-balance rebalances a set of keepstore servers. It creates new
-copies of underreplicated blocks, deletes excess copies of
-overreplicated and unreferenced blocks, and moves blocks to better
-positions (according to the rendezvous hash algorithm) so clients find
-them faster.
-
-Usage: keep-balance [options]
-
-Options:
-`)
-       flag.PrintDefaults()
-       fmt.Fprintf(os.Stderr, `
-Example config file:
-%s
-
-    Client.AuthToken must be recognized by Arvados as an admin token,
-    and must be recognized by all Keep services as a "data manager
-    key".
-
-    Client.Insecure should be true if your Arvados API endpoint uses
-    an unverifiable SSL/TLS certificate.
-
-Periodic scanning:
-
-    By default, keep-balance operates periodically, i.e.: do a
-    scan/balance operation, sleep, repeat.
-
-    RunPeriod determines the interval between start times of
-    successive scan/balance operations. If a scan/balance operation
-    takes longer than RunPeriod, the next one will follow it
-    immediately.
-
-    If SIGUSR1 is received during an idle period between operations,
-    the next operation will start immediately.
-
-One-time scanning:
-
-    Use the -once flag to do a single operation and then exit. The
-    exit code will be zero if the operation was successful.
-
-Committing:
-
-    By default, keep-service computes and reports changes but does not
-    implement them by sending pull and trash lists to the Keep
-    services.
-
-    Use the -commit-pull and -commit-trash flags to implement the
-    computed changes.
-
-Tuning resource usage:
-
-    CollectionBatchSize limits the number of collections retrieved per
-    API transaction. If this is zero or omitted, page size is
-    determined by the API server's own page size limits (see
-    max_items_per_response and max_index_database_read configs).
-
-    CollectionBuffers sets the size of an internal queue of
-    collections. Higher values use more memory, and improve throughput
-    by allowing keep-balance to fetch the next page of collections
-    while the current page is still being processed. If this is zero
-    or omitted, pages are processed serially.
-
-    RequestTimeout is the maximum time keep-balance will spend on a
-    single HTTP request (getting a page of collections, getting the
-    block index from a keepstore server, or sending a trash or pull
-    list to a keepstore server). Defaults to 30 minutes.
-
-Limitations:
-
-    keep-balance does not attempt to discover whether committed pull
-    and trash requests ever get carried out -- only that they are
-    accepted by the Keep services. If some services are full, new
-    copies of underreplicated blocks might never get made, only
-    repeatedly requested.
-
-`, exampleConfigFile)
-}