17170: Add "arvados-client shell" subcommand and backend support.
[arvados.git] / lib / crunchrun / crunchrun.go
index c125b27a5f0783fe757bcf29ac0b62674b68df95..b252e0dce162fca3473dc0a0c64a9f846295e4b9 100644 (file)
@@ -33,6 +33,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        "git.arvados.org/arvados.git/sdk/go/manifest"
+       "golang.org/x/crypto/ssh"
        "golang.org/x/net/context"
 
        dockertypes "github.com/docker/docker/api/types"
@@ -178,6 +179,10 @@ type ContainerRunner struct {
        arvMountLog   *ThrottledLogger
 
        containerWatchdogInterval time.Duration
+
+       gatewayAddress    string
+       gatewaySSHConfig  *ssh.ServerConfig
+       gatewayAuthSecret string
 }
 
 // setupSignals sets up signal handling to gracefully terminate the underlying
@@ -455,11 +460,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        }
        for bind := range runner.SecretMounts {
                if _, ok := runner.Container.Mounts[bind]; ok {
-                       return fmt.Errorf("Secret mount %q conflicts with regular mount", bind)
+                       return fmt.Errorf("secret mount %q conflicts with regular mount", bind)
                }
                if runner.SecretMounts[bind].Kind != "json" &&
                        runner.SecretMounts[bind].Kind != "text" {
-                       return fmt.Errorf("Secret mount %q type is %q but only 'json' and 'text' are permitted.",
+                       return fmt.Errorf("secret mount %q type is %q but only 'json' and 'text' are permitted",
                                bind, runner.SecretMounts[bind].Kind)
                }
                binds = append(binds, bind)
@@ -474,7 +479,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                if bind == "stdout" || bind == "stderr" {
                        // Is it a "file" mount kind?
                        if mnt.Kind != "file" {
-                               return fmt.Errorf("Unsupported mount kind '%s' for %s. Only 'file' is supported.", mnt.Kind, bind)
+                               return fmt.Errorf("unsupported mount kind '%s' for %s: only 'file' is supported", mnt.Kind, bind)
                        }
 
                        // Does path start with OutputPath?
@@ -490,7 +495,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                if bind == "stdin" {
                        // Is it a "collection" mount kind?
                        if mnt.Kind != "collection" && mnt.Kind != "json" {
-                               return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind)
+                               return fmt.Errorf("unsupported mount kind '%s' for stdin: only 'collection' and 'json' are supported", mnt.Kind)
                        }
                }
 
@@ -500,7 +505,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
                if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" {
                        if mnt.Kind != "collection" && mnt.Kind != "text" && mnt.Kind != "json" {
-                               return fmt.Errorf("Only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind)
+                               return fmt.Errorf("only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind)
                        }
                }
 
@@ -508,17 +513,17 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                case mnt.Kind == "collection" && bind != "stdin":
                        var src string
                        if mnt.UUID != "" && mnt.PortableDataHash != "" {
-                               return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
+                               return fmt.Errorf("cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
                        }
                        if mnt.UUID != "" {
                                if mnt.Writable {
-                                       return fmt.Errorf("Writing to existing collections currently not permitted.")
+                                       return fmt.Errorf("writing to existing collections currently not permitted")
                                }
                                pdhOnly = false
                                src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
                        } else if mnt.PortableDataHash != "" {
                                if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
-                                       return fmt.Errorf("Can never write to a collection specified by portable data hash")
+                                       return fmt.Errorf("can never write to a collection specified by portable data hash")
                                }
                                idx := strings.Index(mnt.PortableDataHash, "/")
                                if idx > 0 {
@@ -539,7 +544,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
                                arvMountCmd = append(arvMountCmd, "--mount-tmp")
                                arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
-                               tmpcount += 1
+                               tmpcount++
                        }
                        if mnt.Writable {
                                if bind == runner.Container.OutputPath {
@@ -559,15 +564,15 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        var tmpdir string
                        tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp")
                        if err != nil {
-                               return fmt.Errorf("While creating mount temp dir: %v", err)
+                               return fmt.Errorf("while creating mount temp dir: %v", err)
                        }
                        st, staterr := os.Stat(tmpdir)
                        if staterr != nil {
-                               return fmt.Errorf("While Stat on temp dir: %v", staterr)
+                               return fmt.Errorf("while Stat on temp dir: %v", staterr)
                        }
                        err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777)
                        if staterr != nil {
-                               return fmt.Errorf("While Chmod temp dir: %v", err)
+                               return fmt.Errorf("while Chmod temp dir: %v", err)
                        }
                        runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
                        if bind == runner.Container.OutputPath {
@@ -618,7 +623,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        }
 
        if runner.HostOutputDir == "" {
-               return fmt.Errorf("Output path does not correspond to a writable mount point")
+               return fmt.Errorf("output path does not correspond to a writable mount point")
        }
 
        if wantAPI := runner.Container.RuntimeConstraints.API; needCertMount && wantAPI != nil && *wantAPI {
@@ -640,20 +645,20 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
        runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
        if err != nil {
-               return fmt.Errorf("While trying to start arv-mount: %v", err)
+               return fmt.Errorf("while trying to start arv-mount: %v", err)
        }
 
        for _, p := range collectionPaths {
                _, err = os.Stat(p)
                if err != nil {
-                       return fmt.Errorf("While checking that input files exist: %v", err)
+                       return fmt.Errorf("while checking that input files exist: %v", err)
                }
        }
 
        for _, cp := range copyFiles {
                st, err := os.Stat(cp.src)
                if err != nil {
-                       return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+                       return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
                }
                if st.IsDir() {
                        err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
@@ -674,7 +679,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                        }
                                        return os.Chmod(target, walkinfo.Mode()|os.ModeSetgid|0777)
                                } else {
-                                       return fmt.Errorf("Source %q is not a regular file or directory", cp.src)
+                                       return fmt.Errorf("source %q is not a regular file or directory", cp.src)
                                }
                        })
                } else if st.Mode().IsRegular() {
@@ -684,7 +689,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        }
                }
                if err != nil {
-                       return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+                       return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
                }
        }
 
@@ -944,15 +949,15 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
 
        // If stdin mount is provided, attach it to the docker container
        var stdinRdr arvados.File
-       var stdinJson []byte
+       var stdinJSON []byte
        if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
                if stdinMnt.Kind == "collection" {
                        var stdinColl arvados.Collection
-                       collId := stdinMnt.UUID
-                       if collId == "" {
-                               collId = stdinMnt.PortableDataHash
+                       collID := stdinMnt.UUID
+                       if collID == "" {
+                               collID = stdinMnt.PortableDataHash
                        }
-                       err = runner.ContainerArvClient.Get("collections", collId, nil, &stdinColl)
+                       err = runner.ContainerArvClient.Get("collections", collID, nil, &stdinColl)
                        if err != nil {
                                return fmt.Errorf("While getting stdin collection: %v", err)
                        }
@@ -966,14 +971,14 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
                                return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
                        }
                } else if stdinMnt.Kind == "json" {
-                       stdinJson, err = json.Marshal(stdinMnt.Content)
+                       stdinJSON, err = json.Marshal(stdinMnt.Content)
                        if err != nil {
                                return fmt.Errorf("While encoding stdin json data: %v", err)
                        }
                }
        }
 
-       stdinUsed := stdinRdr != nil || len(stdinJson) != 0
+       stdinUsed := stdinRdr != nil || len(stdinJSON) != 0
        response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
                dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
        if err != nil {
@@ -1016,9 +1021,9 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
                        stdinRdr.Close()
                        response.CloseWrite()
                }()
-       } else if len(stdinJson) != 0 {
+       } else if len(stdinJSON) != 0 {
                go func() {
-                       _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
+                       _, err := io.Copy(response.Conn, bytes.NewReader(stdinJSON))
                        if err != nil {
                                runner.CrunchLog.Printf("While writing stdin json to docker container: %v", err)
                                runner.stop(nil)
@@ -1469,7 +1474,7 @@ func (runner *ContainerRunner) UpdateContainerRunning() error {
                return ErrCancelled
        }
        return runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
-               arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
+               arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gatewayAddress}}, nil)
 }
 
 // ContainerToken returns the api_token the container (and any
@@ -1814,18 +1819,18 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                }
        }
 
-       containerId := flags.Arg(0)
+       containerID := flags.Arg(0)
 
        switch {
        case *detach && !ignoreDetachFlag:
-               return Detach(containerId, prog, args, os.Stdout, os.Stderr)
+               return Detach(containerID, prog, args, os.Stdout, os.Stderr)
        case *kill >= 0:
-               return KillProcess(containerId, syscall.Signal(*kill), os.Stdout, os.Stderr)
+               return KillProcess(containerID, syscall.Signal(*kill), os.Stdout, os.Stderr)
        case *list:
                return ListProcesses(os.Stdout, os.Stderr)
        }
 
-       if containerId == "" {
+       if containerID == "" {
                log.Printf("usage: %s [options] UUID", prog)
                return 1
        }
@@ -1839,14 +1844,14 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
        api, err := arvadosclient.MakeArvadosClient()
        if err != nil {
-               log.Printf("%s: %v", containerId, err)
+               log.Printf("%s: %v", containerID, err)
                return 1
        }
        api.Retries = 8
 
        kc, kcerr := keepclient.MakeKeepClient(api)
        if kcerr != nil {
-               log.Printf("%s: %v", containerId, kcerr)
+               log.Printf("%s: %v", containerID, kcerr)
                return 1
        }
        kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
@@ -1856,21 +1861,29 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        // minimum version we want to support.
        docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
 
-       cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerId)
+       cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerID)
        if err != nil {
                log.Print(err)
                return 1
        }
        if dockererr != nil {
-               cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
+               cr.CrunchLog.Printf("%s: %v", containerID, dockererr)
                cr.checkBrokenNode(dockererr)
                cr.CrunchLog.Close()
                return 1
        }
 
-       parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".")
+       cr.gatewayAuthSecret = os.Getenv("GatewayAuthSecret")
+       os.Unsetenv("GatewayAuthSecret")
+       err = cr.startGatewayServer()
+       if err != nil {
+               log.Printf("error starting gateway server: %s", err)
+               return 1
+       }
+
+       parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerID+".")
        if tmperr != nil {
-               log.Printf("%s: %v", containerId, tmperr)
+               log.Printf("%s: %v", containerID, tmperr)
                return 1
        }
 
@@ -1904,7 +1917,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        }
 
        if runerr != nil {
-               log.Printf("%s: %v", containerId, runerr)
+               log.Printf("%s: %v", containerID, runerr)
                return 1
        }
        return 0