From da79c28c17eb5de3196ab49718b86c3f73db2380 Mon Sep 17 00:00:00 2001 From: Eric Biagiotti Date: Tue, 24 Sep 2019 09:58:40 -0400 Subject: [PATCH] 14714: keep-balance uses cluster config - Removes dumpconfig flag. - Removes the options to specify a keep service list or type. Will now balance all keep services of type disk reported by the keep_services endpoint. - Debug flag removed. Uses SystemLogs.LogLevel from cluster config instead. - Reorganizes the Server struct to use lib/service to do generic service things. Arvados-DCO-1.1-Signed-off-by: Eric Biagiotti --- services/keep-balance/balance.go | 21 ++--- services/keep-balance/main.go | 129 ++++++++++++++--------------- services/keep-balance/server.go | 138 +++++++------------------------ services/keep-balance/usage.go | 106 ------------------------ 4 files changed, 102 insertions(+), 292 deletions(-) delete mode 100644 services/keep-balance/usage.go diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go index 9f814a20d3..a887c3c691 100644 --- a/services/keep-balance/balance.go +++ b/services/keep-balance/balance.go @@ -66,7 +66,7 @@ type Balancer struct { // Typical usage: // // runOptions, err = (&Balancer{}).Run(config, runOptions) -func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) { +func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) { nextRunOptions = runOptions defer bal.time("sweep", "wall clock time to run one full sweep")() @@ -95,24 +95,21 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R bal.lostBlocks = ioutil.Discard } - if len(config.KeepServiceList.Items) > 0 { - err = bal.SetKeepServices(config.KeepServiceList) - } else { - err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes) - } + diskService := []string{"disk"} + err = bal.DiscoverKeepServices(client, diskService) if err != nil { return } for _, srv := range bal.KeepServices { - err = srv.discoverMounts(&config.Client) + err = srv.discoverMounts(client) if err != nil { return } } bal.cleanupMounts() - if err = bal.CheckSanityEarly(&config.Client); err != nil { + if err = bal.CheckSanityEarly(client); err != nil { return } rs := bal.rendezvousState() @@ -121,7 +118,7 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R bal.logf("notice: KeepServices list has changed since last run") } bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run") - if err = bal.ClearTrashLists(&config.Client); err != nil { + if err = bal.ClearTrashLists(client); err != nil { return } // The current rendezvous state becomes "safe" (i.e., @@ -130,7 +127,7 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R // succeed in clearing existing trash lists. nextRunOptions.SafeRendezvousState = rs } - if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil { + if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil { return } bal.ComputeChangeSets() @@ -150,14 +147,14 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R lbFile = nil } if runOptions.CommitPulls { - err = bal.CommitPulls(&config.Client) + err = bal.CommitPulls(client) if err != nil { // Skip trash if we can't pull. (Too cautious?) return } } if runOptions.CommitTrash { - err = bal.CommitTrash(&config.Client) + err = bal.CommitTrash(client) } return } diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go index 84516a8210..606fde4984 100644 --- a/services/keep-balance/main.go +++ b/services/keep-balance/main.go @@ -5,97 +5,90 @@ package main import ( - "encoding/json" + "context" "flag" "fmt" + "io" "log" - "net/http" "os" - "time" + "git.curoverse.com/arvados.git/lib/config" + "git.curoverse.com/arvados.git/lib/service" "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/config" + "git.curoverse.com/arvados.git/sdk/go/ctxlog" "github.com/sirupsen/logrus" ) -var debugf = func(string, ...interface{}) {} +var ( + version = "dev" + debugf = func(string, ...interface{}) {} + command = service.Command(arvados.ServiceNameKeepbalance, newHandler) + options RunOptions +) -func main() { - var cfg Config - var runOptions RunOptions - - configPath := flag.String("config", defaultConfigPath, - "`path` of JSON or YAML configuration file") - serviceListPath := flag.String("config.KeepServiceList", "", - "`path` of JSON or YAML file with list of keep services to balance, as given by \"arv keep_service list\" "+ - "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])") - flag.BoolVar(&runOptions.Once, "once", false, - "balance once and then exit") - flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false, - "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)") - flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false, - "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)") - dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit") - dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout") - debugFlag := flag.Bool("debug", false, "enable debug messages") - getVersion := flag.Bool("version", false, "Print version information and exit.") - flag.Usage = usage - flag.Parse() - - // Print version information if requested - if *getVersion { - fmt.Printf("keep-balance %s\n", version) - return +func newHandler(ctx context.Context, cluster *arvados.Cluster, _ string) service.Handler { + if !options.Once && cluster.Collections.BalancePeriod == arvados.Duration(0) { + return service.ErrorHandler(ctx, cluster, fmt.Errorf("You must either run keep-balance with the -once flag, or set Collections.BalancePeriod in the config. "+ + "If using the legacy keep-balance.yml config, RunPeriod is the equivalant of Collections.BalancePeriod.")) } - mustReadConfig(&cfg, *configPath) - if *serviceListPath != "" { - mustReadConfig(&cfg.KeepServiceList, *serviceListPath) + ac, err := arvados.NewClientFromConfig(cluster) + if err != nil { + return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err)) } - if *dumpConfig { - log.Fatal(config.DumpAndExit(cfg)) + if cluster.SystemLogs.LogLevel == "debug" { + debugf = log.Printf } - to := time.Duration(cfg.RequestTimeout) - if to == 0 { - to = 30 * time.Minute + srv := &Server{ + Cluster: cluster, + ArvClient: ac, + RunOptions: options, + Metrics: newMetrics(), } - arvados.DefaultSecureClient.Timeout = to - arvados.InsecureHTTPClient.Timeout = to - http.DefaultClient.Timeout = to - log.Printf("keep-balance %s started", version) + go srv.Start(ctx) + return srv +} + +func main() { + os.Exit(runCommand(os.Args[0], os.Args[1:], os.Stdin, os.Stdout, os.Stderr)) +} + +func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { + logger := ctxlog.FromContext(context.Background()) + + flags := flag.NewFlagSet(prog, flag.ExitOnError) + flags.BoolVar(&options.Once, "once", false, + "balance once and then exit") + flags.BoolVar(&options.CommitPulls, "commit-pulls", false, + "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)") + flags.BoolVar(&options.CommitTrash, "commit-trash", false, + "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)") + flags.Bool("version", false, "Write version information to stdout and exit 0") + dumpFlag := flags.Bool("dump", false, "dump details for each block to stdout") + + loader := config.NewLoader(os.Stdin, logger) + loader.SetupFlags(flags) + + munged := loader.MungeLegacyConfigArgs(logger, args, "-legacy-keepbalance-config") + flags.Parse(munged) - if *debugFlag { - debugf = log.Printf - if j, err := json.Marshal(cfg); err != nil { - log.Fatal(err) - } else { - log.Printf("config is %s", j) - } - } if *dumpFlag { dumper := logrus.New() dumper.Out = os.Stdout dumper.Formatter = &logrus.TextFormatter{} - runOptions.Dumper = dumper - } - srv, err := NewServer(cfg, runOptions) - if err != nil { - // (don't run) - } else if runOptions.Once { - _, err = srv.Run() - } else { - err = srv.RunForever(nil) - } - if err != nil { - log.Fatal(err) + options.Dumper = dumper } -} -func mustReadConfig(dst interface{}, path string) { - if err := config.LoadFile(dst, path); err != nil { - log.Fatal(err) - } + // Only pass along the version flag, which gets handled in RunCommand + args = nil + flags.Visit(func(f *flag.Flag) { + if f.Name == "version" { + args = append(args, "-"+f.Name, f.Value.String()) + } + }) + + return command.RunCommand(prog, args, stdin, stdout, stderr) } diff --git a/services/keep-balance/server.go b/services/keep-balance/server.go index e2f13a425e..0f4bb7176a 100644 --- a/services/keep-balance/server.go +++ b/services/keep-balance/server.go @@ -6,7 +6,6 @@ package main import ( "context" - "fmt" "net/http" "os" "os/signal" @@ -14,57 +13,10 @@ import ( "time" "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/auth" "git.curoverse.com/arvados.git/sdk/go/ctxlog" - "git.curoverse.com/arvados.git/sdk/go/httpserver" "github.com/sirupsen/logrus" ) -var version = "dev" - -const ( - defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml" - rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" -) - -// Config specifies site configuration, like API credentials and the -// choice of which servers are to be balanced. -// -// Config is loaded from a JSON config file (see usage()). -type Config struct { - // Arvados API endpoint and credentials. - Client arvados.Client - - // List of service types (e.g., "disk") to balance. - KeepServiceTypes []string - - KeepServiceList arvados.KeepServiceList - - // address, address:port, or :port for management interface - Listen string - - // token for management APIs - ManagementToken string - - // How often to check - RunPeriod arvados.Duration - - // Number of collections to request in each API call - CollectionBatchSize int - - // Max collections to buffer in memory (bigger values consume - // more memory, but can reduce store-and-forward latency when - // fetching pages) - CollectionBuffers int - - // Timeout for outgoing http request/response cycle. - RequestTimeout arvados.Duration - - // Destination filename for the list of lost block hashes, one - // per line. Updated atomically during each successful run. - LostBlocksFile string -} - // RunOptions controls runtime behavior. The flags/options that belong // here are the ones that are useful for interactive use. For example, // "CommitTrash" is a runtime option rather than a config item because @@ -87,100 +39,74 @@ type RunOptions struct { } type Server struct { - config Config - runOptions RunOptions - metrics *metrics - listening string // for tests + http.Handler + Cluster *arvados.Cluster + ArvClient *arvados.Client + RunOptions RunOptions + Metrics *metrics Logger logrus.FieldLogger Dumper logrus.FieldLogger } -// NewServer returns a new Server that runs Balancers using the given -// config and runOptions. -func NewServer(config Config, runOptions RunOptions) (*Server, error) { - if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil { - return nil, fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config") - } - if !runOptions.Once && config.RunPeriod == arvados.Duration(0) { - return nil, fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config") - } +// CheckHealth implements service.Handler. +func (srv *Server) CheckHealth() error { + return nil +} - if runOptions.Logger == nil { - log := logrus.New() - log.Formatter = &logrus.JSONFormatter{ - TimestampFormat: rfc3339NanoFixed, - } - log.Out = os.Stderr - runOptions.Logger = log +// Start sets up and runs the balancer. +func (srv *Server) Start(ctx context.Context) { + if srv.RunOptions.Logger == nil { + srv.RunOptions.Logger = ctxlog.FromContext(ctx) } - srv := &Server{ - config: config, - runOptions: runOptions, - metrics: newMetrics(), - Logger: runOptions.Logger, - Dumper: runOptions.Dumper, - } - return srv, srv.start() -} + srv.Logger = srv.RunOptions.Logger + srv.Dumper = srv.RunOptions.Dumper -func (srv *Server) start() error { - if srv.config.Listen == "" { - return nil - } - ctx := ctxlog.Context(context.Background(), srv.Logger) - server := &httpserver.Server{ - Server: http.Server{ - Handler: httpserver.HandlerWithContext(ctx, - httpserver.LogRequests( - auth.RequireLiteralToken(srv.config.ManagementToken, - srv.metrics.Handler(srv.Logger)))), - }, - Addr: srv.config.Listen, + var err error + if srv.RunOptions.Once { + _, err = srv.run() + } else { + err = srv.runForever(nil) } - err := server.Start() if err != nil { - return err + srv.Logger.Error(err) } - srv.Logger.Printf("listening at %s", server.Addr) - srv.listening = server.Addr - return nil } -func (srv *Server) Run() (*Balancer, error) { +func (srv *Server) run() (*Balancer, error) { bal := &Balancer{ Logger: srv.Logger, Dumper: srv.Dumper, - Metrics: srv.metrics, - LostBlocksFile: srv.config.LostBlocksFile, + Metrics: srv.Metrics, + LostBlocksFile: srv.Cluster.Collections.BlobMissingReport, } var err error - srv.runOptions, err = bal.Run(srv.config, srv.runOptions) + srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions) return bal, err } // RunForever runs forever, or (for testing purposes) until the given // stop channel is ready to receive. -func (srv *Server) RunForever(stop <-chan interface{}) error { - logger := srv.runOptions.Logger +func (srv *Server) runForever(stop <-chan interface{}) error { + logger := srv.Logger - ticker := time.NewTicker(time.Duration(srv.config.RunPeriod)) + ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod)) // The unbuffered channel here means we only hear SIGUSR1 if // it arrives while we're waiting in select{}. sigUSR1 := make(chan os.Signal) signal.Notify(sigUSR1, syscall.SIGUSR1) - logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.config.RunPeriod) + logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod) for { - if !srv.runOptions.CommitPulls && !srv.runOptions.CommitTrash { + if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash { logger.Print("WARNING: Will scan periodically, but no changes will be committed.") logger.Print("======= Consider using -commit-pulls and -commit-trash flags.") } - _, err := srv.Run() + _, err := srv.run() if err != nil { logger.Print("run failed: ", err) } else { @@ -199,7 +125,7 @@ func (srv *Server) RunForever(stop <-chan interface{}) error { // run too soon after the Nth run is triggered // by SIGUSR1. ticker.Stop() - ticker = time.NewTicker(time.Duration(srv.config.RunPeriod)) + ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod)) } logger.Print("starting next run") } diff --git a/services/keep-balance/usage.go b/services/keep-balance/usage.go deleted file mode 100644 index b39e83905d..0000000000 --- a/services/keep-balance/usage.go +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright (C) The Arvados Authors. All rights reserved. -// -// SPDX-License-Identifier: AGPL-3.0 - -package main - -import ( - "flag" - "fmt" - "os" -) - -var exampleConfigFile = []byte(` -Client: - APIHost: zzzzz.arvadosapi.com:443 - AuthToken: xyzzy - Insecure: false -KeepServiceTypes: - - disk -Listen: ":9005" -ManagementToken: xyzzy -RunPeriod: 600s -CollectionBatchSize: 100000 -CollectionBuffers: 1000 -RequestTimeout: 30m`) - -func usage() { - fmt.Fprintf(os.Stderr, ` - -keep-balance rebalances a set of keepstore servers. It creates new -copies of underreplicated blocks, deletes excess copies of -overreplicated and unreferenced blocks, and moves blocks to better -positions (according to the rendezvous hash algorithm) so clients find -them faster. - -Usage: keep-balance [options] - -Options: -`) - flag.PrintDefaults() - fmt.Fprintf(os.Stderr, ` -Example config file: -%s - - Client.AuthToken must be recognized by Arvados as an admin token, - and must be recognized by all Keep services as a "data manager - key". - - Client.Insecure should be true if your Arvados API endpoint uses - an unverifiable SSL/TLS certificate. - -Periodic scanning: - - By default, keep-balance operates periodically, i.e.: do a - scan/balance operation, sleep, repeat. - - RunPeriod determines the interval between start times of - successive scan/balance operations. If a scan/balance operation - takes longer than RunPeriod, the next one will follow it - immediately. - - If SIGUSR1 is received during an idle period between operations, - the next operation will start immediately. - -One-time scanning: - - Use the -once flag to do a single operation and then exit. The - exit code will be zero if the operation was successful. - -Committing: - - By default, keep-service computes and reports changes but does not - implement them by sending pull and trash lists to the Keep - services. - - Use the -commit-pull and -commit-trash flags to implement the - computed changes. - -Tuning resource usage: - - CollectionBatchSize limits the number of collections retrieved per - API transaction. If this is zero or omitted, page size is - determined by the API server's own page size limits (see - max_items_per_response and max_index_database_read configs). - - CollectionBuffers sets the size of an internal queue of - collections. Higher values use more memory, and improve throughput - by allowing keep-balance to fetch the next page of collections - while the current page is still being processed. If this is zero - or omitted, pages are processed serially. - - RequestTimeout is the maximum time keep-balance will spend on a - single HTTP request (getting a page of collections, getting the - block index from a keepstore server, or sending a trash or pull - list to a keepstore server). Defaults to 30 minutes. - -Limitations: - - keep-balance does not attempt to discover whether committed pull - and trash requests ever get carried out -- only that they are - accepted by the Keep services. If some services are full, new - copies of underreplicated blocks might never get made, only - repeatedly requested. - -`, exampleConfigFile) -} -- 2.30.2