+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
// IKeepClient is the minimal Keep API methods used by crunch-run.
type IKeepClient interface {
PutHB(hash string, buf []byte) (string, int, error)
- ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
+ ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
}
// NewLogWriter is a factory function to create a new log writer.
networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
- ContainerWait(ctx context.Context, container string) (int64, error)
+ ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
}
// ContainerWait invokes dockerclient.Client.ContainerWait
-func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string) (int64, error) {
- return proxy.Docker.ContainerWait(ctx, container)
+func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
+ return proxy.Docker.ContainerWait(ctx, container, condition)
}
// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
// Get and save the raw JSON container record from the API server
func (runner *ContainerRunner) LogContainerRecord() (err error) {
w := &ArvLogWriter{
- runner.ArvClient,
- runner.Container.UUID,
- "container",
- runner.LogCollection.Open("container.json"),
+ ArvClient: runner.ArvClient,
+ UUID: runner.Container.UUID,
+ loggingStream: "container",
+ writeCloser: runner.LogCollection.Open("container.json"),
}
+
// Get Container record JSON from the API Server
reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
if err != nil {
runner.CrunchLog.Print("Attaching container streams")
// If stdin mount is provided, attach it to the docker container
- var stdinRdr keepclient.Reader
+ var stdinRdr arvados.File
var stdinJson []byte
if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
if stdinMnt.Kind == "collection" {
if err != nil {
return nil, fmt.Errorf("While Stat on temp dir: %v", err)
}
- stdoutPath := path.Join(runner.HostOutputDir, subdirs)
+ stdoutPath := filepath.Join(runner.HostOutputDir, subdirs)
err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
if err != nil {
return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
}
}
}
- stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+ stdoutFile, err := os.Create(filepath.Join(runner.HostOutputDir, stdoutPath))
if err != nil {
return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err)
}
runner.ContainerConfig.Volumes = runner.Volumes
runner.HostConfig = dockercontainer.HostConfig{
- Binds: runner.Binds,
- Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent),
+ Binds: runner.Binds,
LogConfig: dockercontainer.LogConfig{
Type: "none",
},
+ Resources: dockercontainer.Resources{
+ CgroupParent: runner.setCgroupParent,
+ },
}
if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
// WaitFinish waits for the container to terminate, capture the exit code, and
// close the stdout/stderr logging.
-func (runner *ContainerRunner) WaitFinish() error {
+func (runner *ContainerRunner) WaitFinish() (err error) {
runner.CrunchLog.Print("Waiting for container to finish")
- waitDocker, err := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID)
+ waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running")
+
+ var waitBody dockercontainer.ContainerWaitOKBody
+ select {
+ case waitBody = <-waitOk:
+ case err = <-waitErr:
+ }
+
if err != nil {
return fmt.Errorf("container wait: %v", err)
}
- runner.CrunchLog.Printf("Container exited with code: %v", waitDocker)
- code := int(waitDocker)
+ runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
+ code := int(waitBody.StatusCode)
runner.ExitCode = &code
waitMount := runner.ArvMountExit
select {
- case err := <-waitMount:
+ case err = <-waitMount:
runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
waitMount = nil
runner.stop()
return fmt.Errorf("While checking host output path: %v", err)
}
+ // Pre-populate output from the configured mount points
+ var binds []string
+ for bind, mnt := range runner.Container.Mounts {
+ if mnt.Kind == "collection" {
+ binds = append(binds, bind)
+ }
+ }
+ sort.Strings(binds)
+
var manifestText string
collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
_, err = os.Stat(collectionMetafile)
if err != nil {
// Regular directory
+
+ // Find symlinks to arv-mounted files & dirs.
+ err = filepath.Walk(runner.HostOutputDir, func(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+ if info.Mode()&os.ModeSymlink == 0 {
+ return nil
+ }
+ // read link to get container internal path
+ // only support 1 level of symlinking here.
+ var tgt string
+ tgt, err = os.Readlink(path)
+ if err != nil {
+ return err
+ }
+
+ // get path relative to output dir
+ outputSuffix := path[len(runner.HostOutputDir):]
+
+ if strings.HasPrefix(tgt, "/") {
+ // go through mounts and try reverse map to collection reference
+ for _, bind := range binds {
+ mnt := runner.Container.Mounts[bind]
+ if tgt == bind || strings.HasPrefix(tgt, bind+"/") {
+ // get path relative to bind
+ targetSuffix := tgt[len(bind):]
+
+ // Copy mount and adjust the path to add path relative to the bind
+ adjustedMount := mnt
+ adjustedMount.Path = filepath.Join(adjustedMount.Path, targetSuffix)
+
+ // get manifest text
+ var m string
+ m, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
+ if err != nil {
+ return err
+ }
+ manifestText = manifestText + m
+ // delete symlink so WriteTree won't try to to dereference it.
+ os.Remove(path)
+ return nil
+ }
+ }
+ }
+
+ // Not a link to a mount. Must be dereferencible and
+ // point into the output directory.
+ tgt, err = filepath.EvalSymlinks(path)
+ if err != nil {
+ os.Remove(path)
+ return err
+ }
+
+ // Symlink target must be within the output directory otherwise it's an error.
+ if !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
+ os.Remove(path)
+ return fmt.Errorf("Output directory symlink %q points to invalid location %q, must point to mount or output directory.",
+ outputSuffix, tgt)
+ }
+ return nil
+ })
+ if err != nil {
+ return fmt.Errorf("While checking output symlinks: %v", err)
+ }
+
cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
- manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
+ var m string
+ m, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
+ manifestText = manifestText + m
if err != nil {
return fmt.Errorf("While uploading output files: %v", err)
}
manifestText = rec.ManifestText
}
- // Pre-populate output from the configured mount points
- var binds []string
- for bind, _ := range runner.Container.Mounts {
- binds = append(binds, bind)
- }
- sort.Strings(binds)
-
for _, bind := range binds {
mnt := runner.Container.Mounts[bind]
// point, but re-open crunch log with ArvClient in case there are any
// other further (such as failing to write the log to Keep!) while
// shutting down
- runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
- "crunch-run", nil})
+ runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
+ UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
if runner.LogsPDH != nil {
// If we have already assigned something to LogsPDH,
// NewArvLogWriter creates an ArvLogWriter
func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
- return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
+ return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name,
+ writeCloser: runner.LogCollection.Open(name + ".txt")}
}
// Run the full container lifecycle.
if err == nil {
err = e
}
+ if runner.finalState == "Complete" {
+ // There was an error in the finalization.
+ runner.finalState = "Cancelled"
+ }
}
// Log the error encountered in Run(), if any
cr.Container.UUID = containerUUID
cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
+
+ loadLogThrottleParams(api)
+
return cr
}