Merge branch '10813-arv-put-six-threads'
[arvados.git] / services / keep-balance / main.go
index 42a8d635b131f14a78f2ac7ced915a32d0f5a7f7..04d3e9992b21f74289919dfeb5a67bda509e94f6 100644 (file)
@@ -3,7 +3,6 @@ package main
 import (
        "encoding/json"
        "flag"
 import (
        "encoding/json"
        "flag"
-       "io/ioutil"
        "log"
        "os"
        "os/signal"
        "log"
        "os"
        "os/signal"
@@ -11,8 +10,11 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/config"
 )
 
 )
 
+const defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
+
 // Config specifies site configuration, like API credentials and the
 // choice of which servers are to be balanced.
 //
 // Config specifies site configuration, like API credentials and the
 // choice of which servers are to be balanced.
 //
@@ -28,6 +30,14 @@ type Config struct {
 
        // How often to check
        RunPeriod arvados.Duration
 
        // How often to check
        RunPeriod arvados.Duration
+
+       // Number of collections to request in each API call
+       CollectionBatchSize int
+
+       // Max collections to buffer in memory (bigger values consume
+       // more memory, but can reduce store-and-forward latency when
+       // fetching pages)
+       CollectionBuffers int
 }
 
 // RunOptions controls runtime behavior. The flags/options that belong
 }
 
 // RunOptions controls runtime behavior. The flags/options that belong
@@ -43,18 +53,24 @@ type RunOptions struct {
        CommitTrash bool
        Logger      *log.Logger
        Dumper      *log.Logger
        CommitTrash bool
        Logger      *log.Logger
        Dumper      *log.Logger
+
+       // SafeRendezvousState from the most recent balance operation,
+       // or "" if unknown. If this changes from one run to the next,
+       // we need to watch out for races. See
+       // (*Balancer)ClearTrashLists.
+       SafeRendezvousState string
 }
 
 var debugf = func(string, ...interface{}) {}
 
 func main() {
 }
 
 var debugf = func(string, ...interface{}) {}
 
 func main() {
-       var config Config
+       var cfg Config
        var runOptions RunOptions
 
        var runOptions RunOptions
 
-       configPath := flag.String("config", "",
-               "`path` of json configuration file")
+       configPath := flag.String("config", defaultConfigPath,
+               "`path` of JSON or YAML configuration file")
        serviceListPath := flag.String("config.KeepServiceList", "",
        serviceListPath := flag.String("config.KeepServiceList", "",
-               "`path` of json file with list of keep services to balance, as given by \"arv keep_service list\" "+
+               "`path` of JSON or YAML file with list of keep services to balance, as given by \"arv keep_service list\" "+
                        "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
        flag.BoolVar(&runOptions.Once, "once", false,
                "balance once and then exit")
                        "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
        flag.BoolVar(&runOptions.Once, "once", false,
                "balance once and then exit")
@@ -62,22 +78,24 @@ func main() {
                "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
        flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
                "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
                "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
        flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
                "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
+       dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit")
        dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
        debugFlag := flag.Bool("debug", false, "enable debug messages")
        flag.Usage = usage
        flag.Parse()
 
        dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
        debugFlag := flag.Bool("debug", false, "enable debug messages")
        flag.Usage = usage
        flag.Parse()
 
-       if *configPath == "" {
-               log.Fatal("You must specify a config file (see `keep-balance -help`)")
-       }
-       mustReadJSON(&config, *configPath)
+       mustReadConfig(&cfg, *configPath)
        if *serviceListPath != "" {
        if *serviceListPath != "" {
-               mustReadJSON(&config.KeepServiceList, *serviceListPath)
+               mustReadConfig(&cfg.KeepServiceList, *serviceListPath)
+       }
+
+       if *dumpConfig {
+               log.Fatal(config.DumpAndExit(cfg))
        }
 
        if *debugFlag {
                debugf = log.Printf
        }
 
        if *debugFlag {
                debugf = log.Printf
-               if j, err := json.Marshal(config); err != nil {
+               if j, err := json.Marshal(cfg); err != nil {
                        log.Fatal(err)
                } else {
                        log.Printf("config is %s", j)
                        log.Fatal(err)
                } else {
                        log.Printf("config is %s", j)
@@ -86,24 +104,22 @@ func main() {
        if *dumpFlag {
                runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
        }
        if *dumpFlag {
                runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
        }
-       err := CheckConfig(config, runOptions)
+       err := CheckConfig(cfg, runOptions)
        if err != nil {
                // (don't run)
        } else if runOptions.Once {
        if err != nil {
                // (don't run)
        } else if runOptions.Once {
-               err = (&Balancer{}).Run(config, runOptions)
+               _, err = (&Balancer{}).Run(cfg, runOptions)
        } else {
        } else {
-               err = RunForever(config, runOptions, nil)
+               err = RunForever(cfg, runOptions, nil)
        }
        if err != nil {
                log.Fatal(err)
        }
 }
 
        }
        if err != nil {
                log.Fatal(err)
        }
 }
 
-func mustReadJSON(dst interface{}, path string) {
-       if buf, err := ioutil.ReadFile(path); err != nil {
-               log.Fatalf("Reading %q: %v", path, err)
-       } else if err = json.Unmarshal(buf, dst); err != nil {
-               log.Fatalf("Decoding %q: %v", path, err)
+func mustReadConfig(dst interface{}, path string) {
+       if err := config.LoadFile(dst, path); err != nil {
+               log.Fatal(err)
        }
 }
 
        }
 }
 
@@ -130,7 +146,9 @@ func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) e
                        logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
                }
 
                        logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
                }
 
-               err := (&Balancer{}).Run(config, runOptions)
+               bal := &Balancer{}
+               var err error
+               runOptions, err = bal.Run(config, runOptions)
                if err != nil {
                        logger.Print("run failed: ", err)
                } else {
                if err != nil {
                        logger.Print("run failed: ", err)
                } else {