X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e7a865275a832420b9d63c0ab3ebf87eaca57d26..f04693da1811e670d4cbb981debeecf14d79137c:/services/keepstore/keepstore.go diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index 2f5f8d43ea..f2973b586a 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -1,22 +1,11 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( - "flag" - "fmt" - "log" - "net" - "net/http" - "os" - "os/signal" - "syscall" "time" - - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/config" - "git.curoverse.com/arvados.git/sdk/go/httpserver" - "git.curoverse.com/arvados.git/sdk/go/keepclient" - "github.com/coreos/go-systemd/daemon" - "github.com/ghodss/yaml" ) // A Keep "block" is 64MB. @@ -26,9 +15,6 @@ const BlockSize = 64 * 1024 * 1024 // in order to permit writes. const MinFreeKilobytes = BlockSize / 1024 -// ProcMounts /proc/mounts -var ProcMounts = "/proc/mounts" - var bufs *bufferPool // KeepError types. @@ -47,6 +33,7 @@ var ( DiskHashError = &KeepError{500, "Hash mismatch in stored data"} ExpiredError = &KeepError{401, "Expired permission signature"} NotFoundError = &KeepError{404, "Not Found"} + VolumeBusyError = &KeepError{503, "Volume backend busy"} GenericError = &KeepError{500, "Fail"} FullError = &KeepError{503, "Full"} SizeRequiredError = &KeepError{411, "Missing Content-Length"} @@ -60,158 +47,11 @@ func (e *KeepError) Error() string { return e.ErrMsg } -// ======================== -// Internal data structures -// -// These global variables are used by multiple parts of the -// program. They are good candidates for moving into their own -// packages. - -// The Keep VolumeManager maintains a list of available volumes. -// Initialized by the --volumes flag (or by FindKeepVolumes). 
-var KeepVM VolumeManager - -// The pull list manager and trash queue are threadsafe queues which -// support atomic update operations. The PullHandler and TrashHandler -// store results from Data Manager /pull and /trash requests here. -// -// See the Keep and Data Manager design documents for more details: -// https://arvados.org/projects/arvados/wiki/Keep_Design_Doc -// https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc -// -var pullq *WorkQueue -var trashq *WorkQueue - -func main() { - deprecated.beforeFlagParse(theConfig) - - dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)") - - defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml" - var configPath string - flag.StringVar( - &configPath, - "config", - defaultConfigPath, - "YAML or JSON configuration file `path`") - flag.Usage = usage - flag.Parse() - - deprecated.afterFlagParse(theConfig) - - err := config.LoadFile(theConfig, configPath) - if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) { - log.Fatal(err) - } - - if *dumpConfig { - y, err := yaml.Marshal(theConfig) - if err != nil { - log.Fatal(err) - } - os.Stdout.Write(y) - os.Exit(0) - } - - err = theConfig.Start() - - if pidfile := theConfig.PIDFile; pidfile != "" { - f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777) - if err != nil { - log.Fatalf("open pidfile (%s): %s", pidfile, err) - } - defer f.Close() - err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) - if err != nil { - log.Fatalf("flock pidfile (%s): %s", pidfile, err) - } - defer os.Remove(pidfile) - err = f.Truncate(0) - if err != nil { - log.Fatalf("truncate pidfile (%s): %s", pidfile, err) - } - _, err = fmt.Fprint(f, os.Getpid()) - if err != nil { - log.Fatalf("write pidfile (%s): %s", pidfile, err) - } - err = f.Sync() - if err != nil { - log.Fatalf("sync pidfile (%s): %s", pidfile, err) - } - } - - 
log.Println("keepstore starting, pid", os.Getpid()) - defer log.Println("keepstore exiting, pid", os.Getpid()) - - // Start a round-robin VolumeManager with the volumes we have found. - KeepVM = MakeRRVolumeManager(theConfig.Volumes) - - // Middleware stack: logger, MaxRequests limiter, method handlers - http.Handle("/", &LoggingRESTRouter{ - httpserver.NewRequestLimiter(theConfig.MaxRequests, - MakeRESTRouter()), - }) - - // Set up a TCP listener. - listener, err := net.Listen("tcp", theConfig.Listen) - if err != nil { - log.Fatal(err) - } - - // Initialize Pull queue and worker - keepClient := &keepclient.KeepClient{ - Arvados: &arvadosclient.ArvadosClient{}, - Want_replicas: 1, - Client: &http.Client{}, - } - - // Initialize the pullq and worker - pullq = NewWorkQueue() - go RunPullWorker(pullq, keepClient) - - // Initialize the trashq and worker - trashq = NewWorkQueue() - go RunTrashWorker(trashq) - - // Start emptyTrash goroutine - doneEmptyingTrash := make(chan bool) - go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration()) - - // Shut down the server gracefully (by closing the listener) - // if SIGTERM is received. - term := make(chan os.Signal, 1) - go func(sig <-chan os.Signal) { - s := <-sig - log.Println("caught signal:", s) - doneEmptyingTrash <- true - listener.Close() - }(term) - signal.Notify(term, syscall.SIGTERM) - signal.Notify(term, syscall.SIGINT) - - if _, err := daemon.SdNotify(false, "READY=1"); err != nil { - log.Printf("Error notifying init daemon: %v", err) - } - log.Println("listening at", listener.Addr()) - srv := &http.Server{} - srv.Serve(listener) -} - // Periodically (once per interval) invoke EmptyTrash on all volumes. 
-func emptyTrash(done <-chan bool, interval time.Duration) {
-	ticker := time.NewTicker(interval)
-
-	for {
-		select {
-		case <-ticker.C:
-			for _, v := range theConfig.Volumes {
-				if v.Writable() {
-					v.EmptyTrash()
-				}
-			}
-		case <-done:
-			ticker.Stop()
-			return
+func emptyTrash(mounts []*VolumeMount, interval time.Duration) { // no exit path: the old version's done channel is gone
+	for range time.NewTicker(interval).C { // one pass per tick; ticker is never stopped; NewTicker panics if interval <= 0
+		for _, v := range mounts { // every mount passed in; the old v.Writable() check is not repeated here
+			v.EmptyTrash()
 		}
 	}
 }