X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cc572d714646b424d70facff4feb6d36480e8f7c..da83807d6bcef1c1f0bb78479c5ec17f150f5eda:/services/keepstore/keepstore.go diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index 79e3017d55..953aa047cb 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -2,39 +2,21 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepstore import ( - "flag" - "fmt" - "net" - "os" - "os/signal" - "syscall" "time" - - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/config" - "git.curoverse.com/arvados.git/sdk/go/keepclient" - "github.com/coreos/go-systemd/daemon" ) -var version = "dev" - -// A Keep "block" is 64MB. +// BlockSize for a Keep "block" is 64MB. const BlockSize = 64 * 1024 * 1024 -// A Keep volume must have at least MinFreeKilobytes available +// MinFreeKilobytes is the amount of space a Keep volume must have available // in order to permit writes. const MinFreeKilobytes = BlockSize / 1024 -// ProcMounts /proc/mounts -var ProcMounts = "/proc/mounts" - var bufs *bufferPool -// KeepError types. -// type KeepError struct { HTTPCode int ErrMsg string @@ -49,6 +31,7 @@ var ( DiskHashError = &KeepError{500, "Hash mismatch in stored data"} ExpiredError = &KeepError{401, "Expired permission signature"} NotFoundError = &KeepError{404, "Not Found"} + VolumeBusyError = &KeepError{503, "Volume backend busy"} GenericError = &KeepError{500, "Fail"} FullError = &KeepError{503, "Full"} SizeRequiredError = &KeepError{411, "Missing Content-Length"} @@ -62,166 +45,13 @@ func (e *KeepError) Error() string { return e.ErrMsg } -// ======================== -// Internal data structures -// -// These global variables are used by multiple parts of the -// program. They are good candidates for moving into their own -// packages. - -// The Keep VolumeManager maintains a list of available volumes. -// Initialized by the --volumes flag (or by FindKeepVolumes). -var KeepVM VolumeManager - -// The pull list manager and trash queue are threadsafe queues which -// support atomic update operations. The PullHandler and TrashHandler -// store results from Data Manager /pull and /trash requests here. -// -// See the Keep and Data Manager design documents for more details: -// https://arvados.org/projects/arvados/wiki/Keep_Design_Doc -// https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc -// -var pullq *WorkQueue -var trashq *WorkQueue - -func main() { - deprecated.beforeFlagParse(theConfig) - - dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)") - getVersion := flag.Bool("version", false, "Print version information and exit.") - - defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml" - var configPath string - flag.StringVar( - &configPath, - "config", - defaultConfigPath, - "YAML or JSON configuration file `path`") - flag.Usage = usage - flag.Parse() - - // Print version information if requested - if *getVersion { - fmt.Printf("keepstore %s\n", version) - return - } - - deprecated.afterFlagParse(theConfig) - - err := config.LoadFile(theConfig, configPath) - if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) { - log.Fatal(err) - } - - if *dumpConfig { - log.Fatal(config.DumpAndExit(theConfig)) - } - - log.Printf("keepstore %s started", version) - - err = theConfig.Start() - if err != nil { - log.Fatal(err) - } - - if pidfile := theConfig.PIDFile; pidfile != "" { - f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777) - if err != nil { - log.Fatalf("open pidfile (%s): %s", pidfile, err) - } - defer f.Close() - err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) - if err != nil { - log.Fatalf("flock pidfile (%s): %s", pidfile, err) - } - defer os.Remove(pidfile) - err = f.Truncate(0) - if err != nil { - log.Fatalf("truncate pidfile (%s): %s", pidfile, err) - } - _, err = fmt.Fprint(f, os.Getpid()) - if err != nil { - log.Fatalf("write pidfile (%s): %s", pidfile, err) - } - err = f.Sync() - if err != nil { - log.Fatalf("sync pidfile (%s): %s", pidfile, err) - } - } - - log.Println("keepstore starting, pid", os.Getpid()) - defer log.Println("keepstore exiting, pid", os.Getpid()) - - // Start a round-robin VolumeManager with the volumes we have found. - KeepVM = MakeRRVolumeManager(theConfig.Volumes) - - // Middleware/handler stack - router := MakeRESTRouter() - - // Set up a TCP listener. - listener, err := net.Listen("tcp", theConfig.Listen) - if err != nil { - log.Fatal(err) - } - - // Initialize keepclient for pull workers - keepClient := &keepclient.KeepClient{ - Arvados: &arvadosclient.ArvadosClient{}, - Want_replicas: 1, - } - - // Initialize the pullq and workers - pullq = NewWorkQueue() - for i := 0; i < 1 || i < theConfig.PullWorkers; i++ { - go RunPullWorker(pullq, keepClient) - } - - // Initialize the trashq and workers - trashq = NewWorkQueue() - for i := 0; i < 1 || i < theConfig.TrashWorkers; i++ { - go RunTrashWorker(trashq) - } - - // Start emptyTrash goroutine - doneEmptyingTrash := make(chan bool) - go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration()) - - // Shut down the server gracefully (by closing the listener) - // if SIGTERM is received. - term := make(chan os.Signal, 1) - go func(sig <-chan os.Signal) { - s := <-sig - log.Println("caught signal:", s) - doneEmptyingTrash <- true - listener.Close() - }(term) - signal.Notify(term, syscall.SIGTERM) - signal.Notify(term, syscall.SIGINT) - - if _, err := daemon.SdNotify(false, "READY=1"); err != nil { - log.Printf("Error notifying init daemon: %v", err) - } - log.Println("listening at", listener.Addr()) - srv := &server{} - srv.Handler = router - srv.Serve(listener) -} - // Periodically (once per interval) invoke EmptyTrash on all volumes. -func emptyTrash(done <-chan bool, interval time.Duration) { - ticker := time.NewTicker(interval) - - for { - select { - case <-ticker.C: - for _, v := range theConfig.Volumes { - if v.Writable() { - v.EmptyTrash() - } +func emptyTrash(mounts []*VolumeMount, interval time.Duration) { + for range time.NewTicker(interval).C { + for _, v := range mounts { + if v.KeepMount.AllowTrash { + v.EmptyTrash() } - case <-done: - ticker.Stop() - return } } }