X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a87fd176c3248699cbaaa45b815c8a34b3f5f11b..ca06cfbda0e84d469f7810a280cfa4dfa8997260:/services/keepstore/keepstore.go diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index 48b83de4b8..23af6906bb 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -1,32 +1,27 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( - "bytes" "flag" "fmt" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/httpserver" - "git.curoverse.com/arvados.git/sdk/go/keepclient" - "io/ioutil" - "log" "net" "net/http" "os" "os/signal" - "strings" "syscall" "time" -) - -// ====================== -// Configuration settings -// -// TODO(twp): make all of these configurable via command line flags -// and/or configuration file settings. -// Default TCP address on which to listen for requests. -// Initialized by the --listen flag. -const DefaultAddr = ":25107" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/config" + "git.curoverse.com/arvados.git/sdk/go/httpserver" + "git.curoverse.com/arvados.git/sdk/go/keepclient" + arvadosVersion "git.curoverse.com/arvados.git/sdk/go/version" + log "github.com/Sirupsen/logrus" + "github.com/coreos/go-systemd/daemon" +) // A Keep "block" is 64MB. const BlockSize = 64 * 1024 * 1024 @@ -38,36 +33,6 @@ const MinFreeKilobytes = BlockSize / 1024 // ProcMounts /proc/mounts var ProcMounts = "/proc/mounts" -// enforcePermissions controls whether permission signatures -// should be enforced (affecting GET and DELETE requests). -// Initialized by the -enforce-permissions flag. -var enforcePermissions bool - -// blobSignatureTTL is the time duration for which new permission -// signatures (returned by PUT requests) will be valid. -// Initialized by the -permission-ttl flag. -var blobSignatureTTL time.Duration - -// dataManagerToken represents the API token used by the -// Data Manager, and is required on certain privileged operations. -// Initialized by the -data-manager-token-file flag. -var dataManagerToken string - -// neverDelete can be used to prevent the DELETE handler from -// actually deleting anything. -var neverDelete = true - -// trashLifetime is the time duration after a block is trashed -// during which it can be recovered using an /untrash request -// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively. -var trashLifetime time.Duration - -// trashCheckInterval is the time duration at which the emptyTrash goroutine -// will check and delete expired trashed blocks. Default is one day. -// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively. -var trashCheckInterval time.Duration - -var maxBuffers = 128 var bufs *bufferPool // KeepError types. @@ -121,132 +86,57 @@ var KeepVM VolumeManager var pullq *WorkQueue var trashq *WorkQueue -type volumeSet []Volume +func main() { + deprecated.beforeFlagParse(theConfig) -var ( - flagSerializeIO bool - flagReadonly bool - volumes volumeSet -) + dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)") + getVersion := flag.Bool("version", false, "Print version information and exit.") -func (vs *volumeSet) String() string { - return fmt.Sprintf("%+v", (*vs)[:]) -} + defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml" + var configPath string + flag.StringVar( + &configPath, + "config", + defaultConfigPath, + "YAML or JSON configuration file `path`") + flag.Usage = usage + flag.Parse() -// TODO(twp): continue moving as much code as possible out of main -// so it can be effectively tested. Esp. handling and postprocessing -// of command line flags (identifying Keep volumes and initializing -// permission arguments). + // Print version information if requested + if *getVersion { + fmt.Printf("Version: %s\n", arvadosVersion.GetVersion()) + os.Exit(0) + } -func main() { - log.Println("keepstore starting, pid", os.Getpid()) - defer log.Println("keepstore exiting, pid", os.Getpid()) + deprecated.afterFlagParse(theConfig) - var ( - dataManagerTokenFile string - listen string - blobSigningKeyFile string - permissionTTLSec int - pidfile string - maxRequests int - ) - flag.StringVar( - &dataManagerTokenFile, - "data-manager-token-file", - "", - "File with the API token used by the Data Manager. All DELETE "+ - "requests or GET /index requests must carry this token.") - flag.BoolVar( - &enforcePermissions, - "enforce-permissions", - false, - "Enforce permission signatures on requests.") - flag.StringVar( - &listen, - "listen", - DefaultAddr, - "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.") - flag.IntVar( - &maxRequests, - "max-requests", - 0, - "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)") - flag.BoolVar( - &neverDelete, - "never-delete", - true, - "If true, nothing will be deleted. "+ - "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+ - "You should leave this option alone unless you can afford to lose data.") - flag.StringVar( - &blobSigningKeyFile, - "permission-key-file", - "", - "Synonym for -blob-signing-key-file.") - flag.StringVar( - &blobSigningKeyFile, - "blob-signing-key-file", - "", - "File containing the secret key for generating and verifying "+ - "blob permission signatures.") - flag.IntVar( - &permissionTTLSec, - "permission-ttl", - 0, - "Synonym for -blob-signature-ttl.") - flag.IntVar( - &permissionTTLSec, - "blob-signature-ttl", - 2*7*24*3600, - "Lifetime of blob permission signatures in seconds. Modifying the ttl will invalidate all existing signatures. "+ - "See services/api/config/application.default.yml.") - flag.BoolVar( - &flagSerializeIO, - "serialize", - false, - "Serialize read and write operations on the following volumes.") - flag.BoolVar( - &flagReadonly, - "readonly", - false, - "Do not write, delete, or touch anything on the following volumes.") - flag.StringVar( - &pidfile, - "pid", - "", - "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.") - flag.IntVar( - &maxBuffers, - "max-buffers", - maxBuffers, - fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20)) - flag.DurationVar( - &trashLifetime, - "trash-lifetime", - 0, - "Time duration after a block is trashed during which it can be recovered using an /untrash request") - flag.DurationVar( - &trashCheckInterval, - "trash-check-interval", - 24*time.Hour, - "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.") + err := config.LoadFile(theConfig, configPath) + if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) { + log.Fatal(err) + } - flag.Parse() + if *dumpConfig { + log.Fatal(config.DumpAndExit(theConfig)) + } + + log.Printf("keepstore %q started", arvadosVersion.GetVersion()) - if maxBuffers < 0 { - log.Fatal("-max-buffers must be greater than zero.") + err = theConfig.Start() + if err != nil { + log.Fatal(err) } - bufs = newBufferPool(maxBuffers, BlockSize) - if pidfile != "" { + if pidfile := theConfig.PIDFile; pidfile != "" { f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777) if err != nil { log.Fatalf("open pidfile (%s): %s", pidfile, err) } + defer f.Close() err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) if err != nil { log.Fatalf("flock pidfile (%s): %s", pidfile, err) } + defer os.Remove(pidfile) err = f.Truncate(0) if err != nil { log.Fatalf("truncate pidfile (%s): %s", pidfile, err) @@ -259,74 +149,22 @@ func main() { if err != nil { log.Fatalf("sync pidfile (%s): %s", pidfile, err) } - defer f.Close() - defer os.Remove(pidfile) - } - - if len(volumes) == 0 { - if (&unixVolumeAdder{&volumes}).Discover() == 0 { - log.Fatal("No volumes found.") - } - } - - for _, v := range volumes { - log.Printf("Using volume %v (writable=%v)", v, v.Writable()) - } - - // Initialize data manager token and permission key. - // If these tokens are specified but cannot be read, - // raise a fatal error. - if dataManagerTokenFile != "" { - if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil { - dataManagerToken = strings.TrimSpace(string(buf)) - } else { - log.Fatalf("reading data manager token: %s\n", err) - } - } - - if neverDelete != true { - log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " + - "been extensively tested. You should leave this option alone unless you can afford to lose data.") - } - - if blobSigningKeyFile != "" { - if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil { - PermissionSecret = bytes.TrimSpace(buf) - } else { - log.Fatalf("reading permission key: %s\n", err) - } - } - - blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second - - if PermissionSecret == nil { - if enforcePermissions { - log.Fatal("-enforce-permissions requires a permission key") - } else { - log.Println("Running without a PermissionSecret. Block locators " + - "returned by this server will not be signed, and will be rejected " + - "by a server that enforces permissions.") - log.Println("To fix this, use the -blob-signing-key-file flag " + - "to specify the file containing the permission key.") - } } - if maxRequests <= 0 { - maxRequests = maxBuffers * 2 - log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests) - } + log.Println("keepstore starting, pid", os.Getpid()) + defer log.Println("keepstore exiting, pid", os.Getpid()) // Start a round-robin VolumeManager with the volumes we have found. - KeepVM = MakeRRVolumeManager(volumes) + KeepVM = MakeRRVolumeManager(theConfig.Volumes) - // Middleware stack: logger, maxRequests limiter, method handlers - http.Handle("/", &LoggingRESTRouter{ - httpserver.NewRequestLimiter(maxRequests, - MakeRESTRouter()), - }) + // Middleware stack: logger, MaxRequests limiter, method handlers + router := MakeRESTRouter() + limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router) + router.limiter = limiter + http.Handle("/", &LoggingRESTRouter{router: limiter}) // Set up a TCP listener. - listener, err := net.Listen("tcp", listen) + listener, err := net.Listen("tcp", theConfig.Listen) if err != nil { log.Fatal(err) } @@ -335,7 +173,6 @@ func main() { keepClient := &keepclient.KeepClient{ Arvados: &arvadosclient.ArvadosClient{}, Want_replicas: 1, - Client: &http.Client{}, } // Initialize the pullq and worker @@ -348,7 +185,7 @@ func main() { // Start emptyTrash goroutine doneEmptyingTrash := make(chan bool) - go emptyTrash(doneEmptyingTrash, trashCheckInterval) + go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration()) // Shut down the server gracefully (by closing the listener) // if SIGTERM is received. @@ -362,24 +199,27 @@ func main() { signal.Notify(term, syscall.SIGTERM) signal.Notify(term, syscall.SIGINT) - log.Println("listening at", listen) - srv := &http.Server{Addr: listen} + if _, err := daemon.SdNotify(false, "READY=1"); err != nil { + log.Printf("Error notifying init daemon: %v", err) + } + log.Println("listening at", listener.Addr()) + srv := &http.Server{} srv.Serve(listener) } -// At every trashCheckInterval tick, invoke EmptyTrash on all volumes. -func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) { - ticker := time.NewTicker(trashCheckInterval) +// Periodically (once per interval) invoke EmptyTrash on all volumes. +func emptyTrash(done <-chan bool, interval time.Duration) { + ticker := time.NewTicker(interval) for { select { case <-ticker.C: - for _, v := range volumes { + for _, v := range theConfig.Volumes { if v.Writable() { v.EmptyTrash() } } - case <-doneEmptyingTrash: + case <-done: ticker.Stop() return }