11644: Use filesystem UUID and path as DeviceID for local disk volumes.
[arvados.git] / services / keepstore / keepstore.go
index a363bac2553998e6356216f77472bcbf537b78d3..9033de811775f776499b61f5347545dd42775cc0 100644 (file)
@@ -1,61 +1,36 @@
 package main
 
 import (
-       "bytes"
        "flag"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "io/ioutil"
-       "log"
        "net"
        "net/http"
        "os"
        "os/signal"
-       "strings"
        "syscall"
        "time"
-)
 
-// ======================
-// Configuration settings
-//
-// TODO(twp): make all of these configurable via command line flags
-// and/or configuration file settings.
-
-// Default TCP address on which to listen for requests.
-// Initialized by the --listen flag.
-const DEFAULT_ADDR = ":25107"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/config"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       log "github.com/Sirupsen/logrus"
+       "github.com/coreos/go-systemd/daemon"
+)
 
 // A Keep "block" is 64MB.
-const BLOCKSIZE = 64 * 1024 * 1024
+const BlockSize = 64 * 1024 * 1024
 
-// A Keep volume must have at least MIN_FREE_KILOBYTES available
+// A Keep volume must have at least MinFreeKilobytes available
 // in order to permit writes.
-const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
-
-var PROC_MOUNTS = "/proc/mounts"
-
-// enforce_permissions controls whether permission signatures
-// should be enforced (affecting GET and DELETE requests).
-// Initialized by the --enforce-permissions flag.
-var enforce_permissions bool
-
-// permission_ttl is the time duration for which new permission
-// signatures (returned by PUT requests) will be valid.
-// Initialized by the --permission-ttl flag.
-var permission_ttl time.Duration
+const MinFreeKilobytes = BlockSize / 1024
 
-// data_manager_token represents the API token used by the
-// Data Manager, and is required on certain privileged operations.
-// Initialized by the --data-manager-token-file flag.
-var data_manager_token string
+// ProcMounts is the path to the kernel's list of mounted filesystems (/proc/mounts).
+var ProcMounts = "/proc/mounts"
 
-// never_delete can be used to prevent the DELETE handler from
-// actually deleting anything.
-var never_delete = false
+var bufs *bufferPool
 
-// ==========
-// Error types.
+// KeepError is an error that carries an HTTP status code and a message string.
 //
 type KeepError struct {
        HTTPCode int
@@ -73,8 +48,11 @@ var (
        NotFoundError       = &KeepError{404, "Not Found"}
        GenericError        = &KeepError{500, "Fail"}
        FullError           = &KeepError{503, "Full"}
-       TooLongError        = &KeepError{504, "Timeout"}
+       SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
+       TooLongError        = &KeepError{413, "Block is too large"}
        MethodDisabledError = &KeepError{405, "Method disabled"}
+       ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
+       ErrClientDisconnect = &KeepError{503, "Client disconnected"}
 )
 
 func (e *KeepError) Error() string {
@@ -103,183 +81,84 @@ var KeepVM VolumeManager
 var pullq *WorkQueue
 var trashq *WorkQueue
 
-// TODO(twp): continue moving as much code as possible out of main
-// so it can be effectively tested. Esp. handling and postprocessing
-// of command line flags (identifying Keep volumes and initializing
-// permission arguments).
-
 func main() {
-       log.Println("Keep started: pid", os.Getpid())
-
-       // Parse command-line flags:
-       //
-       // -listen=ipaddr:port
-       //    Interface on which to listen for requests. Use :port without
-       //    an ipaddr to listen on all network interfaces.
-       //    Examples:
-       //      -listen=127.0.0.1:4949
-       //      -listen=10.0.1.24:8000
-       //      -listen=:25107 (to listen to port 25107 on all interfaces)
-       //
-       // -volumes
-       //    A comma-separated list of directories to use as Keep volumes.
-       //    Example:
-       //      -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
-       //
-       //    If -volumes is empty or is not present, Keep will select volumes
-       //    by looking at currently mounted filesystems for /keep top-level
-       //    directories.
-
-       var (
-               data_manager_token_file string
-               listen                  string
-               permission_key_file     string
-               permission_ttl_sec      int
-               serialize_io            bool
-               volumearg               string
-               pidfile                 string
-       )
-       flag.StringVar(
-               &data_manager_token_file,
-               "data-manager-token-file",
-               "",
-               "File with the API token used by the Data Manager. All DELETE "+
-                       "requests or GET /index requests must carry this token.")
-       flag.BoolVar(
-               &enforce_permissions,
-               "enforce-permissions",
-               false,
-               "Enforce permission signatures on requests.")
-       flag.StringVar(
-               &listen,
-               "listen",
-               DEFAULT_ADDR,
-               "Interface on which to listen for requests, in the format "+
-                       "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
-                       "to listen on all network interfaces.")
-       flag.BoolVar(
-               &never_delete,
-               "never-delete",
-               false,
-               "If set, nothing will be deleted. HTTP 405 will be returned "+
-                       "for valid DELETE requests.")
-       flag.StringVar(
-               &permission_key_file,
-               "permission-key-file",
-               "",
-               "File containing the secret key for generating and verifying "+
-                       "permission signatures.")
-       flag.IntVar(
-               &permission_ttl_sec,
-               "permission-ttl",
-               1209600,
-               "Expiration time (in seconds) for newly generated permission "+
-                       "signatures.")
-       flag.BoolVar(
-               &serialize_io,
-               "serialize",
-               false,
-               "If set, all read and write operations on local Keep volumes will "+
-                       "be serialized.")
-       flag.StringVar(
-               &volumearg,
-               "volumes",
-               "",
-               "Comma-separated list of directories to use for Keep volumes, "+
-                       "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
-                       "supplied, Keep will scan mounted filesystems for volumes "+
-                       "with a /keep top-level directory.")
+       deprecated.beforeFlagParse(theConfig)
 
-       flag.StringVar(
-               &pidfile,
-               "pid",
-               "",
-               "Path to write pid file")
+       dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
 
+       defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
+       var configPath string
+       flag.StringVar(
+               &configPath,
+               "config",
+               defaultConfigPath,
+               "YAML or JSON configuration file `path`")
+       flag.Usage = usage
        flag.Parse()
 
-       // Look for local keep volumes.
-       var keepvols []string
-       if volumearg == "" {
-               // TODO(twp): decide whether this is desirable default behavior.
-               // In production we may want to require the admin to specify
-               // Keep volumes explicitly.
-               keepvols = FindKeepVolumes()
-       } else {
-               keepvols = strings.Split(volumearg, ",")
+       deprecated.afterFlagParse(theConfig)
+
+       err := config.LoadFile(theConfig, configPath)
+       if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
+               log.Fatal(err)
        }
 
-       // Check that the specified volumes actually exist.
-       var goodvols []Volume = nil
-       for _, v := range keepvols {
-               if _, err := os.Stat(v); err == nil {
-                       log.Println("adding Keep volume:", v)
-                       newvol := MakeUnixVolume(v, serialize_io)
-                       goodvols = append(goodvols, &newvol)
-               } else {
-                       log.Printf("bad Keep volume: %s\n", err)
-               }
+       if *dumpConfig {
+               log.Fatal(config.DumpAndExit(theConfig))
        }
 
-       if len(goodvols) == 0 {
-               log.Fatal("could not find any keep volumes")
+       err = theConfig.Start()
+       if err != nil {
+               log.Fatal(err)
        }
 
-       // Initialize data manager token and permission key.
-       // If these tokens are specified but cannot be read,
-       // raise a fatal error.
-       if data_manager_token_file != "" {
-               if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
-                       data_manager_token = strings.TrimSpace(string(buf))
-               } else {
-                       log.Fatalf("reading data manager token: %s\n", err)
+       if pidfile := theConfig.PIDFile; pidfile != "" {
+               f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
+               if err != nil {
+                       log.Fatalf("open pidfile (%s): %s", pidfile, err)
                }
-       }
-       if permission_key_file != "" {
-               if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
-                       PermissionSecret = bytes.TrimSpace(buf)
-               } else {
-                       log.Fatalf("reading permission key: %s\n", err)
+               defer f.Close()
+               err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
+               if err != nil {
+                       log.Fatalf("flock pidfile (%s): %s", pidfile, err)
                }
-       }
-
-       // Initialize permission TTL
-       permission_ttl = time.Duration(permission_ttl_sec) * time.Second
-
-       // If --enforce-permissions is true, we must have a permission key
-       // to continue.
-       if PermissionSecret == nil {
-               if enforce_permissions {
-                       log.Fatal("--enforce-permissions requires a permission key")
-               } else {
-                       log.Println("Running without a PermissionSecret. Block locators " +
-                               "returned by this server will not be signed, and will be rejected " +
-                               "by a server that enforces permissions.")
-                       log.Println("To fix this, run Keep with --permission-key-file=<path> " +
-                               "to define the location of a file containing the permission key.")
+               defer os.Remove(pidfile)
+               err = f.Truncate(0)
+               if err != nil {
+                       log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
+               }
+               _, err = fmt.Fprint(f, os.Getpid())
+               if err != nil {
+                       log.Fatalf("write pidfile (%s): %s", pidfile, err)
+               }
+               err = f.Sync()
+               if err != nil {
+                       log.Fatalf("sync pidfile (%s): %s", pidfile, err)
                }
        }
 
+       log.Println("keepstore starting, pid", os.Getpid())
+       defer log.Println("keepstore exiting, pid", os.Getpid())
+
        // Start a round-robin VolumeManager with the volumes we have found.
-       KeepVM = MakeRRVolumeManager(goodvols)
+       KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
-       // Tell the built-in HTTP server to direct all requests to the REST router.
-       loggingRouter := MakeLoggingRESTRouter()
-       http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
-               loggingRouter.ServeHTTP(resp, req)
-       })
+       // Middleware stack: logger, MaxRequests limiter, method handlers
+       router := MakeRESTRouter()
+       limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
+       router.limiter = limiter
+       http.Handle("/", &LoggingRESTRouter{router: limiter})
 
        // Set up a TCP listener.
-       listener, err := net.Listen("tcp", listen)
+       listener, err := net.Listen("tcp", theConfig.Listen)
        if err != nil {
                log.Fatal(err)
        }
 
        // Initialize Pull queue and worker
-       keepClient := keepclient.KeepClient{
-               Arvados:       nil,
+       keepClient := &keepclient.KeepClient{
+               Arvados:       &arvadosclient.ArvadosClient{},
                Want_replicas: 1,
-               Using_proxy:   true,
                Client:        &http.Client{},
        }
 
@@ -291,33 +170,45 @@ func main() {
        trashq = NewWorkQueue()
        go RunTrashWorker(trashq)
 
+       // Start emptyTrash goroutine
+       doneEmptyingTrash := make(chan bool)
+       go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
+
        // Shut down the server gracefully (by closing the listener)
        // if SIGTERM is received.
        term := make(chan os.Signal, 1)
        go func(sig <-chan os.Signal) {
                s := <-sig
                log.Println("caught signal:", s)
+               doneEmptyingTrash <- true
                listener.Close()
        }(term)
        signal.Notify(term, syscall.SIGTERM)
+       signal.Notify(term, syscall.SIGINT)
 
-       if pidfile != "" {
-               f, err := os.Create(pidfile)
-               if err == nil {
-                       fmt.Fprint(f, os.Getpid())
-                       f.Close()
-               } else {
-                       log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
-               }
+       if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
+               log.Printf("Error notifying init daemon: %v", err)
        }
-
-       // Start listening for requests.
-       srv := &http.Server{Addr: listen}
+       log.Println("listening at", listener.Addr())
+       srv := &http.Server{}
        srv.Serve(listener)
+}
 
-       log.Println("shutting down")
-
-       if pidfile != "" {
-               os.Remove(pidfile)
+// emptyTrash invokes EmptyTrash on every writable volume once per interval, until a value is received on done.
+func emptyTrash(done <-chan bool, interval time.Duration) {
+       ticker := time.NewTicker(interval)
+
+       for {
+               select {
+               case <-ticker.C:
+                       for _, v := range theConfig.Volumes {
+                               if v.Writable() {
+                                       v.EmptyTrash()
+                               }
+                       }
+               case <-done:
+                       ticker.Stop()
+                       return
+               }
        }
 }