+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
// IKeepClient is the minimal Keep API methods used by crunch-run.
type IKeepClient interface {
PutHB(hash string, buf []byte) (string, int, error)
- ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
+ ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
}
// NewLogWriter is a factory function to create a new log writer.
networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
- ContainerWait(ctx context.Context, container string) (int64, error)
+ ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
}
// ContainerWait invokes dockerclient.Client.ContainerWait
-func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string) (int64, error) {
- return proxy.Docker.ContainerWait(ctx, container)
+func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
+ return proxy.Docker.ContainerWait(ctx, container, condition)
}
// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
loggingDone chan bool
CrunchLog *ThrottledLogger
Stdout io.WriteCloser
- Stderr *ThrottledLogger
+ Stderr io.WriteCloser
LogCollection *CollectionWriter
LogsPDH *string
RunArvMount
HostOutputDir string
CleanupTempDir []string
Binds []string
+ Volumes map[string]struct{}
OutputPDH *string
SigChan chan os.Signal
ArvMountExit chan error
collectionPaths := []string{}
runner.Binds = nil
+ runner.Volumes = make(map[string]struct{})
needCertMount := true
var binds []string
for _, bind := range binds {
mnt := runner.Container.Mounts[bind]
- if bind == "stdout" {
+ if bind == "stdout" || bind == "stderr" {
// Is it a "file" mount kind?
if mnt.Kind != "file" {
- return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
+ return fmt.Errorf("Unsupported mount kind '%s' for %s. Only 'file' is supported.", mnt.Kind, bind)
}
// Does path start with OutputPath?
prefix += "/"
}
if !strings.HasPrefix(mnt.Path, prefix) {
- return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
+ return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
}
}
}
switch {
- case mnt.Kind == "collection":
+ case mnt.Kind == "collection" && bind != "stdin":
var src string
if mnt.UUID != "" && mnt.PortableDataHash != "" {
return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
}
collectionPaths = append(collectionPaths, src)
- case mnt.Kind == "tmp" && bind == runner.Container.OutputPath:
- runner.HostOutputDir, err = runner.MkTempDir("", "")
+ case mnt.Kind == "tmp":
+ var tmpdir string
+ tmpdir, err = runner.MkTempDir("", "")
if err != nil {
return fmt.Errorf("While creating mount temp dir: %v", err)
}
- st, staterr := os.Stat(runner.HostOutputDir)
+ st, staterr := os.Stat(tmpdir)
if staterr != nil {
return fmt.Errorf("While Stat on temp dir: %v", staterr)
}
- err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
+ err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777)
if staterr != nil {
return fmt.Errorf("While Chmod temp dir: %v", err)
}
- runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
- runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
-
- case mnt.Kind == "tmp":
- runner.Binds = append(runner.Binds, bind)
+ runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
+ runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
+ if bind == runner.Container.OutputPath {
+ runner.HostOutputDir = tmpdir
+ }
case mnt.Kind == "json":
jsondata, err := json.Marshal(mnt.Content)
// Get and save the raw JSON container record from the API server
func (runner *ContainerRunner) LogContainerRecord() (err error) {
w := &ArvLogWriter{
- runner.ArvClient,
- runner.Container.UUID,
- "container",
- runner.LogCollection.Open("container.json"),
+ ArvClient: runner.ArvClient,
+ UUID: runner.Container.UUID,
+ loggingStream: "container",
+ writeCloser: runner.LogCollection.Open("container.json"),
}
+
// Get Container record JSON from the API Server
reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
if err != nil {
return nil
}
-// AttachLogs connects the docker container stdout and stderr logs to the
-// Arvados logger which logs to Keep and the API server logs table.
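+// AttachStreams connects the docker container's stdin, stdout and stderr
+// streams. Stdin is fed from the "stdin" mount when one is provided. Stdout
+// and stderr are written to the files named by the "stdout" and "stderr"
+// mounts when present, and otherwise to the Arvados logger which logs to
+// Keep and the API server logs table.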
func (runner *ContainerRunner) AttachStreams() (err error) {
runner.CrunchLog.Print("Attaching container streams")
// If stdin mount is provided, attach it to the docker container
- var stdinRdr keepclient.Reader
+ var stdinRdr arvados.File
var stdinJson []byte
if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
if stdinMnt.Kind == "collection" {
} else if err != nil {
return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
}
-
- defer stdinRdr.Close()
} else if stdinMnt.Kind == "json" {
stdinJson, err = json.Marshal(stdinMnt.Content)
if err != nil {
runner.loggingDone = make(chan bool)
if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
- stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
- index := strings.LastIndex(stdoutPath, "/")
- if index > 0 {
- subdirs := stdoutPath[:index]
- if subdirs != "" {
- st, err := os.Stat(runner.HostOutputDir)
- if err != nil {
- return fmt.Errorf("While Stat on temp dir: %v", err)
- }
- stdoutPath := path.Join(runner.HostOutputDir, subdirs)
- err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
- if err != nil {
- return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
- }
- }
- }
- stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+ stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path)
if err != nil {
- return fmt.Errorf("While creating stdout file: %v", err)
+ return err
}
runner.Stdout = stdoutFile
} else {
runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
}
- runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+
+ if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
+ stderrFile, err := runner.getStdoutFile(stderrMnt.Path)
+ if err != nil {
+ return err
+ }
+ runner.Stderr = stderrFile
+ } else {
+ runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+ }
if stdinRdr != nil {
go func() {
runner.CrunchLog.Print("While writing stdin collection to docker container %q", err)
runner.stop()
}
- response.Conn.Close()
+ stdinRdr.Close()
+ response.CloseWrite()
}()
} else if len(stdinJson) != 0 {
go func() {
runner.CrunchLog.Print("While writing stdin json to docker container %q", err)
runner.stop()
}
- response.Conn.Close()
+ response.CloseWrite()
}()
}
return nil
}
+// getStdoutFile creates the host-side file that backs a "file" mount for one
+// of the container's output streams (it is used for both the stdout and the
+// stderr mounts) and returns it open for writing. An existing file at that
+// location is truncated by os.Create.
+//
+// mntPath is the mount's path inside the container and must lie under
+// runner.Container.OutputPath. The remainder of mntPath after OutputPath is
+// re-rooted at runner.HostOutputDir; intermediate directories are created
+// with the output directory's own mode plus setgid and 0777.
+//
+// On failure the returned file is nil and the error says which step failed.
+func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
+	// Guard the slice expression below: without this check a path shorter
+	// than OutputPath panics, and a longer path outside OutputPath would be
+	// silently mapped to an unrelated file in the output directory.
+	if !strings.HasPrefix(mntPath, runner.Container.OutputPath) {
+		return nil, fmt.Errorf("Path %q does not start with OutputPath %q", mntPath, runner.Container.OutputPath)
+	}
+	relPath := mntPath[len(runner.Container.OutputPath):]
+
+	// index > 0 deliberately ignores a single leading "/": in that case the
+	// file sits directly in the output directory and nothing needs creating.
+	if index := strings.LastIndex(relPath, "/"); index > 0 {
+		st, err := os.Stat(runner.HostOutputDir)
+		if err != nil {
+			return nil, fmt.Errorf("While Stat on temp dir: %v", err)
+		}
+		hostDir := filepath.Join(runner.HostOutputDir, relPath[:index])
+		err = os.MkdirAll(hostDir, st.Mode()|os.ModeSetgid|0777)
+		if err != nil {
+			return nil, fmt.Errorf("While MkdirAll %q: %v", hostDir, err)
+		}
+	}
+
+	stdoutFile, err := os.Create(filepath.Join(runner.HostOutputDir, relPath))
+	if err != nil {
+		return nil, fmt.Errorf("While creating file %q: %v", relPath, err)
+	}
+
+	return stdoutFile, nil
+}
+
// CreateContainer creates the docker container.
func (runner *ContainerRunner) CreateContainer() error {
runner.CrunchLog.Print("Creating Docker container")
runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
}
+ runner.ContainerConfig.Volumes = runner.Volumes
+
runner.HostConfig = dockercontainer.HostConfig{
- Binds: runner.Binds,
- Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent),
+ Binds: runner.Binds,
LogConfig: dockercontainer.LogConfig{
Type: "none",
},
+ Resources: dockercontainer.Resources{
+ CgroupParent: runner.setCgroupParent,
+ },
}
if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
}
}
+ _, stdinUsed := runner.Container.Mounts["stdin"]
+ runner.ContainerConfig.OpenStdin = stdinUsed
+ runner.ContainerConfig.StdinOnce = stdinUsed
+ runner.ContainerConfig.AttachStdin = stdinUsed
+ runner.ContainerConfig.AttachStdout = true
+ runner.ContainerConfig.AttachStderr = true
+
createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
if err != nil {
return fmt.Errorf("While creating container: %v", err)
// WaitFinish waits for the container to terminate, capture the exit code, and
// close the stdout/stderr logging.
-func (runner *ContainerRunner) WaitFinish() error {
+func (runner *ContainerRunner) WaitFinish() (err error) {
runner.CrunchLog.Print("Waiting for container to finish")
- waitDocker, err := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID)
+ waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running")
+
+ var waitBody dockercontainer.ContainerWaitOKBody
+ select {
+ case waitBody = <-waitOk:
+ case err = <-waitErr:
+ }
+
if err != nil {
return fmt.Errorf("container wait: %v", err)
}
- runner.CrunchLog.Printf("Container exited with code: %v", waitDocker)
- code := int(waitDocker)
+ runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
+ code := int(waitBody.StatusCode)
runner.ExitCode = &code
waitMount := runner.ArvMountExit
select {
- case err := <-waitMount:
+ case err = <-waitMount:
runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
waitMount = nil
runner.stop()
return fmt.Errorf("While checking host output path: %v", err)
}
+ // Pre-populate output from the configured mount points
+ var binds []string
+ for bind, mnt := range runner.Container.Mounts {
+ if mnt.Kind == "collection" {
+ binds = append(binds, bind)
+ }
+ }
+ sort.Strings(binds)
+
var manifestText string
collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
_, err = os.Stat(collectionMetafile)
if err != nil {
// Regular directory
+
+ // Find symlinks to arv-mounted files & dirs.
+ err = filepath.Walk(runner.HostOutputDir, func(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+ if info.Mode()&os.ModeSymlink == 0 {
+ return nil
+ }
+ // read link to get container internal path
+ // only support 1 level of symlinking here.
+ var tgt string
+ tgt, err = os.Readlink(path)
+ if err != nil {
+ return err
+ }
+
+ // get path relative to output dir
+ outputSuffix := path[len(runner.HostOutputDir):]
+
+ if strings.HasPrefix(tgt, "/") {
+ // go through mounts and try reverse map to collection reference
+ for _, bind := range binds {
+ mnt := runner.Container.Mounts[bind]
+ if tgt == bind || strings.HasPrefix(tgt, bind+"/") {
+ // get path relative to bind
+ targetSuffix := tgt[len(bind):]
+
+ // Copy mount and adjust the path to add path relative to the bind
+ adjustedMount := mnt
+ adjustedMount.Path = filepath.Join(adjustedMount.Path, targetSuffix)
+
+ // get manifest text
+ var m string
+ m, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
+ if err != nil {
+ return err
+ }
+ manifestText = manifestText + m
+ // delete symlink so WriteTree won't try to dereference it.
+ os.Remove(path)
+ return nil
+ }
+ }
+ }
+
+ // Not a link to a mount. Must be dereferenceable and
+ // point into the output directory.
+ tgt, err = filepath.EvalSymlinks(path)
+ if err != nil {
+ os.Remove(path)
+ return err
+ }
+
+ // Symlink target must be within the output directory otherwise it's an error.
+ if !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
+ os.Remove(path)
+ return fmt.Errorf("Output directory symlink %q points to invalid location %q, must point to mount or output directory.",
+ outputSuffix, tgt)
+ }
+ return nil
+ })
+ if err != nil {
+ return fmt.Errorf("While checking output symlinks: %v", err)
+ }
+
cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
- manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
+ var m string
+ m, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
+ manifestText = manifestText + m
if err != nil {
return fmt.Errorf("While uploading output files: %v", err)
}
manifestText = rec.ManifestText
}
- // Pre-populate output from the configured mount points
- var binds []string
- for bind, _ := range runner.Container.Mounts {
- binds = append(binds, bind)
- }
- sort.Strings(binds)
-
for _, bind := range binds {
mnt := runner.Container.Mounts[bind]
// point, but re-open crunch log with ArvClient in case there are any
// other further (such as failing to write the log to Keep!) while
// shutting down
- runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
- "crunch-run", nil})
+ runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
+ UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
if runner.LogsPDH != nil {
// If we have already assigned something to LogsPDH,
// NewArvLogWriter creates an ArvLogWriter
func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
- return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
+ return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name,
+ writeCloser: runner.LogCollection.Open(name + ".txt")}
}
// Run the full container lifecycle.
if err == nil {
err = e
}
+ if runner.finalState == "Complete" {
+ // There was an error in the finalization.
+ runner.finalState = "Cancelled"
+ }
}
// Log the error encountered in Run(), if any
cr.Container.UUID = containerUUID
cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
+
+ loadLogThrottleParams(api)
+
return cr
}