16347: Run a dedicated keepstore process for each container.
[arvados.git] / lib / crunchrun / crunchrun.go
index 42f143f1cb8fe6bb97bfff2511e9f318f37359af..7a2afeacca3272b07df5a1bad4ea1cad8b9b8755 100644 (file)
@@ -6,6 +6,7 @@ package crunchrun
 
 import (
        "bytes"
+       "context"
        "encoding/json"
        "errors"
        "flag"
@@ -13,6 +14,8 @@ import (
        "io"
        "io/ioutil"
        "log"
+       "net"
+       "net/http"
        "os"
        "os/exec"
        "os/signal"
@@ -33,13 +36,20 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        "git.arvados.org/arvados.git/sdk/go/manifest"
-       "golang.org/x/net/context"
 )
 
 type command struct{}
 
 var Command = command{}
 
+// ConfigData contains environment variables and (when needed) cluster
+// configuration, passed from dispatchcloud to crunch-run on stdin.
+type ConfigData struct {
+       Env         map[string]string
+       KeepBuffers int
+       Cluster     *arvados.Cluster
+}
+
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
 type IArvadosClient interface {
        Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
@@ -1644,7 +1654,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
        caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
        detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
-       stdinEnv := flags.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
+       stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
        sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
        kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
        list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
@@ -1674,33 +1684,45 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 1
        }
 
-       if *stdinEnv && !ignoreDetachFlag {
-               // Load env vars on stdin if asked (but not in a
-               // detached child process, in which case stdin is
-               // /dev/null).
-               err := loadEnv(os.Stdin)
-               if err != nil {
-                       log.Print(err)
-                       return 1
-               }
-       }
-
        containerUUID := flags.Arg(0)
 
        switch {
        case *detach && !ignoreDetachFlag:
-               return Detach(containerUUID, prog, args, os.Stdout, os.Stderr)
+               return Detach(containerUUID, prog, args, os.Stdin, os.Stdout, os.Stderr)
        case *kill >= 0:
                return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr)
        case *list:
                return ListProcesses(os.Stdout, os.Stderr)
        }
 
-       if containerUUID == "" {
+       if len(containerUUID) != 27 {
                log.Printf("usage: %s [options] UUID", prog)
                return 1
        }
 
+       var conf ConfigData
+       if *stdinConfig {
+               err := json.NewDecoder(os.Stdin).Decode(&conf)
+               if err != nil {
+                       log.Print(err)
+                       return 1
+               }
+               for k, v := range conf.Env {
+                       err = os.Setenv(k, v)
+                       if err != nil {
+                               log.Printf("setenv(%q): %s", k, err)
+                               return 1
+                       }
+               }
+               if conf.Cluster != nil {
+                       // ClusterID is missing from the JSON
+                       // representation, but we need it to generate
+                       // a valid config file for keepstore, so we
+                       // fill it using the container UUID prefix.
+                       conf.Cluster.ClusterID = containerUUID[:5]
+               }
+       }
+
        log.Printf("crunch-run %s started", cmd.Version.String())
        time.Sleep(*sleep)
 
@@ -1708,6 +1730,16 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                arvadosclient.CertFiles = []string{*caCertsPath}
        }
 
+       var keepstoreLog bufThenWrite
+       keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLog, stderr))
+       if err != nil {
+               log.Print(err)
+               return 1
+       }
+       if keepstore != nil {
+               defer keepstore.Process.Kill()
+       }
+
        api, err := arvadosclient.MakeArvadosClient()
        if err != nil {
                log.Printf("%s: %v", containerUUID, err)
@@ -1729,6 +1761,19 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 1
        }
 
+       if keepstore != nil {
+               w, err := cr.NewLogWriter("keepstore")
+               if err != nil {
+                       log.Print(err)
+                       return 1
+               }
+               err = keepstoreLog.SetWriter(NewThrottledLogger(w))
+               if err != nil {
+                       log.Print(err)
+                       return 1
+               }
+       }
+
        switch *runtimeEngine {
        case "docker":
                cr.executor, err = newDockerExecutor(containerUUID, cr.CrunchLog.Printf, cr.containerWatchdogInterval)
@@ -1816,21 +1861,73 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        return 0
 }
 
-func loadEnv(rdr io.Reader) error {
-       buf, err := ioutil.ReadAll(rdr)
+func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
+       if configData.Cluster == nil || configData.KeepBuffers < 1 {
+               return nil, nil
+       }
+
+       // Rather than have an alternate way to tell keepstore how
+       // many buffers to use when starting it this way, we just
+       // modify the cluster configuration that we feed it on stdin.
+       configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
+
+       ln, err := net.Listen("tcp", "localhost:0")
+       if err != nil {
+               return nil, err
+       }
+       _, port, err := net.SplitHostPort(ln.Addr().String())
+       if err != nil {
+               ln.Close()
+               return nil, err
+       }
+       ln.Close()
+       url := "http://localhost:" + port
+
+       fmt.Fprintf(logbuf, "starting keepstore on %s\n", url)
+
+       var confJSON bytes.Buffer
+       err = json.NewEncoder(&confJSON).Encode(arvados.Config{
+               Clusters: map[string]arvados.Cluster{
+                       configData.Cluster.ClusterID: *configData.Cluster,
+               },
+       })
        if err != nil {
-               return fmt.Errorf("read stdin: %s", err)
+               return nil, err
+       }
+       cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-")
+       cmd.Stdin = &confJSON
+       cmd.Stdout = logbuf
+       cmd.Stderr = logbuf
+       cmd.Env = []string{
+               "GOGC=10",
+               "ARVADOS_SERVICE_INTERNAL_URL=" + url,
        }
-       var env map[string]string
-       err = json.Unmarshal(buf, &env)
+       err = cmd.Start()
        if err != nil {
-               return fmt.Errorf("decode stdin: %s", err)
+               return nil, fmt.Errorf("error starting keepstore process: %w", err)
        }
-       for k, v := range env {
-               err = os.Setenv(k, v)
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
+       defer cancel()
+       poll := time.NewTicker(time.Second / 10)
+       defer poll.Stop()
+       client := http.Client{}
+       for range poll.C {
+               testReq, err := http.NewRequestWithContext(ctx, "GET", url, nil)
                if err != nil {
-                       return fmt.Errorf("setenv(%q): %s", k, err)
+                       return nil, err
+               }
+               resp, err := client.Do(testReq)
+               if err == nil {
+                       // Success -- don't need to check the
+                       // response, we just need to know it's
+                       // accepting requests.
+                       resp.Body.Close()
+                       break
+               }
+               if ctx.Err() != nil {
+                       return nil, fmt.Errorf("timed out waiting for new keepstore process to accept a request")
                }
        }
-       return nil
+       os.Setenv("ARVADOS_KEEP_SERVICES", url)
+       return cmd, nil
 }