11644: Use filesystem UUID and path as DeviceID for local disk volumes.
[arvados.git] / services / keepstore / keepstore.go
index ec11af5cf51c8c1a65bdf4827962a74aa90a03fe..9033de811775f776499b61f5347545dd42775cc0 100644 (file)
@@ -1,33 +1,22 @@
 package main
 
 import (
-       "bufio"
-       "bytes"
-       "errors"
        "flag"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "io/ioutil"
-       "log"
        "net"
        "net/http"
        "os"
        "os/signal"
-       "strings"
-       "sync"
        "syscall"
        "time"
-)
 
-// ======================
-// Configuration settings
-//
-// TODO(twp): make all of these configurable via command line flags
-// and/or configuration file settings.
-
-// Default TCP address on which to listen for requests.
-// Initialized by the --listen flag.
-const DefaultAddr = ":25107"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/config"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       log "github.com/Sirupsen/logrus"
+       "github.com/coreos/go-systemd/daemon"
+)
 
 // A Keep "block" is 64MB.
 const BlockSize = 64 * 1024 * 1024
@@ -36,33 +25,9 @@ const BlockSize = 64 * 1024 * 1024
 // in order to permit writes.
 const MinFreeKilobytes = BlockSize / 1024
 
-// Until #6221 is resolved, never_delete must be true.
-// However, allow it to be false in testing with TEST_DATA_MANAGER_TOKEN
-const TEST_DATA_MANAGER_TOKEN = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
-
 // ProcMounts /proc/mounts
 var ProcMounts = "/proc/mounts"
 
-// enforcePermissions controls whether permission signatures
-// should be enforced (affecting GET and DELETE requests).
-// Initialized by the -enforce-permissions flag.
-var enforcePermissions bool
-
-// blobSignatureTTL is the time duration for which new permission
-// signatures (returned by PUT requests) will be valid.
-// Initialized by the -permission-ttl flag.
-var blobSignatureTTL time.Duration
-
-// dataManagerToken represents the API token used by the
-// Data Manager, and is required on certain privileged operations.
-// Initialized by the -data-manager-token-file flag.
-var dataManagerToken string
-
-// neverDelete can be used to prevent the DELETE handler from
-// actually deleting anything.
-var neverDelete = true
-
-var maxBuffers = 128
 var bufs *bufferPool
 
 // KeepError types.
@@ -86,6 +51,8 @@ var (
        SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
        TooLongError        = &KeepError{413, "Block is too large"}
        MethodDisabledError = &KeepError{405, "Method disabled"}
+       ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
+       ErrClientDisconnect = &KeepError{503, "Client disconnected"}
 )
 
 func (e *KeepError) Error() string {
@@ -114,203 +81,48 @@ var KeepVM VolumeManager
 var pullq *WorkQueue
 var trashq *WorkQueue
 
-var (
-       flagSerializeIO bool
-       flagReadonly    bool
-)
+func main() {
+       deprecated.beforeFlagParse(theConfig)
 
-type volumeSet []Volume
+       dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
 
-func (vs *volumeSet) Set(value string) error {
-       if dirs := strings.Split(value, ","); len(dirs) > 1 {
-               log.Print("DEPRECATED: using comma-separated volume list.")
-               for _, dir := range dirs {
-                       if err := vs.Set(dir); err != nil {
-                               return err
-                       }
-               }
-               return nil
-       }
-       if len(value) == 0 || value[0] != '/' {
-               return errors.New("Invalid volume: must begin with '/'.")
-       }
-       if _, err := os.Stat(value); err != nil {
-               return err
-       }
-       var locker sync.Locker
-       if flagSerializeIO {
-               locker = &sync.Mutex{}
-       }
-       *vs = append(*vs, &UnixVolume{
-               root:     value,
-               locker:   locker,
-               readonly: flagReadonly,
-       })
-       return nil
-}
+       defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
+       var configPath string
+       flag.StringVar(
+               &configPath,
+               "config",
+               defaultConfigPath,
+               "YAML or JSON configuration file `path`")
+       flag.Usage = usage
+       flag.Parse()
 
-func (vs *volumeSet) String() string {
-       s := "["
-       for i, v := range *vs {
-               if i > 0 {
-                       s = s + " "
-               }
-               s = s + v.String()
-       }
-       return s + "]"
-}
+       deprecated.afterFlagParse(theConfig)
 
-// Discover adds a volume for every directory named "keep" that is
-// located at the top level of a device- or tmpfs-backed mount point
-// other than "/". It returns the number of volumes added.
-func (vs *volumeSet) Discover() int {
-       added := 0
-       f, err := os.Open(ProcMounts)
-       if err != nil {
-               log.Fatalf("opening %s: %s", ProcMounts, err)
-       }
-       scanner := bufio.NewScanner(f)
-       for scanner.Scan() {
-               args := strings.Fields(scanner.Text())
-               if err := scanner.Err(); err != nil {
-                       log.Fatalf("reading %s: %s", ProcMounts, err)
-               }
-               dev, mount := args[0], args[1]
-               if mount == "/" {
-                       continue
-               }
-               if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
-                       continue
-               }
-               keepdir := mount + "/keep"
-               if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
-                       continue
-               }
-               // Set the -readonly flag (but only for this volume)
-               // if the filesystem is mounted readonly.
-               flagReadonlyWas := flagReadonly
-               for _, fsopt := range strings.Split(args[3], ",") {
-                       if fsopt == "ro" {
-                               flagReadonly = true
-                               break
-                       }
-                       if fsopt == "rw" {
-                               break
-                       }
-               }
-               vs.Set(keepdir)
-               flagReadonly = flagReadonlyWas
-               added++
+       err := config.LoadFile(theConfig, configPath)
+       if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
+               log.Fatal(err)
        }
-       return added
-}
-
-// TODO(twp): continue moving as much code as possible out of main
-// so it can be effectively tested. Esp. handling and postprocessing
-// of command line flags (identifying Keep volumes and initializing
-// permission arguments).
-
-func main() {
-       log.Println("keepstore starting, pid", os.Getpid())
-       defer log.Println("keepstore exiting, pid", os.Getpid())
-
-       var (
-               dataManagerTokenFile string
-               listen               string
-               blobSigningKeyFile   string
-               permissionTTLSec     int
-               volumes              volumeSet
-               pidfile              string
-       )
-       flag.StringVar(
-               &dataManagerTokenFile,
-               "data-manager-token-file",
-               "",
-               "File with the API token used by the Data Manager. All DELETE "+
-                       "requests or GET /index requests must carry this token.")
-       flag.BoolVar(
-               &enforcePermissions,
-               "enforce-permissions",
-               false,
-               "Enforce permission signatures on requests.")
-       flag.StringVar(
-               &listen,
-               "listen",
-               DefaultAddr,
-               "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
-       flag.BoolVar(
-               &neverDelete,
-               "never-delete",
-               true,
-               "If set, nothing will be deleted. HTTP 405 will be returned "+
-                       "for valid DELETE requests.")
-       flag.StringVar(
-               &blobSigningKeyFile,
-               "permission-key-file",
-               "",
-               "Synonym for -blob-signing-key-file.")
-       flag.StringVar(
-               &blobSigningKeyFile,
-               "blob-signing-key-file",
-               "",
-               "File containing the secret key for generating and verifying "+
-                       "blob permission signatures.")
-       flag.IntVar(
-               &permissionTTLSec,
-               "permission-ttl",
-               0,
-               "Synonym for -blob-signature-ttl.")
-       flag.IntVar(
-               &permissionTTLSec,
-               "blob-signature-ttl",
-               int(time.Duration(2*7*24*time.Hour).Seconds()),
-               "Lifetime of blob permission signatures. "+
-                       "See services/api/config/application.default.yml.")
-       flag.BoolVar(
-               &flagSerializeIO,
-               "serialize",
-               false,
-               "Serialize read and write operations on the following volumes.")
-       flag.BoolVar(
-               &flagReadonly,
-               "readonly",
-               false,
-               "Do not write, delete, or touch anything on the following volumes.")
-       flag.Var(
-               &volumes,
-               "volumes",
-               "Deprecated synonym for -volume.")
-       flag.Var(
-               &volumes,
-               "volume",
-               "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
-       flag.StringVar(
-               &pidfile,
-               "pid",
-               "",
-               "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
-       flag.IntVar(
-               &maxBuffers,
-               "max-buffers",
-               maxBuffers,
-               fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
 
-       flag.Parse()
+       if *dumpConfig {
+               log.Fatal(config.DumpAndExit(theConfig))
+       }
 
-       if maxBuffers < 0 {
-               log.Fatal("-max-buffers must be greater than zero.")
+       err = theConfig.Start()
+       if err != nil {
+               log.Fatal(err)
        }
-       bufs = newBufferPool(maxBuffers, BlockSize)
 
-       if pidfile != "" {
+       if pidfile := theConfig.PIDFile; pidfile != "" {
                f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
                if err != nil {
                        log.Fatalf("open pidfile (%s): %s", pidfile, err)
                }
+               defer f.Close()
                err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
                if err != nil {
                        log.Fatalf("flock pidfile (%s): %s", pidfile, err)
                }
+               defer os.Remove(pidfile)
                err = f.Truncate(0)
                if err != nil {
                        log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
@@ -323,77 +135,30 @@ func main() {
                if err != nil {
                        log.Fatalf("sync pidfile (%s): %s", pidfile, err)
                }
-               defer f.Close()
-               defer os.Remove(pidfile)
-       }
-
-       if len(volumes) == 0 {
-               if volumes.Discover() == 0 {
-                       log.Fatal("No volumes found.")
-               }
-       }
-
-       for _, v := range volumes {
-               log.Printf("Using volume %v (writable=%v)", v, v.Writable())
-       }
-
-       // Initialize data manager token and permission key.
-       // If these tokens are specified but cannot be read,
-       // raise a fatal error.
-       if dataManagerTokenFile != "" {
-               if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
-                       dataManagerToken = strings.TrimSpace(string(buf))
-               } else {
-                       log.Fatalf("reading data manager token: %s\n", err)
-               }
-       }
-
-       if neverDelete != true && dataManagerToken != TEST_DATA_MANAGER_TOKEN {
-               log.Fatal("never_delete must be true, see #6221")
-       }
-
-       if blobSigningKeyFile != "" {
-               if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
-                       PermissionSecret = bytes.TrimSpace(buf)
-               } else {
-                       log.Fatalf("reading permission key: %s\n", err)
-               }
        }
 
-       blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
-
-       if PermissionSecret == nil {
-               if enforcePermissions {
-                       log.Fatal("-enforce-permissions requires a permission key")
-               } else {
-                       log.Println("Running without a PermissionSecret. Block locators " +
-                               "returned by this server will not be signed, and will be rejected " +
-                               "by a server that enforces permissions.")
-                       log.Println("To fix this, use the -blob-signing-key-file flag " +
-                               "to specify the file containing the permission key.")
-               }
-       }
+       log.Println("keepstore starting, pid", os.Getpid())
+       defer log.Println("keepstore exiting, pid", os.Getpid())
 
        // Start a round-robin VolumeManager with the volumes we have found.
-       KeepVM = MakeRRVolumeManager(volumes)
+       KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
-       // Tell the built-in HTTP server to direct all requests to the REST router.
-       loggingRouter := MakeLoggingRESTRouter()
-       http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
-               loggingRouter.ServeHTTP(resp, req)
-       })
+       // Middleware stack: logger, MaxRequests limiter, method handlers
+       router := MakeRESTRouter()
+       limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
+       router.limiter = limiter
+       http.Handle("/", &LoggingRESTRouter{router: limiter})
 
        // Set up a TCP listener.
-       listener, err := net.Listen("tcp", listen)
+       listener, err := net.Listen("tcp", theConfig.Listen)
        if err != nil {
                log.Fatal(err)
        }
 
        // Initialize Pull queue and worker
        keepClient := &keepclient.KeepClient{
-               Arvados:       nil,
+               Arvados:       &arvadosclient.ArvadosClient{},
                Want_replicas: 1,
-               Using_proxy:   true,
                Client:        &http.Client{},
        }
 
@@ -405,18 +170,45 @@ func main() {
        trashq = NewWorkQueue()
        go RunTrashWorker(trashq)
 
+       // Start emptyTrash goroutine
+       doneEmptyingTrash := make(chan bool)
+       go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
+
        // Shut down the server gracefully (by closing the listener)
        // if SIGTERM is received.
        term := make(chan os.Signal, 1)
        go func(sig <-chan os.Signal) {
                s := <-sig
                log.Println("caught signal:", s)
+               doneEmptyingTrash <- true
                listener.Close()
        }(term)
        signal.Notify(term, syscall.SIGTERM)
        signal.Notify(term, syscall.SIGINT)
 
-       log.Println("listening at", listen)
-       srv := &http.Server{Addr: listen}
+       if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
+               log.Printf("Error notifying init daemon: %v", err)
+       }
+       log.Println("listening at", listener.Addr())
+       srv := &http.Server{}
        srv.Serve(listener)
 }
+
+// Periodically (once per interval) invoke EmptyTrash on all volumes.
+func emptyTrash(done <-chan bool, interval time.Duration) {
+       ticker := time.NewTicker(interval)
+
+       for {
+               select {
+               case <-ticker.C:
+                       for _, v := range theConfig.Volumes {
+                               if v.Writable() {
+                                       v.EmptyTrash()
+                               }
+                       }
+               case <-done:
+                       ticker.Stop()
+                       return
+               }
+       }
+}