5562: Use static method. Fixes "TypeError: _socket_open() takes exactly 5 arguments...
[arvados.git] / services / keepstore / keepstore.go
index 1e8c3d1e0fbd62f9dea1a2efce54a0a73b911a41..5333625bbdff670fea074154190aba685ac8d782 100644 (file)
@@ -1,9 +1,12 @@
 package main
 
 import (
+       "bufio"
        "bytes"
+       "errors"
        "flag"
        "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "io/ioutil"
        "log"
        "net"
@@ -13,8 +16,6 @@ import (
        "strings"
        "syscall"
        "time"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
 )
 
 // ======================
@@ -104,6 +105,88 @@ var KeepVM VolumeManager
 var pullq *WorkQueue
 var trashq *WorkQueue
 
+var (
+       flagSerializeIO bool
+       flagReadonly    bool
+)
+type volumeSet []Volume
+
+func (vs *volumeSet) Set(value string) error {
+       if dirs := strings.Split(value, ","); len(dirs) > 1 {
+               log.Print("DEPRECATED: using comma-separated volume list.")
+               for _, dir := range dirs {
+                       if err := vs.Set(dir); err != nil {
+                               return err
+                       }
+               }
+               return nil
+       }
+       if len(value) == 0 || value[0] != '/' {
+               return errors.New("Invalid volume: must begin with '/'.")
+       }
+       if _, err := os.Stat(value); err != nil {
+               return err
+       }
+       *vs = append(*vs, MakeUnixVolume(value, flagSerializeIO, flagReadonly))
+       return nil
+}
+
+func (vs *volumeSet) String() string {
+       s := "["
+       for i, v := range *vs {
+               if i > 0 {
+                       s = s + " "
+               }
+               s = s + v.String()
+       }
+       return s + "]"
+}
+
+// Discover adds a volume for every directory named "keep" that is
+// located at the top level of a device- or tmpfs-backed mount point
+// other than "/". It returns the number of volumes added.
+func (vs *volumeSet) Discover() int {
+       added := 0
+       f, err := os.Open(PROC_MOUNTS)
+       if err != nil {
+               log.Fatalf("opening %s: %s", PROC_MOUNTS, err)
+       }
+       scanner := bufio.NewScanner(f)
+       for scanner.Scan() {
+               args := strings.Fields(scanner.Text())
+               if err := scanner.Err(); err != nil {
+                       log.Fatalf("reading %s: %s", PROC_MOUNTS, err)
+               }
+               dev, mount := args[0], args[1]
+               if mount == "/" {
+                       continue
+               }
+               if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
+                       continue
+               }
+               keepdir := mount + "/keep"
+               if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
+                       continue
+               }
+               // Set the -readonly flag (but only for this volume)
+               // if the filesystem is mounted readonly.
+               flagReadonlyWas := flagReadonly
+               for _, fsopt := range strings.Split(args[3], ",") {
+                       if fsopt == "ro" {
+                               flagReadonly = true
+                               break
+                       }
+                       if fsopt == "rw" {
+                               break
+                       }
+               }
+               vs.Set(keepdir)
+               flagReadonly = flagReadonlyWas
+               added++
+       }
+       return added
+}
+
 // TODO(twp): continue moving as much code as possible out of main
 // so it can be effectively tested. Esp. handling and postprocessing
 // of command line flags (identifying Keep volumes and initializing
@@ -112,32 +195,12 @@ var trashq *WorkQueue
 func main() {
        log.Println("Keep started: pid", os.Getpid())
 
-       // Parse command-line flags:
-       //
-       // -listen=ipaddr:port
-       //    Interface on which to listen for requests. Use :port without
-       //    an ipaddr to listen on all network interfaces.
-       //    Examples:
-       //      -listen=127.0.0.1:4949
-       //      -listen=10.0.1.24:8000
-       //      -listen=:25107 (to listen to port 25107 on all interfaces)
-       //
-       // -volumes
-       //    A comma-separated list of directories to use as Keep volumes.
-       //    Example:
-       //      -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
-       //
-       //    If -volumes is empty or is not present, Keep will select volumes
-       //    by looking at currently mounted filesystems for /keep top-level
-       //    directories.
-
        var (
                data_manager_token_file string
                listen                  string
                permission_key_file     string
                permission_ttl_sec      int
-               serialize_io            bool
-               volumearg               string
+               volumes                 volumeSet
                pidfile                 string
        )
        flag.StringVar(
@@ -155,9 +218,7 @@ func main() {
                &listen,
                "listen",
                DEFAULT_ADDR,
-               "Interface on which to listen for requests, in the format "+
-                       "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
-                       "to listen on all network interfaces.")
+               "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
        flag.BoolVar(
                &never_delete,
                "never-delete",
@@ -177,20 +238,23 @@ func main() {
                "Expiration time (in seconds) for newly generated permission "+
                        "signatures.")
        flag.BoolVar(
-               &serialize_io,
+               &flagSerializeIO,
                "serialize",
                false,
-               "If set, all read and write operations on local Keep volumes will "+
-                       "be serialized.")
-       flag.StringVar(
-               &volumearg,
+               "Serialize read and write operations on the following volumes.")
+       flag.BoolVar(
+               &flagReadonly,
+               "readonly",
+               false,
+               "Do not write, delete, or touch anything on the following volumes.")
+       flag.Var(
+               &volumes,
                "volumes",
-               "",
-               "Comma-separated list of directories to use for Keep volumes, "+
-                       "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
-                       "supplied, Keep will scan mounted filesystems for volumes "+
-                       "with a /keep top-level directory.")
-
+               "Deprecated synonym for -volume.")
+       flag.Var(
+               &volumes,
+               "volume",
+               "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
        flag.StringVar(
                &pidfile,
                "pid",
@@ -199,31 +263,14 @@ func main() {
 
        flag.Parse()
 
-       // Look for local keep volumes.
-       var keepvols []string
-       if volumearg == "" {
-               // TODO(twp): decide whether this is desirable default behavior.
-               // In production we may want to require the admin to specify
-               // Keep volumes explicitly.
-               keepvols = FindKeepVolumes()
-       } else {
-               keepvols = strings.Split(volumearg, ",")
-       }
-
-       // Check that the specified volumes actually exist.
-       var goodvols []Volume = nil
-       for _, v := range keepvols {
-               if _, err := os.Stat(v); err == nil {
-                       log.Println("adding Keep volume:", v)
-                       newvol := MakeUnixVolume(v, serialize_io)
-                       goodvols = append(goodvols, &newvol)
-               } else {
-                       log.Printf("bad Keep volume: %s\n", err)
+       if len(volumes) == 0 {
+               if volumes.Discover() == 0 {
+                       log.Fatal("No volumes found.")
                }
        }
 
-       if len(goodvols) == 0 {
-               log.Fatal("could not find any keep volumes")
+       for _, v := range volumes {
+               log.Printf("Using volume %v (writable=%v)", v, v.Writable())
        }
 
        // Initialize data manager token and permission key.
@@ -262,7 +309,7 @@ func main() {
        }
 
        // Start a round-robin VolumeManager with the volumes we have found.
-       KeepVM = MakeRRVolumeManager(goodvols)
+       KeepVM = MakeRRVolumeManager(volumes)
 
        // Tell the built-in HTTP server to direct all requests to the REST router.
        loggingRouter := MakeLoggingRESTRouter()
@@ -277,19 +324,21 @@ func main() {
        }
 
        // Initialize Pull queue and worker
-       arv, err := arvadosclient.MakeArvadosClient()
-       if err != nil {
-               log.Fatalf("Error setting up arvados client %s", err.Error())
-       }
-
-       keepClient, err := keepclient.MakeKeepClient(&arv)
-       if err != nil {
-               log.Fatalf("Error setting up keep client %s", err.Error())
+       keepClient := &keepclient.KeepClient{
+               Arvados:       nil,
+               Want_replicas: 1,
+               Using_proxy:   true,
+               Client:        &http.Client{},
        }
 
+       // Initialize the pullq and worker
        pullq = NewWorkQueue()
        go RunPullWorker(pullq, keepClient)
 
+       // Initialize the trashq and worker
+       trashq = NewWorkQueue()
+       go RunTrashWorker(trashq)
+
        // Shut down the server gracefully (by closing the listener)
        // if SIGTERM is received.
        term := make(chan os.Signal, 1)