X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3a3d67ccee068a85aa3b79c5abd40170223071e3..8a27fe370239ecb8e50d53f46b45ed61203a35ca:/services/keepstore/keepstore.go diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index fb1e1ea545..b9dbe2777e 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -2,37 +2,19 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepstore import ( - "flag" - "fmt" - "net" - "os" - "os/signal" - "syscall" "time" - - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/config" - "git.curoverse.com/arvados.git/sdk/go/keepclient" - "github.com/coreos/go-systemd/daemon" - "github.com/prometheus/client_golang/prometheus" ) -var version = "dev" - -// A Keep "block" is 64MB. +// BlockSize for a Keep "block" is 64MB. const BlockSize = 64 * 1024 * 1024 -// A Keep volume must have at least MinFreeKilobytes available +// MinFreeKilobytes is the amount of space a Keep volume must have available // in order to permit writes. const MinFreeKilobytes = BlockSize / 1024 -// ProcMounts /proc/mounts -var ProcMounts = "/proc/mounts" - var bufs *bufferPool // KeepError types. @@ -51,6 +33,7 @@ var ( DiskHashError = &KeepError{500, "Hash mismatch in stored data"} ExpiredError = &KeepError{401, "Expired permission signature"} NotFoundError = &KeepError{404, "Not Found"} + VolumeBusyError = &KeepError{503, "Volume backend busy"} GenericError = &KeepError{500, "Fail"} FullError = &KeepError{503, "Full"} SizeRequiredError = &KeepError{411, "Missing Content-Length"} @@ -64,184 +47,11 @@ func (e *KeepError) Error() string { return e.ErrMsg } -// ======================== -// Internal data structures -// -// These global variables are used by multiple parts of the -// program. They are good candidates for moving into their own -// packages. - -// The Keep VolumeManager maintains a list of available volumes. -// Initialized by the --volumes flag (or by FindKeepVolumes). -var KeepVM VolumeManager - -// The pull list manager and trash queue are threadsafe queues which -// support atomic update operations. The PullHandler and TrashHandler -// store results from Data Manager /pull and /trash requests here. -// -// See the Keep and Data Manager design documents for more details: -// https://arvados.org/projects/arvados/wiki/Keep_Design_Doc -// https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc -// -var pullq *WorkQueue -var trashq *WorkQueue - -func main() { - deprecated.beforeFlagParse(theConfig) - - dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)") - getVersion := flag.Bool("version", false, "Print version information and exit.") - - defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml" - var configPath string - flag.StringVar( - &configPath, - "config", - defaultConfigPath, - "YAML or JSON configuration file `path`") - flag.Usage = usage - flag.Parse() - - // Print version information if requested - if *getVersion { - fmt.Printf("keepstore %s\n", version) - return - } - - deprecated.afterFlagParse(theConfig) - - err := config.LoadFile(theConfig, configPath) - if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) { - log.Fatal(err) - } - - if *dumpConfig { - log.Fatal(config.DumpAndExit(theConfig)) - } - - log.Printf("keepstore %s started", version) - - metricsRegistry := prometheus.NewRegistry() - - err = theConfig.Start(metricsRegistry) - if err != nil { - log.Fatal(err) - } - - if pidfile := theConfig.PIDFile; pidfile != "" { - f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777) - if err != nil { - log.Fatalf("open pidfile (%s): %s", pidfile, err) - } - defer f.Close() - err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) - if err != nil { - log.Fatalf("flock pidfile (%s): %s", pidfile, err) - } - defer os.Remove(pidfile) - err = f.Truncate(0) - if err != nil { - log.Fatalf("truncate pidfile (%s): %s", pidfile, err) - } - _, err = fmt.Fprint(f, os.Getpid()) - if err != nil { - log.Fatalf("write pidfile (%s): %s", pidfile, err) - } - err = f.Sync() - if err != nil { - log.Fatalf("sync pidfile (%s): %s", pidfile, err) - } - } - - var cluster *arvados.Cluster - cfg, err := arvados.GetConfig(arvados.DefaultConfigFile) - if err != nil && os.IsNotExist(err) { - log.Warnf("DEPRECATED: proceeding without cluster configuration file %q (%s)", arvados.DefaultConfigFile, err) - cluster = &arvados.Cluster{ - ClusterID: "xxxxx", - } - } else if err != nil { - log.Fatalf("load config %q: %s", arvados.DefaultConfigFile, err) - } else { - cluster, err = cfg.GetCluster("") - if err != nil { - log.Fatalf("config error in %q: %s", arvados.DefaultConfigFile, err) - } - } - - log.Println("keepstore starting, pid", os.Getpid()) - defer log.Println("keepstore exiting, pid", os.Getpid()) - - // Start a round-robin VolumeManager with the volumes we have found. - KeepVM = MakeRRVolumeManager(theConfig.Volumes) - - // Middleware/handler stack - router := MakeRESTRouter(cluster, metricsRegistry) - - // Set up a TCP listener. - listener, err := net.Listen("tcp", theConfig.Listen) - if err != nil { - log.Fatal(err) - } - - // Initialize keepclient for pull workers - keepClient := &keepclient.KeepClient{ - Arvados: &arvadosclient.ArvadosClient{}, - Want_replicas: 1, - } - - // Initialize the pullq and workers - pullq = NewWorkQueue() - for i := 0; i < 1 || i < theConfig.PullWorkers; i++ { - go RunPullWorker(pullq, keepClient) - } - - // Initialize the trashq and workers - trashq = NewWorkQueue() - for i := 0; i < 1 || i < theConfig.TrashWorkers; i++ { - go RunTrashWorker(trashq) - } - - // Start emptyTrash goroutine - doneEmptyingTrash := make(chan bool) - go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration()) - - // Shut down the server gracefully (by closing the listener) - // if SIGTERM is received. - term := make(chan os.Signal, 1) - go func(sig <-chan os.Signal) { - s := <-sig - log.Println("caught signal:", s) - doneEmptyingTrash <- true - listener.Close() - }(term) - signal.Notify(term, syscall.SIGTERM) - signal.Notify(term, syscall.SIGINT) - - if _, err := daemon.SdNotify(false, "READY=1"); err != nil { - log.Printf("Error notifying init daemon: %v", err) - } - log.Println("listening at", listener.Addr()) - srv := &server{} - srv.Handler = router - srv.Serve(listener) -} - // Periodically (once per interval) invoke EmptyTrash on all volumes. -func emptyTrash(done <-chan bool, interval time.Duration) { - ticker := time.NewTicker(interval) - - for { - select { - case <-ticker.C: - for _, v := range theConfig.Volumes { - if v.Writable() { - v.EmptyTrash() - } - } - case <-done: - ticker.Stop() - return +func emptyTrash(mounts []*VolumeMount, interval time.Duration) { + for range time.NewTicker(interval).C { + for _, v := range mounts { + v.EmptyTrash() } } }