8561: Fix test_arvados_node_match
[arvados.git] / services / keepstore / keepstore.go
index a363bac2553998e6356216f77472bcbf537b78d3..3850e993fc511216d4967b1c757109ced844a591 100644 (file)
@@ -24,38 +24,45 @@ import (
 
 // Default TCP address on which to listen for requests.
 // Initialized by the --listen flag.
-const DEFAULT_ADDR = ":25107"
+const DefaultAddr = ":25107"
 
 // A Keep "block" is 64MB.
-const BLOCKSIZE = 64 * 1024 * 1024
+const BlockSize = 64 * 1024 * 1024
 
-// A Keep volume must have at least MIN_FREE_KILOBYTES available
+// A Keep volume must have at least MinFreeKilobytes available
 // in order to permit writes.
-const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
+const MinFreeKilobytes = BlockSize / 1024
 
-var PROC_MOUNTS = "/proc/mounts"
+// ProcMounts /proc/mounts
+var ProcMounts = "/proc/mounts"
 
-// enforce_permissions controls whether permission signatures
+// enforcePermissions controls whether permission signatures
 // should be enforced (affecting GET and DELETE requests).
-// Initialized by the --enforce-permissions flag.
-var enforce_permissions bool
+// Initialized by the -enforce-permissions flag.
+var enforcePermissions bool
 
-// permission_ttl is the time duration for which new permission
+// blobSignatureTTL is the time duration for which new permission
 // signatures (returned by PUT requests) will be valid.
-// Initialized by the --permission-ttl flag.
-var permission_ttl time.Duration
+// Initialized by the -permission-ttl flag.
+var blobSignatureTTL time.Duration
 
-// data_manager_token represents the API token used by the
+// dataManagerToken represents the API token used by the
 // Data Manager, and is required on certain privileged operations.
-// Initialized by the --data-manager-token-file flag.
-var data_manager_token string
+// Initialized by the -data-manager-token-file flag.
+var dataManagerToken string
 
-// never_delete can be used to prevent the DELETE handler from
+// neverDelete can be used to prevent the DELETE handler from
 // actually deleting anything.
-var never_delete = false
+var neverDelete = true
 
-// ==========
-// Error types.
+// trashLifetime is the time duration after a block is trashed
+// during which it can be recovered using an /untrash request
+var trashLifetime time.Duration
+
+var maxBuffers = 128
+var bufs *bufferPool
+
+// KeepError types.
 //
 type KeepError struct {
        HTTPCode int
@@ -73,8 +80,10 @@ var (
        NotFoundError       = &KeepError{404, "Not Found"}
        GenericError        = &KeepError{500, "Fail"}
        FullError           = &KeepError{503, "Full"}
-       TooLongError        = &KeepError{504, "Timeout"}
+       SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
+       TooLongError        = &KeepError{413, "Block is too large"}
        MethodDisabledError = &KeepError{405, "Method disabled"}
+       ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
 )
 
 func (e *KeepError) Error() string {
@@ -103,165 +112,187 @@ var KeepVM VolumeManager
 var pullq *WorkQueue
 var trashq *WorkQueue
 
+type volumeSet []Volume
+
+var (
+       flagSerializeIO bool
+       flagReadonly    bool
+       volumes         volumeSet
+)
+
+func (vs *volumeSet) String() string {
+       return fmt.Sprintf("%+v", (*vs)[:])
+}
+
 // TODO(twp): continue moving as much code as possible out of main
 // so it can be effectively tested. Esp. handling and postprocessing
 // of command line flags (identifying Keep volumes and initializing
 // permission arguments).
 
 func main() {
-       log.Println("Keep started: pid", os.Getpid())
-
-       // Parse command-line flags:
-       //
-       // -listen=ipaddr:port
-       //    Interface on which to listen for requests. Use :port without
-       //    an ipaddr to listen on all network interfaces.
-       //    Examples:
-       //      -listen=127.0.0.1:4949
-       //      -listen=10.0.1.24:8000
-       //      -listen=:25107 (to listen to port 25107 on all interfaces)
-       //
-       // -volumes
-       //    A comma-separated list of directories to use as Keep volumes.
-       //    Example:
-       //      -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
-       //
-       //    If -volumes is empty or is not present, Keep will select volumes
-       //    by looking at currently mounted filesystems for /keep top-level
-       //    directories.
+       log.Println("keepstore starting, pid", os.Getpid())
+       defer log.Println("keepstore exiting, pid", os.Getpid())
 
        var (
-               data_manager_token_file string
-               listen                  string
-               permission_key_file     string
-               permission_ttl_sec      int
-               serialize_io            bool
-               volumearg               string
-               pidfile                 string
+               dataManagerTokenFile string
+               listen               string
+               blobSigningKeyFile   string
+               permissionTTLSec     int
+               pidfile              string
        )
        flag.StringVar(
-               &data_manager_token_file,
+               &dataManagerTokenFile,
                "data-manager-token-file",
                "",
                "File with the API token used by the Data Manager. All DELETE "+
                        "requests or GET /index requests must carry this token.")
        flag.BoolVar(
-               &enforce_permissions,
+               &enforcePermissions,
                "enforce-permissions",
                false,
                "Enforce permission signatures on requests.")
        flag.StringVar(
                &listen,
                "listen",
-               DEFAULT_ADDR,
-               "Interface on which to listen for requests, in the format "+
-                       "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
-                       "to listen on all network interfaces.")
+               DefaultAddr,
+               "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
        flag.BoolVar(
-               &never_delete,
+               &neverDelete,
                "never-delete",
-               false,
-               "If set, nothing will be deleted. HTTP 405 will be returned "+
-                       "for valid DELETE requests.")
+               true,
+               "If true, nothing will be deleted. "+
+                       "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
+                       "You should leave this option alone unless you can afford to lose data.")
        flag.StringVar(
-               &permission_key_file,
+               &blobSigningKeyFile,
                "permission-key-file",
                "",
+               "Synonym for -blob-signing-key-file.")
+       flag.StringVar(
+               &blobSigningKeyFile,
+               "blob-signing-key-file",
+               "",
                "File containing the secret key for generating and verifying "+
-                       "permission signatures.")
+                       "blob permission signatures.")
        flag.IntVar(
-               &permission_ttl_sec,
+               &permissionTTLSec,
                "permission-ttl",
-               1209600,
-               "Expiration time (in seconds) for newly generated permission "+
-                       "signatures.")
+               0,
+               "Synonym for -blob-signature-ttl.")
+       flag.IntVar(
+               &permissionTTLSec,
+               "blob-signature-ttl",
+               int(time.Duration(2*7*24*time.Hour).Seconds()),
+               "Lifetime of blob permission signatures. "+
+                       "See services/api/config/application.default.yml.")
        flag.BoolVar(
-               &serialize_io,
+               &flagSerializeIO,
                "serialize",
                false,
-               "If set, all read and write operations on local Keep volumes will "+
-                       "be serialized.")
-       flag.StringVar(
-               &volumearg,
-               "volumes",
-               "",
-               "Comma-separated list of directories to use for Keep volumes, "+
-                       "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
-                       "supplied, Keep will scan mounted filesystems for volumes "+
-                       "with a /keep top-level directory.")
-
+               "Serialize read and write operations on the following volumes.")
+       flag.BoolVar(
+               &flagReadonly,
+               "readonly",
+               false,
+               "Do not write, delete, or touch anything on the following volumes.")
        flag.StringVar(
                &pidfile,
                "pid",
                "",
-               "Path to write pid file")
+               "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
+       flag.IntVar(
+               &maxBuffers,
+               "max-buffers",
+               maxBuffers,
+               fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
+       flag.DurationVar(
+               &trashLifetime,
+               "trash-lifetime",
+               0*time.Second,
+               "Interval after a block is trashed during which it can be recovered using an /untrash request")
 
        flag.Parse()
 
-       // Look for local keep volumes.
-       var keepvols []string
-       if volumearg == "" {
-               // TODO(twp): decide whether this is desirable default behavior.
-               // In production we may want to require the admin to specify
-               // Keep volumes explicitly.
-               keepvols = FindKeepVolumes()
-       } else {
-               keepvols = strings.Split(volumearg, ",")
+       if maxBuffers < 0 {
+               log.Fatal("-max-buffers must be greater than zero.")
        }
+       bufs = newBufferPool(maxBuffers, BlockSize)
 
-       // Check that the specified volumes actually exist.
-       var goodvols []Volume = nil
-       for _, v := range keepvols {
-               if _, err := os.Stat(v); err == nil {
-                       log.Println("adding Keep volume:", v)
-                       newvol := MakeUnixVolume(v, serialize_io)
-                       goodvols = append(goodvols, &newvol)
-               } else {
-                       log.Printf("bad Keep volume: %s\n", err)
+       if pidfile != "" {
+               f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
+               if err != nil {
+                       log.Fatalf("open pidfile (%s): %s", pidfile, err)
+               }
+               err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
+               if err != nil {
+                       log.Fatalf("flock pidfile (%s): %s", pidfile, err)
+               }
+               err = f.Truncate(0)
+               if err != nil {
+                       log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
+               }
+               _, err = fmt.Fprint(f, os.Getpid())
+               if err != nil {
+                       log.Fatalf("write pidfile (%s): %s", pidfile, err)
+               }
+               err = f.Sync()
+               if err != nil {
+                       log.Fatalf("sync pidfile (%s): %s", pidfile, err)
+               }
+               defer f.Close()
+               defer os.Remove(pidfile)
+       }
+
+       if len(volumes) == 0 {
+               if (&unixVolumeAdder{&volumes}).Discover() == 0 {
+                       log.Fatal("No volumes found.")
                }
        }
 
-       if len(goodvols) == 0 {
-               log.Fatal("could not find any keep volumes")
+       for _, v := range volumes {
+               log.Printf("Using volume %v (writable=%v)", v, v.Writable())
        }
 
        // Initialize data manager token and permission key.
        // If these tokens are specified but cannot be read,
        // raise a fatal error.
-       if data_manager_token_file != "" {
-               if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
-                       data_manager_token = strings.TrimSpace(string(buf))
+       if dataManagerTokenFile != "" {
+               if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
+                       dataManagerToken = strings.TrimSpace(string(buf))
                } else {
                        log.Fatalf("reading data manager token: %s\n", err)
                }
        }
-       if permission_key_file != "" {
-               if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
+
+       if neverDelete != true {
+               log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
+                       "been extensively tested. You should leave this option alone unless you can afford to lose data.")
+       }
+
+       if blobSigningKeyFile != "" {
+               if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
                        PermissionSecret = bytes.TrimSpace(buf)
                } else {
                        log.Fatalf("reading permission key: %s\n", err)
                }
        }
 
-       // Initialize permission TTL
-       permission_ttl = time.Duration(permission_ttl_sec) * time.Second
+       blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
 
-       // If --enforce-permissions is true, we must have a permission key
-       // to continue.
        if PermissionSecret == nil {
-               if enforce_permissions {
-                       log.Fatal("--enforce-permissions requires a permission key")
+               if enforcePermissions {
+                       log.Fatal("-enforce-permissions requires a permission key")
                } else {
                        log.Println("Running without a PermissionSecret. Block locators " +
                                "returned by this server will not be signed, and will be rejected " +
                                "by a server that enforces permissions.")
-                       log.Println("To fix this, run Keep with --permission-key-file=<path> " +
-                               "to define the location of a file containing the permission key.")
+                       log.Println("To fix this, use the -blob-signing-key-file flag " +
+                               "to specify the file containing the permission key.")
                }
        }
 
        // Start a round-robin VolumeManager with the volumes we have found.
-       KeepVM = MakeRRVolumeManager(goodvols)
+       KeepVM = MakeRRVolumeManager(volumes)
 
        // Tell the built-in HTTP server to direct all requests to the REST router.
        loggingRouter := MakeLoggingRESTRouter()
@@ -276,10 +307,9 @@ func main() {
        }
 
        // Initialize Pull queue and worker
-       keepClient := keepclient.KeepClient{
+       keepClient := &keepclient.KeepClient{
                Arvados:       nil,
                Want_replicas: 1,
-               Using_proxy:   true,
                Client:        &http.Client{},
        }
 
@@ -300,24 +330,9 @@ func main() {
                listener.Close()
        }(term)
        signal.Notify(term, syscall.SIGTERM)
+       signal.Notify(term, syscall.SIGINT)
 
-       if pidfile != "" {
-               f, err := os.Create(pidfile)
-               if err == nil {
-                       fmt.Fprint(f, os.Getpid())
-                       f.Close()
-               } else {
-                       log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
-               }
-       }
-
-       // Start listening for requests.
+       log.Println("listening at", listen)
        srv := &http.Server{Addr: listen}
        srv.Serve(listener)
-
-       log.Println("shutting down")
-
-       if pidfile != "" {
-               os.Remove(pidfile)
-       }
 }