16347: Run a dedicated keepstore process for each container.
[arvados.git] / lib / dispatchcloud / worker / runner.go
index 47521213427610a531fa269df32b046b17ba72aa..29c4b8e0a36a3be2a721e1bc509335817e86842c 100644 (file)
@@ -8,9 +8,12 @@ import (
        "bytes"
        "encoding/json"
        "fmt"
+       "net"
+       "strings"
        "syscall"
        "time"
 
+       "git.arvados.org/arvados.git/lib/crunchrun"
        "github.com/sirupsen/logrus"
 )
 
@@ -19,8 +22,9 @@ import (
 type remoteRunner struct {
        uuid          string
        executor      Executor
-       envJSON       json.RawMessage
+       configJSON    json.RawMessage
        runnerCmd     string
+       runnerArgs    []string
        remoteUser    string
        timeoutTERM   time.Duration
        timeoutSignal time.Duration
@@ -44,23 +48,31 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
        if err := enc.Encode(wkr.instType); err != nil {
                panic(err)
        }
-       env := map[string]string{
+       var configData crunchrun.ConfigData
+       configData.Env = map[string]string{
                "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
                "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
                "InstanceType":      instJSON.String(),
+               "GatewayAddress":    net.JoinHostPort(wkr.instance.Address(), "0"),
+               "GatewayAuthSecret": wkr.wp.gatewayAuthSecret(uuid),
        }
        if wkr.wp.arvClient.Insecure {
-               env["ARVADOS_API_HOST_INSECURE"] = "1"
+               configData.Env["ARVADOS_API_HOST_INSECURE"] = "1"
        }
-       envJSON, err := json.Marshal(env)
+       if bufs := wkr.wp.cluster.Containers.LocalKeepBlobBuffersPerVCPU; bufs > 0 {
+               configData.Cluster = wkr.wp.cluster
+               configData.KeepBuffers = bufs * wkr.instType.VCPUs
+       }
+       configJSON, err := json.Marshal(configData)
        if err != nil {
                panic(err)
        }
        rr := &remoteRunner{
                uuid:          uuid,
                executor:      wkr.executor,
-               envJSON:       envJSON,
+               configJSON:    configJSON,
                runnerCmd:     wkr.wp.runnerCmd,
+               runnerArgs:    wkr.wp.runnerArgs,
                remoteUser:    wkr.instance.RemoteUser(),
                timeoutTERM:   wkr.wp.timeoutTERM,
                timeoutSignal: wkr.wp.timeoutSignal,
@@ -78,11 +90,15 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
 // assume the remote process _might_ have started, at least until it
 // probes the worker and finds otherwise.
 func (rr *remoteRunner) Start() {
-       cmd := rr.runnerCmd + " --detach --stdin-env '" + rr.uuid + "'"
+       cmd := rr.runnerCmd + " --detach --stdin-config"
+       for _, arg := range rr.runnerArgs {
+               cmd += " '" + strings.Replace(arg, "'", "'\\''", -1) + "'"
+       }
+       cmd += " '" + rr.uuid + "'"
        if rr.remoteUser != "root" {
                cmd = "sudo " + cmd
        }
-       stdin := bytes.NewBuffer(rr.envJSON)
+       stdin := bytes.NewBuffer(rr.configJSON)
        stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
        if err != nil {
                rr.logger.WithField("stdout", string(stdout)).