17170: Merge branch 'master'
[arvados.git] / lib / crunchrun / crunchrun.go
index 3a4f3a102b86d8adb1be71d41d85e8b1723f053e..7d6fb4ed47bef547f4eb3cb1728163c77021bf02 100644 (file)
@@ -178,6 +178,8 @@ type ContainerRunner struct {
        arvMountLog   *ThrottledLogger
 
        containerWatchdogInterval time.Duration
+
+       gateway Gateway
 }
 
 // setupSignals sets up signal handling to gracefully terminate the underlying
@@ -539,7 +541,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
                                arvMountCmd = append(arvMountCmd, "--mount-tmp")
                                arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
-                               tmpcount += 1
+                               tmpcount++
                        }
                        if mnt.Writable {
                                if bind == runner.Container.OutputPath {
@@ -621,7 +623,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                return fmt.Errorf("output path does not correspond to a writable mount point")
        }
 
-       if wantAPI := runner.Container.RuntimeConstraints.API; needCertMount && wantAPI != nil && *wantAPI {
+       if needCertMount && runner.Container.RuntimeConstraints.API {
                for _, certfile := range arvadosclient.CertFiles {
                        _, err := os.Stat(certfile)
                        if err == nil {
@@ -944,15 +946,15 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
 
        // If stdin mount is provided, attach it to the docker container
        var stdinRdr arvados.File
-       var stdinJson []byte
+       var stdinJSON []byte
        if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
                if stdinMnt.Kind == "collection" {
                        var stdinColl arvados.Collection
-                       collId := stdinMnt.UUID
-                       if collId == "" {
-                               collId = stdinMnt.PortableDataHash
+                       collID := stdinMnt.UUID
+                       if collID == "" {
+                               collID = stdinMnt.PortableDataHash
                        }
-                       err = runner.ContainerArvClient.Get("collections", collId, nil, &stdinColl)
+                       err = runner.ContainerArvClient.Get("collections", collID, nil, &stdinColl)
                        if err != nil {
                                return fmt.Errorf("While getting stdin collection: %v", err)
                        }
@@ -966,14 +968,14 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
                                return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
                        }
                } else if stdinMnt.Kind == "json" {
-                       stdinJson, err = json.Marshal(stdinMnt.Content)
+                       stdinJSON, err = json.Marshal(stdinMnt.Content)
                        if err != nil {
                                return fmt.Errorf("While encoding stdin json data: %v", err)
                        }
                }
        }
 
-       stdinUsed := stdinRdr != nil || len(stdinJson) != 0
+       stdinUsed := stdinRdr != nil || len(stdinJSON) != 0
        response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
                dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
        if err != nil {
@@ -1016,9 +1018,9 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
                        stdinRdr.Close()
                        response.CloseWrite()
                }()
-       } else if len(stdinJson) != 0 {
+       } else if len(stdinJSON) != 0 {
                go func() {
-                       _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
+                       _, err := io.Copy(response.Conn, bytes.NewReader(stdinJSON))
                        if err != nil {
                                runner.CrunchLog.Printf("While writing stdin json to docker container: %v", err)
                                runner.stop(nil)
@@ -1092,7 +1094,7 @@ func (runner *ContainerRunner) CreateContainer() error {
                },
        }
 
-       if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
+       if runner.Container.RuntimeConstraints.API {
                tok, err := runner.ContainerToken()
                if err != nil {
                        return err
@@ -1269,7 +1271,7 @@ func (runner *ContainerRunner) updateLogs() {
 // CaptureOutput saves data from the container's output directory if
 // needed, and updates the container output accordingly.
 func (runner *ContainerRunner) CaptureOutput() error {
-       if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
+       if runner.Container.RuntimeConstraints.API {
                // Output may have been set directly by the container, so
                // refresh the container record to check.
                err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID,
@@ -1431,15 +1433,20 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C
                // Already finalized.
                return
        }
-       mt, err := runner.LogCollection.MarshalManifest(".")
-       if err != nil {
-               err = fmt.Errorf("error creating log manifest: %v", err)
-               return
-       }
        updates := arvadosclient.Dict{
-               "name":          "logs for " + runner.Container.UUID,
-               "manifest_text": mt,
+               "name": "logs for " + runner.Container.UUID,
        }
+       mt, err1 := runner.LogCollection.MarshalManifest(".")
+       if err1 == nil {
+               // Only send updated manifest text if there was no
+               // error.
+               updates["manifest_text"] = mt
+       }
+
+       // Even if flushing the manifest had an error, we still want
+       // to update the log record, if possible, to push the trash_at
+       // and delete_at times into the future.  Details on bug
+       // #17293.
        if final {
                updates["is_trashed"] = true
        } else {
@@ -1448,16 +1455,20 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C
                updates["delete_at"] = exp
        }
        reqBody := arvadosclient.Dict{"collection": updates}
+       var err2 error
        if runner.logUUID == "" {
                reqBody["ensure_unique_name"] = true
-               err = runner.DispatcherArvClient.Create("collections", reqBody, &response)
+               err2 = runner.DispatcherArvClient.Create("collections", reqBody, &response)
        } else {
-               err = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response)
+               err2 = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response)
        }
-       if err != nil {
-               return
+       if err2 == nil {
+               runner.logUUID = response.UUID
+       }
+
+       if err1 != nil || err2 != nil {
+               err = fmt.Errorf("error recording logs: %q, %q", err1, err2)
        }
-       runner.logUUID = response.UUID
        return
 }
 
@@ -1469,7 +1480,7 @@ func (runner *ContainerRunner) UpdateContainerRunning() error {
                return ErrCancelled
        }
        return runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
-               arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
+               arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gateway.Address}}, nil)
 }
 
 // ContainerToken returns the api_token the container (and any
@@ -1814,18 +1825,18 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                }
        }
 
-       containerId := flags.Arg(0)
+       containerID := flags.Arg(0)
 
        switch {
        case *detach && !ignoreDetachFlag:
-               return Detach(containerId, prog, args, os.Stdout, os.Stderr)
+               return Detach(containerID, prog, args, os.Stdout, os.Stderr)
        case *kill >= 0:
-               return KillProcess(containerId, syscall.Signal(*kill), os.Stdout, os.Stderr)
+               return KillProcess(containerID, syscall.Signal(*kill), os.Stdout, os.Stderr)
        case *list:
                return ListProcesses(os.Stdout, os.Stderr)
        }
 
-       if containerId == "" {
+       if containerID == "" {
                log.Printf("usage: %s [options] UUID", prog)
                return 1
        }
@@ -1839,14 +1850,14 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
        api, err := arvadosclient.MakeArvadosClient()
        if err != nil {
-               log.Printf("%s: %v", containerId, err)
+               log.Printf("%s: %v", containerID, err)
                return 1
        }
        api.Retries = 8
 
        kc, kcerr := keepclient.MakeKeepClient(api)
        if kcerr != nil {
-               log.Printf("%s: %v", containerId, kcerr)
+               log.Printf("%s: %v", containerID, kcerr)
                return 1
        }
        kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
@@ -1856,21 +1867,35 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        // minimum version we want to support.
        docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
 
-       cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerId)
+       cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerID)
        if err != nil {
                log.Print(err)
                return 1
        }
        if dockererr != nil {
-               cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
+               cr.CrunchLog.Printf("%s: %v", containerID, dockererr)
                cr.checkBrokenNode(dockererr)
                cr.CrunchLog.Close()
                return 1
        }
 
-       parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".")
+       cr.gateway = Gateway{
+               Address:           os.Getenv("GatewayAddress"),
+               AuthSecret:        os.Getenv("GatewayAuthSecret"),
+               ContainerUUID:     containerID,
+               DockerContainerID: &cr.ContainerID,
+               Log:               cr.CrunchLog,
+       }
+       os.Unsetenv("GatewayAuthSecret")
+       err = cr.gateway.Start()
+       if err != nil {
+               log.Printf("error starting gateway server: %s", err)
+               return 1
+       }
+
+       parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerID+".")
        if tmperr != nil {
-               log.Printf("%s: %v", containerId, tmperr)
+               log.Printf("%s: %v", containerID, tmperr)
                return 1
        }
 
@@ -1904,7 +1929,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        }
 
        if runerr != nil {
-               log.Printf("%s: %v", containerId, runerr)
+               log.Printf("%s: %v", containerID, runerr)
                return 1
        }
        return 0