16535: Merge branch 'master'
[arvados.git] / services / keep-balance / main.go
index eb741fa7e949803df99c67bab02936ad66f17457..65bd8d4cf098a17610953810ab8147f678616aee 100644 (file)
 package main
 
 import (
 package main
 
 import (
-       "encoding/json"
+       "context"
        "flag"
        "fmt"
        "flag"
        "fmt"
-       "log"
+       "io"
        "net/http"
        "os"
        "net/http"
        "os"
-       "os/signal"
-       "syscall"
-       "time"
 
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/config"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
-       "github.com/Sirupsen/logrus"
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/lib/service"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
 )
 
 )
 
-var version = "dev"
-
-const (
-       defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
-       rfc3339NanoFixed  = "2006-01-02T15:04:05.000000000Z07:00"
-)
-
-// Config specifies site configuration, like API credentials and the
-// choice of which servers are to be balanced.
-//
-// Config is loaded from a JSON config file (see usage()).
-type Config struct {
-       // Arvados API endpoint and credentials.
-       Client arvados.Client
-
-       // List of service types (e.g., "disk") to balance.
-       KeepServiceTypes []string
-
-       KeepServiceList arvados.KeepServiceList
-
-       // address, address:port, or :port for management interface
-       Listen string
-
-       // How often to check
-       RunPeriod arvados.Duration
-
-       // Number of collections to request in each API call
-       CollectionBatchSize int
-
-       // Max collections to buffer in memory (bigger values consume
-       // more memory, but can reduce store-and-forward latency when
-       // fetching pages)
-       CollectionBuffers int
-
-       // Timeout for outgoing http request/response cycle.
-       RequestTimeout arvados.Duration
-}
-
-// RunOptions controls runtime behavior. The flags/options that belong
-// here are the ones that are useful for interactive use. For example,
-// "CommitTrash" is a runtime option rather than a config item because
-// it invokes a troubleshooting feature rather than expressing how
-// balancing is meant to be done at a given site.
-//
-// RunOptions fields are controlled by command line flags.
-type RunOptions struct {
-       Once        bool
-       CommitPulls bool
-       CommitTrash bool
-       Logger      *logrus.Logger
-       Dumper      *logrus.Logger
-
-       // SafeRendezvousState from the most recent balance operation,
-       // or "" if unknown. If this changes from one run to the next,
-       // we need to watch out for races. See
-       // (*Balancer)ClearTrashLists.
-       SafeRendezvousState string
+func main() {
+       os.Exit(runCommand(os.Args[0], os.Args[1:], os.Stdin, os.Stdout, os.Stderr))
 }
 
 }
 
-var debugf = func(string, ...interface{}) {}
+func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       logger := ctxlog.FromContext(context.Background())
 
 
-func main() {
-       var cfg Config
-       var runOptions RunOptions
-
-       configPath := flag.String("config", defaultConfigPath,
-               "`path` of JSON or YAML configuration file")
-       serviceListPath := flag.String("config.KeepServiceList", "",
-               "`path` of JSON or YAML file with list of keep services to balance, as given by \"arv keep_service list\" "+
-                       "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
-       flag.BoolVar(&runOptions.Once, "once", false,
+       var options RunOptions
+       flags := flag.NewFlagSet(prog, flag.ExitOnError)
+       flags.BoolVar(&options.Once, "once", false,
                "balance once and then exit")
                "balance once and then exit")
-       flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
+       flags.BoolVar(&options.CommitPulls, "commit-pulls", false,
                "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
                "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
-       flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
+       flags.BoolVar(&options.CommitTrash, "commit-trash", false,
                "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
                "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
-       dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit")
-       dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
-       debugFlag := flag.Bool("debug", false, "enable debug messages")
-       getVersion := flag.Bool("version", false, "Print version information and exit.")
-       flag.Usage = usage
-       flag.Parse()
-
-       // Print version information if requested
-       if *getVersion {
-               fmt.Printf("keep-balance %s\n", version)
-               return
-       }
-
-       mustReadConfig(&cfg, *configPath)
-       if *serviceListPath != "" {
-               mustReadConfig(&cfg.KeepServiceList, *serviceListPath)
-       }
-
-       if *dumpConfig {
-               log.Fatal(config.DumpAndExit(cfg))
-       }
+       flags.Bool("version", false, "Write version information to stdout and exit 0")
+       dumpFlag := flags.Bool("dump", false, "dump details for each block to stdout")
 
 
-       to := time.Duration(cfg.RequestTimeout)
-       if to == 0 {
-               to = 30 * time.Minute
-       }
-       arvados.DefaultSecureClient.Timeout = to
-       arvados.InsecureHTTPClient.Timeout = to
-       http.DefaultClient.Timeout = to
+       loader := config.NewLoader(os.Stdin, logger)
+       loader.SetupFlags(flags)
 
 
-       log.Printf("keep-balance %s started", version)
+       munged := loader.MungeLegacyConfigArgs(logger, args, "-legacy-keepbalance-config")
+       flags.Parse(munged)
 
 
-       if *debugFlag {
-               debugf = log.Printf
-               if j, err := json.Marshal(cfg); err != nil {
-                       log.Fatal(err)
-               } else {
-                       log.Printf("config is %s", j)
-               }
-       }
        if *dumpFlag {
        if *dumpFlag {
-               runOptions.Dumper = logrus.New()
-               runOptions.Dumper.Out = os.Stdout
-               runOptions.Dumper.Formatter = &logrus.TextFormatter{}
-       }
-       srv, err := NewServer(cfg, runOptions)
-       if err != nil {
-               // (don't run)
-       } else if runOptions.Once {
-               _, err = srv.Run()
-       } else {
-               err = srv.RunForever(nil)
-       }
-       if err != nil {
-               log.Fatal(err)
-       }
-}
-
-func mustReadConfig(dst interface{}, path string) {
-       if err := config.LoadFile(dst, path); err != nil {
-               log.Fatal(err)
-       }
-}
-
-type Server struct {
-       config     Config
-       runOptions RunOptions
-       metrics    *metrics
-       listening  string // for tests
-
-       Logger *logrus.Logger
-       Dumper *logrus.Logger
-}
-
-// NewServer returns a new Server that runs Balancers using the given
-// config and runOptions.
-func NewServer(config Config, runOptions RunOptions) (*Server, error) {
-       if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
-               return nil, fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
-       }
-       if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
-               return nil, fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
-       }
-
-       if runOptions.Logger == nil {
-               log := logrus.New()
-               log.Formatter = &logrus.JSONFormatter{
-                       TimestampFormat: rfc3339NanoFixed,
-               }
-               log.Out = os.Stderr
-               runOptions.Logger = log
-       }
-
-       srv := &Server{
-               config:     config,
-               runOptions: runOptions,
-               metrics:    newMetrics(),
-               Logger:     runOptions.Logger,
-               Dumper:     runOptions.Dumper,
-       }
-       return srv, srv.start()
-}
-
-func (srv *Server) start() error {
-       if srv.config.Listen == "" {
-               return nil
-       }
-       server := &httpserver.Server{
-               Server: http.Server{
-                       Handler: httpserver.LogRequests(srv.Logger, srv.metrics.Handler(srv.Logger)),
-               },
-               Addr: srv.config.Listen,
-       }
-       err := server.Start()
-       if err != nil {
-               return err
-       }
-       srv.Logger.Printf("listening at %s", server.Addr)
-       srv.listening = server.Addr
-       return nil
-}
-
-func (srv *Server) Run() (*Balancer, error) {
-       bal := &Balancer{
-               Logger:  srv.Logger,
-               Dumper:  srv.Dumper,
-               Metrics: srv.metrics,
-       }
-       var err error
-       srv.runOptions, err = bal.Run(srv.config, srv.runOptions)
-       return bal, err
-}
-
-// RunForever runs forever, or (for testing purposes) until the given
-// stop channel is ready to receive.
-func (srv *Server) RunForever(stop <-chan interface{}) error {
-       logger := srv.runOptions.Logger
-
-       ticker := time.NewTicker(time.Duration(srv.config.RunPeriod))
-
-       // The unbuffered channel here means we only hear SIGUSR1 if
-       // it arrives while we're waiting in select{}.
-       sigUSR1 := make(chan os.Signal)
-       signal.Notify(sigUSR1, syscall.SIGUSR1)
-
-       logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.config.RunPeriod)
-
-       for {
-               if !srv.runOptions.CommitPulls && !srv.runOptions.CommitTrash {
-                       logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
-                       logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
-               }
-
-               _, err := srv.Run()
-               if err != nil {
-                       logger.Print("run failed: ", err)
-               } else {
-                       logger.Print("run succeeded")
-               }
-
-               select {
-               case <-stop:
-                       signal.Stop(sigUSR1)
-                       return nil
-               case <-ticker.C:
-                       logger.Print("timer went off")
-               case <-sigUSR1:
-                       logger.Print("received SIGUSR1, resetting timer")
-                       // Reset the timer so we don't start the N+1st
-                       // run too soon after the Nth run is triggered
-                       // by SIGUSR1.
-                       ticker.Stop()
-                       ticker = time.NewTicker(time.Duration(srv.config.RunPeriod))
+               dumper := logrus.New()
+               dumper.Out = os.Stdout
+               dumper.Formatter = &logrus.TextFormatter{}
+               options.Dumper = dumper
+       }
+
+       // Drop our custom args that would be rejected by the generic
+       // service.Command
+       args = nil
+       dropFlag := map[string]bool{
+               "once":         true,
+               "commit-pulls": true,
+               "commit-trash": true,
+               "dump":         true,
+       }
+       flags.Visit(func(f *flag.Flag) {
+               if !dropFlag[f.Name] {
+                       args = append(args, "-"+f.Name, f.Value.String())
                }
                }
-               logger.Print("starting next run")
-       }
+       })
+
+       return service.Command(arvados.ServiceNameKeepbalance,
+               func(ctx context.Context, cluster *arvados.Cluster, token string, registry *prometheus.Registry) service.Handler {
+                       if !options.Once && cluster.Collections.BalancePeriod == arvados.Duration(0) {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("cannot start service: Collections.BalancePeriod is zero (if you want to run once and then exit, use the -once flag)"))
+                       }
+
+                       ac, err := arvados.NewClientFromConfig(cluster)
+                       ac.AuthToken = token
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
+                       }
+
+                       if options.Logger == nil {
+                               options.Logger = ctxlog.FromContext(ctx)
+                       }
+
+                       srv := &Server{
+                               Handler:    http.NotFoundHandler(),
+                               Cluster:    cluster,
+                               ArvClient:  ac,
+                               RunOptions: options,
+                               Metrics:    newMetrics(registry),
+                               Logger:     options.Logger,
+                               Dumper:     options.Dumper,
+                       }
+
+                       go srv.run()
+                       return srv
+               }).RunCommand(prog, args, stdin, stdout, stderr)
 }
 }