5562: Use static method. Fixes "TypeError: _socket_open() takes exactly 5 arguments...
[arvados.git] / services / keepstore / keepstore.go
index 06054f5205cf7d38b01b179ee5365230120ccc89..5333625bbdff670fea074154190aba685ac8d782 100644 (file)
@@ -1,9 +1,12 @@
 package main
 
 import (
+       "bufio"
        "bytes"
+       "errors"
        "flag"
        "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "io/ioutil"
        "log"
        "net"
@@ -91,12 +94,98 @@ func (e *KeepError) Error() string {
 // Initialized by the --volumes flag (or by FindKeepVolumes).
 var KeepVM VolumeManager
 
-// The pull list manager is a singleton pull list (a list of blocks
-// that the current keepstore process should be pulling from remote
-// keepstore servers in order to increase data replication) with
-// atomic update methods that are safe to use from multiple
-// goroutines.
-var pullmgr *BlockWorkList
+// The pull list manager and trash queue are threadsafe queues which
+// support atomic update operations. The PullHandler and TrashHandler
+// store results from Data Manager /pull and /trash requests here.
+//
+// See the Keep and Data Manager design documents for more details:
+// https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
+// https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
+//
+var pullq *WorkQueue
+var trashq *WorkQueue
+
+var (
+       flagSerializeIO bool
+       flagReadonly    bool
+)
+type volumeSet []Volume
+
+func (vs *volumeSet) Set(value string) error {
+       if dirs := strings.Split(value, ","); len(dirs) > 1 {
+               log.Print("DEPRECATED: using comma-separated volume list.")
+               for _, dir := range dirs {
+                       if err := vs.Set(dir); err != nil {
+                               return err
+                       }
+               }
+               return nil
+       }
+       if len(value) == 0 || value[0] != '/' {
+               return errors.New("Invalid volume: must begin with '/'.")
+       }
+       if _, err := os.Stat(value); err != nil {
+               return err
+       }
+       *vs = append(*vs, MakeUnixVolume(value, flagSerializeIO, flagReadonly))
+       return nil
+}
+
+func (vs *volumeSet) String() string {
+       s := "["
+       for i, v := range *vs {
+               if i > 0 {
+                       s = s + " "
+               }
+               s = s + v.String()
+       }
+       return s + "]"
+}
+
+// Discover adds a volume for every directory named "keep" that is
+// located at the top level of a device- or tmpfs-backed mount point
+// other than "/". It returns the number of volumes added.
+func (vs *volumeSet) Discover() int {
+       added := 0
+       f, err := os.Open(PROC_MOUNTS)
+       if err != nil {
+               log.Fatalf("opening %s: %s", PROC_MOUNTS, err)
+       }
+       scanner := bufio.NewScanner(f)
+       for scanner.Scan() {
+               args := strings.Fields(scanner.Text())
+               if err := scanner.Err(); err != nil {
+                       log.Fatalf("reading %s: %s", PROC_MOUNTS, err)
+               }
+               dev, mount := args[0], args[1]
+               if mount == "/" {
+                       continue
+               }
+               if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
+                       continue
+               }
+               keepdir := mount + "/keep"
+               if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
+                       continue
+               }
+               // Set the -readonly flag (but only for this volume)
+               // if the filesystem is mounted readonly.
+               flagReadonlyWas := flagReadonly
+               for _, fsopt := range strings.Split(args[3], ",") {
+                       if fsopt == "ro" {
+                               flagReadonly = true
+                               break
+                       }
+                       if fsopt == "rw" {
+                               break
+                       }
+               }
+               vs.Set(keepdir)
+               flagReadonly = flagReadonlyWas
+               added++
+       }
+       return added
+}
 
 // TODO(twp): continue moving as much code as possible out of main
 // so it can be effectively tested. Esp. handling and postprocessing
@@ -106,32 +195,12 @@ var pullmgr *BlockWorkList
 func main() {
        log.Println("Keep started: pid", os.Getpid())
 
-       // Parse command-line flags:
-       //
-       // -listen=ipaddr:port
-       //    Interface on which to listen for requests. Use :port without
-       //    an ipaddr to listen on all network interfaces.
-       //    Examples:
-       //      -listen=127.0.0.1:4949
-       //      -listen=10.0.1.24:8000
-       //      -listen=:25107 (to listen to port 25107 on all interfaces)
-       //
-       // -volumes
-       //    A comma-separated list of directories to use as Keep volumes.
-       //    Example:
-       //      -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
-       //
-       //    If -volumes is empty or is not present, Keep will select volumes
-       //    by looking at currently mounted filesystems for /keep top-level
-       //    directories.
-
        var (
                data_manager_token_file string
                listen                  string
                permission_key_file     string
                permission_ttl_sec      int
-               serialize_io            bool
-               volumearg               string
+               volumes                 volumeSet
                pidfile                 string
        )
        flag.StringVar(
@@ -149,9 +218,7 @@ func main() {
                &listen,
                "listen",
                DEFAULT_ADDR,
-               "Interface on which to listen for requests, in the format "+
-                       "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
-                       "to listen on all network interfaces.")
+               "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
        flag.BoolVar(
                &never_delete,
                "never-delete",
@@ -171,20 +238,23 @@ func main() {
                "Expiration time (in seconds) for newly generated permission "+
                        "signatures.")
        flag.BoolVar(
-               &serialize_io,
+               &flagSerializeIO,
                "serialize",
                false,
-               "If set, all read and write operations on local Keep volumes will "+
-                       "be serialized.")
-       flag.StringVar(
-               &volumearg,
+               "Serialize read and write operations on the following volumes.")
+       flag.BoolVar(
+               &flagReadonly,
+               "readonly",
+               false,
+               "Do not write, delete, or touch anything on the following volumes.")
+       flag.Var(
+               &volumes,
                "volumes",
-               "",
-               "Comma-separated list of directories to use for Keep volumes, "+
-                       "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
-                       "supplied, Keep will scan mounted filesystems for volumes "+
-                       "with a /keep top-level directory.")
-
+               "Deprecated synonym for -volume.")
+       flag.Var(
+               &volumes,
+               "volume",
+               "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
        flag.StringVar(
                &pidfile,
                "pid",
@@ -193,31 +263,14 @@ func main() {
 
        flag.Parse()
 
-       // Look for local keep volumes.
-       var keepvols []string
-       if volumearg == "" {
-               // TODO(twp): decide whether this is desirable default behavior.
-               // In production we may want to require the admin to specify
-               // Keep volumes explicitly.
-               keepvols = FindKeepVolumes()
-       } else {
-               keepvols = strings.Split(volumearg, ",")
-       }
-
-       // Check that the specified volumes actually exist.
-       var goodvols []Volume = nil
-       for _, v := range keepvols {
-               if _, err := os.Stat(v); err == nil {
-                       log.Println("adding Keep volume:", v)
-                       newvol := MakeUnixVolume(v, serialize_io)
-                       goodvols = append(goodvols, &newvol)
-               } else {
-                       log.Printf("bad Keep volume: %s\n", err)
+       if len(volumes) == 0 {
+               if volumes.Discover() == 0 {
+                       log.Fatal("No volumes found.")
                }
        }
 
-       if len(goodvols) == 0 {
-               log.Fatal("could not find any keep volumes")
+       for _, v := range volumes {
+               log.Printf("Using volume %v (writable=%v)", v, v.Writable())
        }
 
        // Initialize data manager token and permission key.
@@ -256,11 +309,13 @@ func main() {
        }
 
        // Start a round-robin VolumeManager with the volumes we have found.
-       KeepVM = MakeRRVolumeManager(goodvols)
+       KeepVM = MakeRRVolumeManager(volumes)
 
-       // Tell the built-in HTTP server to direct all requests to the REST
-       // router.
-       http.Handle("/", MakeRESTRouter())
+       // Tell the built-in HTTP server to direct all requests to the REST router.
+       loggingRouter := MakeLoggingRESTRouter()
+       http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
+               loggingRouter.ServeHTTP(resp, req)
+       })
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", listen)
@@ -268,6 +323,22 @@ func main() {
                log.Fatal(err)
        }
 
+       // Initialize Pull queue and worker
+       keepClient := &keepclient.KeepClient{
+               Arvados:       nil,
+               Want_replicas: 1,
+               Using_proxy:   true,
+               Client:        &http.Client{},
+       }
+
+       // Initialize the pullq and worker
+       pullq = NewWorkQueue()
+       go RunPullWorker(pullq, keepClient)
+
+       // Initialize the trashq and worker
+       trashq = NewWorkQueue()
+       go RunTrashWorker(trashq)
+
        // Shut down the server gracefully (by closing the listener)
        // if SIGTERM is received.
        term := make(chan os.Signal, 1)