Merge branch 'master' into 7200-keepproxy-index-api
[arvados.git] / services / keepstore / keepstore.go
index a363bac2553998e6356216f77472bcbf537b78d3..3e360e1799117e80e773e1e5c58fa3b5560b07ef 100644 (file)
@@ -1,7 +1,9 @@
 package main
 
 import (
+       "bufio"
        "bytes"
+       "errors"
        "flag"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
@@ -12,6 +14,7 @@ import (
        "os"
        "os/signal"
        "strings"
+       "sync"
        "syscall"
        "time"
 )
@@ -24,38 +27,45 @@ import (
 
 // Default TCP address on which to listen for requests.
 // Initialized by the --listen flag.
-const DEFAULT_ADDR = ":25107"
+const DefaultAddr = ":25107"
 
 // A Keep "block" is 64MB.
-const BLOCKSIZE = 64 * 1024 * 1024
+const BlockSize = 64 * 1024 * 1024
 
-// A Keep volume must have at least MIN_FREE_KILOBYTES available
+// A Keep volume must have at least MinFreeKilobytes available
 // in order to permit writes.
-const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
+const MinFreeKilobytes = BlockSize / 1024
 
-var PROC_MOUNTS = "/proc/mounts"
+// Until #6221 is resolved, never_delete must be true.
+// However, allow it to be false in testing with TestDataManagerToken
+const TestDataManagerToken = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
 
-// enforce_permissions controls whether permission signatures
+// ProcMounts /proc/mounts
+var ProcMounts = "/proc/mounts"
+
+// enforcePermissions controls whether permission signatures
 // should be enforced (affecting GET and DELETE requests).
-// Initialized by the --enforce-permissions flag.
-var enforce_permissions bool
+// Initialized by the -enforce-permissions flag.
+var enforcePermissions bool
 
-// permission_ttl is the time duration for which new permission
+// blobSignatureTTL is the time duration for which new permission
 // signatures (returned by PUT requests) will be valid.
-// Initialized by the --permission-ttl flag.
-var permission_ttl time.Duration
+// Initialized by the -permission-ttl flag.
+var blobSignatureTTL time.Duration
 
-// data_manager_token represents the API token used by the
+// dataManagerToken represents the API token used by the
 // Data Manager, and is required on certain privileged operations.
-// Initialized by the --data-manager-token-file flag.
-var data_manager_token string
+// Initialized by the -data-manager-token-file flag.
+var dataManagerToken string
 
-// never_delete can be used to prevent the DELETE handler from
+// neverDelete can be used to prevent the DELETE handler from
 // actually deleting anything.
-var never_delete = false
+var neverDelete = true
+
+var maxBuffers = 128
+var bufs *bufferPool
 
-// ==========
-// Error types.
+// KeepError types.
 //
 type KeepError struct {
        HTTPCode int
@@ -73,7 +83,8 @@ var (
        NotFoundError       = &KeepError{404, "Not Found"}
        GenericError        = &KeepError{500, "Fail"}
        FullError           = &KeepError{503, "Full"}
-       TooLongError        = &KeepError{504, "Timeout"}
+       SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
+       TooLongError        = &KeepError{413, "Block is too large"}
        MethodDisabledError = &KeepError{405, "Method disabled"}
 )
 
@@ -103,165 +114,268 @@ var KeepVM VolumeManager
 var pullq *WorkQueue
 var trashq *WorkQueue
 
+var (
+       flagSerializeIO bool
+       flagReadonly    bool
+)
+
+type volumeSet []Volume
+
+func (vs *volumeSet) Set(value string) error {
+       if dirs := strings.Split(value, ","); len(dirs) > 1 {
+               log.Print("DEPRECATED: using comma-separated volume list.")
+               for _, dir := range dirs {
+                       if err := vs.Set(dir); err != nil {
+                               return err
+                       }
+               }
+               return nil
+       }
+       if len(value) == 0 || value[0] != '/' {
+               return errors.New("Invalid volume: must begin with '/'.")
+       }
+       if _, err := os.Stat(value); err != nil {
+               return err
+       }
+       var locker sync.Locker
+       if flagSerializeIO {
+               locker = &sync.Mutex{}
+       }
+       *vs = append(*vs, &UnixVolume{
+               root:     value,
+               locker:   locker,
+               readonly: flagReadonly,
+       })
+       return nil
+}
+
+func (vs *volumeSet) String() string {
+       s := "["
+       for i, v := range *vs {
+               if i > 0 {
+                       s = s + " "
+               }
+               s = s + v.String()
+       }
+       return s + "]"
+}
+
+// Discover adds a volume for every directory named "keep" that is
+// located at the top level of a device- or tmpfs-backed mount point
+// other than "/". It returns the number of volumes added.
+func (vs *volumeSet) Discover() int {
+       added := 0
+       f, err := os.Open(ProcMounts)
+       if err != nil {
+               log.Fatalf("opening %s: %s", ProcMounts, err)
+       }
+       scanner := bufio.NewScanner(f)
+       for scanner.Scan() {
+               args := strings.Fields(scanner.Text())
+               if err := scanner.Err(); err != nil {
+                       log.Fatalf("reading %s: %s", ProcMounts, err)
+               }
+               dev, mount := args[0], args[1]
+               if mount == "/" {
+                       continue
+               }
+               if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
+                       continue
+               }
+               keepdir := mount + "/keep"
+               if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
+                       continue
+               }
+               // Set the -readonly flag (but only for this volume)
+               // if the filesystem is mounted readonly.
+               flagReadonlyWas := flagReadonly
+               for _, fsopt := range strings.Split(args[3], ",") {
+                       if fsopt == "ro" {
+                               flagReadonly = true
+                               break
+                       }
+                       if fsopt == "rw" {
+                               break
+                       }
+               }
+               vs.Set(keepdir)
+               flagReadonly = flagReadonlyWas
+               added++
+       }
+       return added
+}
+
 // TODO(twp): continue moving as much code as possible out of main
 // so it can be effectively tested. Esp. handling and postprocessing
 // of command line flags (identifying Keep volumes and initializing
 // permission arguments).
 
 func main() {
-       log.Println("Keep started: pid", os.Getpid())
-
-       // Parse command-line flags:
-       //
-       // -listen=ipaddr:port
-       //    Interface on which to listen for requests. Use :port without
-       //    an ipaddr to listen on all network interfaces.
-       //    Examples:
-       //      -listen=127.0.0.1:4949
-       //      -listen=10.0.1.24:8000
-       //      -listen=:25107 (to listen to port 25107 on all interfaces)
-       //
-       // -volumes
-       //    A comma-separated list of directories to use as Keep volumes.
-       //    Example:
-       //      -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
-       //
-       //    If -volumes is empty or is not present, Keep will select volumes
-       //    by looking at currently mounted filesystems for /keep top-level
-       //    directories.
+       log.Println("keepstore starting, pid", os.Getpid())
+       defer log.Println("keepstore exiting, pid", os.Getpid())
 
        var (
-               data_manager_token_file string
-               listen                  string
-               permission_key_file     string
-               permission_ttl_sec      int
-               serialize_io            bool
-               volumearg               string
-               pidfile                 string
+               dataManagerTokenFile string
+               listen               string
+               blobSigningKeyFile   string
+               permissionTTLSec     int
+               volumes              volumeSet
+               pidfile              string
        )
        flag.StringVar(
-               &data_manager_token_file,
+               &dataManagerTokenFile,
                "data-manager-token-file",
                "",
                "File with the API token used by the Data Manager. All DELETE "+
                        "requests or GET /index requests must carry this token.")
        flag.BoolVar(
-               &enforce_permissions,
+               &enforcePermissions,
                "enforce-permissions",
                false,
                "Enforce permission signatures on requests.")
        flag.StringVar(
                &listen,
                "listen",
-               DEFAULT_ADDR,
-               "Interface on which to listen for requests, in the format "+
-                       "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
-                       "to listen on all network interfaces.")
+               DefaultAddr,
+               "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
        flag.BoolVar(
-               &never_delete,
+               &neverDelete,
                "never-delete",
-               false,
+               true,
                "If set, nothing will be deleted. HTTP 405 will be returned "+
                        "for valid DELETE requests.")
        flag.StringVar(
-               &permission_key_file,
+               &blobSigningKeyFile,
                "permission-key-file",
                "",
+               "Synonym for -blob-signing-key-file.")
+       flag.StringVar(
+               &blobSigningKeyFile,
+               "blob-signing-key-file",
+               "",
                "File containing the secret key for generating and verifying "+
-                       "permission signatures.")
+                       "blob permission signatures.")
        flag.IntVar(
-               &permission_ttl_sec,
+               &permissionTTLSec,
                "permission-ttl",
-               1209600,
-               "Expiration time (in seconds) for newly generated permission "+
-                       "signatures.")
+               0,
+               "Synonym for -blob-signature-ttl.")
+       flag.IntVar(
+               &permissionTTLSec,
+               "blob-signature-ttl",
+               int(time.Duration(2*7*24*time.Hour).Seconds()),
+               "Lifetime of blob permission signatures. "+
+                       "See services/api/config/application.default.yml.")
        flag.BoolVar(
-               &serialize_io,
+               &flagSerializeIO,
                "serialize",
                false,
-               "If set, all read and write operations on local Keep volumes will "+
-                       "be serialized.")
-       flag.StringVar(
-               &volumearg,
+               "Serialize read and write operations on the following volumes.")
+       flag.BoolVar(
+               &flagReadonly,
+               "readonly",
+               false,
+               "Do not write, delete, or touch anything on the following volumes.")
+       flag.Var(
+               &volumes,
                "volumes",
-               "",
-               "Comma-separated list of directories to use for Keep volumes, "+
-                       "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
-                       "supplied, Keep will scan mounted filesystems for volumes "+
-                       "with a /keep top-level directory.")
-
+               "Deprecated synonym for -volume.")
+       flag.Var(
+               &volumes,
+               "volume",
+               "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
        flag.StringVar(
                &pidfile,
                "pid",
                "",
-               "Path to write pid file")
+               "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
+       flag.IntVar(
+               &maxBuffers,
+               "max-buffers",
+               maxBuffers,
+               fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
 
        flag.Parse()
 
-       // Look for local keep volumes.
-       var keepvols []string
-       if volumearg == "" {
-               // TODO(twp): decide whether this is desirable default behavior.
-               // In production we may want to require the admin to specify
-               // Keep volumes explicitly.
-               keepvols = FindKeepVolumes()
-       } else {
-               keepvols = strings.Split(volumearg, ",")
+       if maxBuffers < 0 {
+               log.Fatal("-max-buffers must be greater than zero.")
        }
+       bufs = newBufferPool(maxBuffers, BlockSize)
 
-       // Check that the specified volumes actually exist.
-       var goodvols []Volume = nil
-       for _, v := range keepvols {
-               if _, err := os.Stat(v); err == nil {
-                       log.Println("adding Keep volume:", v)
-                       newvol := MakeUnixVolume(v, serialize_io)
-                       goodvols = append(goodvols, &newvol)
-               } else {
-                       log.Printf("bad Keep volume: %s\n", err)
+       if pidfile != "" {
+               f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
+               if err != nil {
+                       log.Fatalf("open pidfile (%s): %s", pidfile, err)
+               }
+               err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
+               if err != nil {
+                       log.Fatalf("flock pidfile (%s): %s", pidfile, err)
+               }
+               err = f.Truncate(0)
+               if err != nil {
+                       log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
                }
+               _, err = fmt.Fprint(f, os.Getpid())
+               if err != nil {
+                       log.Fatalf("write pidfile (%s): %s", pidfile, err)
+               }
+               err = f.Sync()
+               if err != nil {
+                       log.Fatalf("sync pidfile (%s): %s", pidfile, err)
+               }
+               defer f.Close()
+               defer os.Remove(pidfile)
        }
 
-       if len(goodvols) == 0 {
-               log.Fatal("could not find any keep volumes")
+       if len(volumes) == 0 {
+               if volumes.Discover() == 0 {
+                       log.Fatal("No volumes found.")
+               }
+       }
+
+       for _, v := range volumes {
+               log.Printf("Using volume %v (writable=%v)", v, v.Writable())
        }
 
        // Initialize data manager token and permission key.
        // If these tokens are specified but cannot be read,
        // raise a fatal error.
-       if data_manager_token_file != "" {
-               if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
-                       data_manager_token = strings.TrimSpace(string(buf))
+       if dataManagerTokenFile != "" {
+               if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
+                       dataManagerToken = strings.TrimSpace(string(buf))
                } else {
                        log.Fatalf("reading data manager token: %s\n", err)
                }
        }
-       if permission_key_file != "" {
-               if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
+
+       if neverDelete != true && dataManagerToken != TestDataManagerToken {
+               log.Fatal("never_delete must be true, see #6221")
+       }
+
+       if blobSigningKeyFile != "" {
+               if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
                        PermissionSecret = bytes.TrimSpace(buf)
                } else {
                        log.Fatalf("reading permission key: %s\n", err)
                }
        }
 
-       // Initialize permission TTL
-       permission_ttl = time.Duration(permission_ttl_sec) * time.Second
+       blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
 
-       // If --enforce-permissions is true, we must have a permission key
-       // to continue.
        if PermissionSecret == nil {
-               if enforce_permissions {
-                       log.Fatal("--enforce-permissions requires a permission key")
+               if enforcePermissions {
+                       log.Fatal("-enforce-permissions requires a permission key")
                } else {
                        log.Println("Running without a PermissionSecret. Block locators " +
                                "returned by this server will not be signed, and will be rejected " +
                                "by a server that enforces permissions.")
-                       log.Println("To fix this, run Keep with --permission-key-file=<path> " +
-                               "to define the location of a file containing the permission key.")
+                       log.Println("To fix this, use the -blob-signing-key-file flag " +
+                               "to specify the file containing the permission key.")
                }
        }
 
        // Start a round-robin VolumeManager with the volumes we have found.
-       KeepVM = MakeRRVolumeManager(goodvols)
+       KeepVM = MakeRRVolumeManager(volumes)
 
        // Tell the built-in HTTP server to direct all requests to the REST router.
        loggingRouter := MakeLoggingRESTRouter()
@@ -276,7 +390,7 @@ func main() {
        }
 
        // Initialize Pull queue and worker
-       keepClient := keepclient.KeepClient{
+       keepClient := &keepclient.KeepClient{
                Arvados:       nil,
                Want_replicas: 1,
                Using_proxy:   true,
@@ -300,24 +414,9 @@ func main() {
                listener.Close()
        }(term)
        signal.Notify(term, syscall.SIGTERM)
+       signal.Notify(term, syscall.SIGINT)
 
-       if pidfile != "" {
-               f, err := os.Create(pidfile)
-               if err == nil {
-                       fmt.Fprint(f, os.Getpid())
-                       f.Close()
-               } else {
-                       log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
-               }
-       }
-
-       // Start listening for requests.
+       log.Println("listening at", listen)
        srv := &http.Server{Addr: listen}
        srv.Serve(listener)
-
-       log.Println("shutting down")
-
-       if pidfile != "" {
-               os.Remove(pidfile)
-       }
 }