X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8991b43990aa7a77edd78f165114b93a6a207985..f30c8ed35e3e1ad7cb3cb51fc6d83f56a04ae8de:/lib/crunchrun/crunchrun.go diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index c125b27a5f..7d6fb4ed47 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -178,6 +178,8 @@ type ContainerRunner struct { arvMountLog *ThrottledLogger containerWatchdogInterval time.Duration + + gateway Gateway } // setupSignals sets up signal handling to gracefully terminate the underlying @@ -455,11 +457,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } for bind := range runner.SecretMounts { if _, ok := runner.Container.Mounts[bind]; ok { - return fmt.Errorf("Secret mount %q conflicts with regular mount", bind) + return fmt.Errorf("secret mount %q conflicts with regular mount", bind) } if runner.SecretMounts[bind].Kind != "json" && runner.SecretMounts[bind].Kind != "text" { - return fmt.Errorf("Secret mount %q type is %q but only 'json' and 'text' are permitted.", + return fmt.Errorf("secret mount %q type is %q but only 'json' and 'text' are permitted", bind, runner.SecretMounts[bind].Kind) } binds = append(binds, bind) @@ -474,7 +476,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if bind == "stdout" || bind == "stderr" { // Is it a "file" mount kind? if mnt.Kind != "file" { - return fmt.Errorf("Unsupported mount kind '%s' for %s. Only 'file' is supported.", mnt.Kind, bind) + return fmt.Errorf("unsupported mount kind '%s' for %s: only 'file' is supported", mnt.Kind, bind) } // Does path start with OutputPath? @@ -490,7 +492,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if bind == "stdin" { // Is it a "collection" mount kind? if mnt.Kind != "collection" && mnt.Kind != "json" { - return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind) + return fmt.Errorf("unsupported mount kind '%s' for stdin: only 'collection' and 'json' are supported", mnt.Kind) } } @@ -500,7 +502,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" { if mnt.Kind != "collection" && mnt.Kind != "text" && mnt.Kind != "json" { - return fmt.Errorf("Only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind) + return fmt.Errorf("only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind) } } @@ -508,17 +510,17 @@ func (runner *ContainerRunner) SetupMounts() (err error) { case mnt.Kind == "collection" && bind != "stdin": var src string if mnt.UUID != "" && mnt.PortableDataHash != "" { - return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount") + return fmt.Errorf("cannot specify both 'uuid' and 'portable_data_hash' for a collection mount") } if mnt.UUID != "" { if mnt.Writable { - return fmt.Errorf("Writing to existing collections currently not permitted.") + return fmt.Errorf("writing to existing collections currently not permitted") } pdhOnly = false src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID) } else if mnt.PortableDataHash != "" { if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") { - return fmt.Errorf("Can never write to a collection specified by portable data hash") + return fmt.Errorf("can never write to a collection specified by portable data hash") } idx := strings.Index(mnt.PortableDataHash, "/") if idx > 0 { @@ -539,7 +541,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount) arvMountCmd = append(arvMountCmd, "--mount-tmp") arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount)) - tmpcount += 1 + tmpcount++ } if mnt.Writable { if bind == runner.Container.OutputPath { @@ -559,15 +561,15 @@ func (runner *ContainerRunner) SetupMounts() (err error) { var tmpdir string tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp") if err != nil { - return fmt.Errorf("While creating mount temp dir: %v", err) + return fmt.Errorf("while creating mount temp dir: %v", err) } st, staterr := os.Stat(tmpdir) if staterr != nil { - return fmt.Errorf("While Stat on temp dir: %v", staterr) + return fmt.Errorf("while Stat on temp dir: %v", staterr) } err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777) if staterr != nil { - return fmt.Errorf("While Chmod temp dir: %v", err) + return fmt.Errorf("while Chmod temp dir: %v", err) } runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind)) if bind == runner.Container.OutputPath { @@ -618,10 +620,10 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } if runner.HostOutputDir == "" { - return fmt.Errorf("Output path does not correspond to a writable mount point") + return fmt.Errorf("output path does not correspond to a writable mount point") } - if wantAPI := runner.Container.RuntimeConstraints.API; needCertMount && wantAPI != nil && *wantAPI { + if needCertMount && runner.Container.RuntimeConstraints.API { for _, certfile := range arvadosclient.CertFiles { _, err := os.Stat(certfile) if err == nil { @@ -640,20 +642,20 @@ func (runner *ContainerRunner) SetupMounts() (err error) { runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token) if err != nil { - return fmt.Errorf("While trying to start arv-mount: %v", err) + return fmt.Errorf("while trying to start arv-mount: %v", err) } for _, p := range collectionPaths { _, err = os.Stat(p) if err != nil { - return fmt.Errorf("While checking that input files exist: %v", err) + return fmt.Errorf("while checking that input files exist: %v", err) } } for _, cp := range copyFiles { st, err := os.Stat(cp.src) if err != nil { - return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err) + return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err) } if st.IsDir() { err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error { @@ -674,7 +676,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } return os.Chmod(target, walkinfo.Mode()|os.ModeSetgid|0777) } else { - return fmt.Errorf("Source %q is not a regular file or directory", cp.src) + return fmt.Errorf("source %q is not a regular file or directory", cp.src) } }) } else if st.Mode().IsRegular() { @@ -684,7 +686,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } } if err != nil { - return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err) + return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err) } } @@ -944,15 +946,15 @@ func (runner *ContainerRunner) AttachStreams() (err error) { // If stdin mount is provided, attach it to the docker container var stdinRdr arvados.File - var stdinJson []byte + var stdinJSON []byte if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok { if stdinMnt.Kind == "collection" { var stdinColl arvados.Collection - collId := stdinMnt.UUID - if collId == "" { - collId = stdinMnt.PortableDataHash + collID := stdinMnt.UUID + if collID == "" { + collID = stdinMnt.PortableDataHash } - err = runner.ContainerArvClient.Get("collections", collId, nil, &stdinColl) + err = runner.ContainerArvClient.Get("collections", collID, nil, &stdinColl) if err != nil { return fmt.Errorf("While getting stdin collection: %v", err) } @@ -966,14 +968,14 @@ func (runner *ContainerRunner) AttachStreams() (err error) { return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err) } } else if stdinMnt.Kind == "json" { - stdinJson, err = json.Marshal(stdinMnt.Content) + stdinJSON, err = json.Marshal(stdinMnt.Content) if err != nil { return fmt.Errorf("While encoding stdin json data: %v", err) } } } - stdinUsed := stdinRdr != nil || len(stdinJson) != 0 + stdinUsed := stdinRdr != nil || len(stdinJSON) != 0 response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID, dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true}) if err != nil { @@ -1016,9 +1018,9 @@ func (runner *ContainerRunner) AttachStreams() (err error) { stdinRdr.Close() response.CloseWrite() }() - } else if len(stdinJson) != 0 { + } else if len(stdinJSON) != 0 { go func() { - _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson)) + _, err := io.Copy(response.Conn, bytes.NewReader(stdinJSON)) if err != nil { runner.CrunchLog.Printf("While writing stdin json to docker container: %v", err) runner.stop(nil) @@ -1092,7 +1094,7 @@ func (runner *ContainerRunner) CreateContainer() error { }, } - if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI { + if runner.Container.RuntimeConstraints.API { tok, err := runner.ContainerToken() if err != nil { return err @@ -1269,7 +1271,7 @@ func (runner *ContainerRunner) updateLogs() { // CaptureOutput saves data from the container's output directory if // needed, and updates the container output accordingly. func (runner *ContainerRunner) CaptureOutput() error { - if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI { + if runner.Container.RuntimeConstraints.API { // Output may have been set directly by the container, so // refresh the container record to check. err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID, @@ -1431,15 +1433,20 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C // Already finalized. return } - mt, err := runner.LogCollection.MarshalManifest(".") - if err != nil { - err = fmt.Errorf("error creating log manifest: %v", err) - return - } updates := arvadosclient.Dict{ - "name": "logs for " + runner.Container.UUID, - "manifest_text": mt, + "name": "logs for " + runner.Container.UUID, } + mt, err1 := runner.LogCollection.MarshalManifest(".") + if err1 == nil { + // Only send updated manifest text if there was no + // error. + updates["manifest_text"] = mt + } + + // Even if flushing the manifest had an error, we still want + // to update the log record, if possible, to push the trash_at + // and delete_at times into the future. Details on bug + // #17293. if final { updates["is_trashed"] = true } else { @@ -1448,16 +1455,20 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C updates["delete_at"] = exp } reqBody := arvadosclient.Dict{"collection": updates} + var err2 error if runner.logUUID == "" { reqBody["ensure_unique_name"] = true - err = runner.DispatcherArvClient.Create("collections", reqBody, &response) + err2 = runner.DispatcherArvClient.Create("collections", reqBody, &response) } else { - err = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response) + err2 = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response) } - if err != nil { - return + if err2 == nil { + runner.logUUID = response.UUID + } + + if err1 != nil || err2 != nil { + err = fmt.Errorf("error recording logs: %q, %q", err1, err2) } - runner.logUUID = response.UUID return } @@ -1469,7 +1480,7 @@ func (runner *ContainerRunner) UpdateContainerRunning() error { return ErrCancelled } return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, - arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil) + arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gateway.Address}}, nil) } // ContainerToken returns the api_token the container (and any @@ -1814,18 +1825,18 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s } } - containerId := flags.Arg(0) + containerID := flags.Arg(0) switch { case *detach && !ignoreDetachFlag: - return Detach(containerId, prog, args, os.Stdout, os.Stderr) + return Detach(containerID, prog, args, os.Stdout, os.Stderr) case *kill >= 0: - return KillProcess(containerId, syscall.Signal(*kill), os.Stdout, os.Stderr) + return KillProcess(containerID, syscall.Signal(*kill), os.Stdout, os.Stderr) case *list: return ListProcesses(os.Stdout, os.Stderr) } - if containerId == "" { + if containerID == "" { log.Printf("usage: %s [options] UUID", prog) return 1 } @@ -1839,14 +1850,14 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s api, err := arvadosclient.MakeArvadosClient() if err != nil { - log.Printf("%s: %v", containerId, err) + log.Printf("%s: %v", containerID, err) return 1 } api.Retries = 8 kc, kcerr := keepclient.MakeKeepClient(api) if kcerr != nil { - log.Printf("%s: %v", containerId, kcerr) + log.Printf("%s: %v", containerID, kcerr) return 1 } kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2} @@ -1856,21 +1867,35 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s // minimum version we want to support. docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil) - cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerId) + cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerID) if err != nil { log.Print(err) return 1 } if dockererr != nil { - cr.CrunchLog.Printf("%s: %v", containerId, dockererr) + cr.CrunchLog.Printf("%s: %v", containerID, dockererr) cr.checkBrokenNode(dockererr) cr.CrunchLog.Close() return 1 } - parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".") + cr.gateway = Gateway{ + Address: os.Getenv("GatewayAddress"), + AuthSecret: os.Getenv("GatewayAuthSecret"), + ContainerUUID: containerID, + DockerContainerID: &cr.ContainerID, + Log: cr.CrunchLog, + } + os.Unsetenv("GatewayAuthSecret") + err = cr.gateway.Start() + if err != nil { + log.Printf("error starting gateway server: %s", err) + return 1 + } + + parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerID+".") if tmperr != nil { - log.Printf("%s: %v", containerId, tmperr) + log.Printf("%s: %v", containerID, tmperr) return 1 } @@ -1904,7 +1929,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s } if runerr != nil { - log.Printf("%s: %v", containerId, runerr) + log.Printf("%s: %v", containerID, runerr) return 1 } return 0