+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
"os/signal"
"path"
"path/filepath"
+ "runtime"
+ "runtime/pprof"
"sort"
"strings"
"sync"
// IKeepClient is the minimal Keep API methods used by crunch-run.
type IKeepClient interface {
PutHB(hash string, buf []byte) (string, int, error)
- ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
+ ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
+ ClearBlockCache()
}
// NewLogWriter is a factory function to create a new log writer.
networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
- ContainerWait(ctx context.Context, container string) (int64, error)
+ ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
}
// ContainerWait invokes dockerclient.Client.ContainerWait
-func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string) (int64, error) {
- return proxy.Docker.ContainerWait(ctx, container)
+func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
+ return proxy.Docker.ContainerWait(ctx, container, condition)
}
// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
HostOutputDir string
CleanupTempDir []string
Binds []string
+ Volumes map[string]struct{}
OutputPDH *string
SigChan chan os.Signal
ArvMountExit chan error
networkMode string // passed through to HostConfig.NetworkMode
}
-// SetupSignals sets up signal handling to gracefully terminate the underlying
+// setupSignals sets up signal handling to gracefully terminate the underlying
// Docker container and update state when receiving a TERM, INT or QUIT signal.
-func (runner *ContainerRunner) SetupSignals() {
+func (runner *ContainerRunner) setupSignals() {
runner.SigChan = make(chan os.Signal, 1)
signal.Notify(runner.SigChan, syscall.SIGTERM)
signal.Notify(runner.SigChan, syscall.SIGINT)
go func(sig chan os.Signal) {
<-sig
runner.stop()
- signal.Stop(sig)
}(runner.SigChan)
}
}
}
+func (runner *ContainerRunner) teardown() {
+ if runner.SigChan != nil {
+ signal.Stop(runner.SigChan)
+ close(runner.SigChan)
+ }
+}
+
// LoadImage determines the docker image id from the container record and
// checks if it is available in the local Docker image store. If not, it loads
// the image from Keep.
return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
}
- response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, false)
+ response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, true)
if err != nil {
return fmt.Errorf("While loading container image into Docker: %v", err)
}
- response.Body.Close()
+
+ defer response.Body.Close()
+ rbody, err := ioutil.ReadAll(response.Body)
+ if err != nil {
+ return fmt.Errorf("Reading response to image load: %v", err)
+ }
+ runner.CrunchLog.Printf("Docker response: %s", rbody)
} else {
runner.CrunchLog.Print("Docker image is available")
}
runner.ContainerConfig.Image = imageID
+ runner.Kc.ClearBlockCache()
+
return nil
}
collectionPaths := []string{}
runner.Binds = nil
+ runner.Volumes = make(map[string]struct{})
needCertMount := true
var binds []string
- for bind, _ := range runner.Container.Mounts {
+ for bind := range runner.Container.Mounts {
binds = append(binds, bind)
}
sort.Strings(binds)
}
collectionPaths = append(collectionPaths, src)
- case mnt.Kind == "tmp" && bind == runner.Container.OutputPath:
- runner.HostOutputDir, err = runner.MkTempDir("", "")
+ case mnt.Kind == "tmp":
+ var tmpdir string
+ tmpdir, err = runner.MkTempDir("", "")
if err != nil {
return fmt.Errorf("While creating mount temp dir: %v", err)
}
- st, staterr := os.Stat(runner.HostOutputDir)
+ st, staterr := os.Stat(tmpdir)
if staterr != nil {
return fmt.Errorf("While Stat on temp dir: %v", staterr)
}
- err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
+ err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777)
if staterr != nil {
return fmt.Errorf("While Chmod temp dir: %v", err)
}
- runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
- runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
-
- case mnt.Kind == "tmp":
- runner.Binds = append(runner.Binds, bind)
+ runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
+ runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
+ if bind == runner.Container.OutputPath {
+ runner.HostOutputDir = tmpdir
+ }
case mnt.Kind == "json":
jsondata, err := json.Marshal(mnt.Content)
logger := log.New(w, "node-info", 0)
commands := []infoCommand{
- infoCommand{
+ {
label: "Host Information",
cmd: []string{"uname", "-a"},
},
- infoCommand{
+ {
label: "CPU Information",
cmd: []string{"cat", "/proc/cpuinfo"},
},
- infoCommand{
+ {
label: "Memory Information",
cmd: []string{"cat", "/proc/meminfo"},
},
- infoCommand{
+ {
label: "Disk Space",
cmd: []string{"df", "-m", "/", os.TempDir()},
},
- infoCommand{
+ {
label: "Disk INodes",
cmd: []string{"df", "-i", "/", os.TempDir()},
},
// Get and save the raw JSON container record from the API server
func (runner *ContainerRunner) LogContainerRecord() (err error) {
w := &ArvLogWriter{
- runner.ArvClient,
- runner.Container.UUID,
- "container",
- runner.LogCollection.Open("container.json"),
+ ArvClient: runner.ArvClient,
+ UUID: runner.Container.UUID,
+ loggingStream: "container",
+ writeCloser: runner.LogCollection.Open("container.json"),
}
+
// Get Container record JSON from the API Server
reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
if err != nil {
return fmt.Errorf("While retrieving container record from the API server: %v", err)
}
defer reader.Close()
- // Read the API server response as []byte
- json_bytes, err := ioutil.ReadAll(reader)
- if err != nil {
- return fmt.Errorf("While reading container record API server response: %v", err)
- }
- // Decode the JSON []byte
+
+ dec := json.NewDecoder(reader)
+ dec.UseNumber()
var cr map[string]interface{}
- if err = json.Unmarshal(json_bytes, &cr); err != nil {
+ if err = dec.Decode(&cr); err != nil {
return fmt.Errorf("While decoding the container record JSON response: %v", err)
}
// Re-encode it using indentation to improve readability
runner.CrunchLog.Print("Attaching container streams")
// If stdin mount is provided, attach it to the docker container
- var stdinRdr keepclient.Reader
+ var stdinRdr arvados.File
var stdinJson []byte
if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
if stdinMnt.Kind == "collection" {
if err != nil {
return nil, fmt.Errorf("While Stat on temp dir: %v", err)
}
- stdoutPath := path.Join(runner.HostOutputDir, subdirs)
+ stdoutPath := filepath.Join(runner.HostOutputDir, subdirs)
err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
if err != nil {
return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
}
}
}
- stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+ stdoutFile, err := os.Create(filepath.Join(runner.HostOutputDir, stdoutPath))
if err != nil {
return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err)
}
runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
}
+ runner.ContainerConfig.Volumes = runner.Volumes
+
runner.HostConfig = dockercontainer.HostConfig{
- Binds: runner.Binds,
- Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent),
+ Binds: runner.Binds,
LogConfig: dockercontainer.LogConfig{
Type: "none",
},
+ Resources: dockercontainer.Resources{
+ CgroupParent: runner.setCgroupParent,
+ },
}
if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
dockertypes.ContainerStartOptions{})
if err != nil {
- return fmt.Errorf("could not start container: %v", err)
+ var advice string
+ if strings.Contains(err.Error(), "no such file or directory") {
+ advice = fmt.Sprintf("\nPossible causes: command %q is missing, the interpreter given in #! is missing, or script has Windows line endings.", runner.Container.Command[0])
+ }
+ return fmt.Errorf("could not start container: %v%s", err, advice)
}
runner.cStarted = true
return nil
// WaitFinish waits for the container to terminate, capture the exit code, and
// close the stdout/stderr logging.
-func (runner *ContainerRunner) WaitFinish() error {
+func (runner *ContainerRunner) WaitFinish() (err error) {
runner.CrunchLog.Print("Waiting for container to finish")
- waitDocker, err := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID)
+ waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running")
+
+ var waitBody dockercontainer.ContainerWaitOKBody
+ select {
+ case waitBody = <-waitOk:
+ case err = <-waitErr:
+ }
+
if err != nil {
return fmt.Errorf("container wait: %v", err)
}
- runner.CrunchLog.Printf("Container exited with code: %v", waitDocker)
- code := int(waitDocker)
+ runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
+ code := int(waitBody.StatusCode)
runner.ExitCode = &code
waitMount := runner.ArvMountExit
select {
- case err := <-waitMount:
+ case err = <-waitMount:
runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
waitMount = nil
runner.stop()
return nil
}
+var NotInOutputDirError = fmt.Errorf("Must point to path within the output directory")
+
+func (runner *ContainerRunner) derefOutputSymlink(path string, startinfo os.FileInfo) (tgt string, readlinktgt string, info os.FileInfo, err error) {
+ // Follow symlinks if necessary
+ info = startinfo
+ tgt = path
+ readlinktgt = ""
+ nextlink := path
+ for followed := 0; info.Mode()&os.ModeSymlink != 0; followed++ {
+ if followed >= 16 {
+ // Got stuck in a loop or just a pathological number of links, give up.
+ err = fmt.Errorf("Followed too many symlinks from path %q", path)
+ return
+ }
+
+ readlinktgt, err = os.Readlink(nextlink)
+ if err != nil {
+ return
+ }
+
+ tgt = readlinktgt
+ if !strings.HasPrefix(tgt, "/") {
+ // Relative symlink, resolve it to host path
+ tgt = filepath.Join(filepath.Dir(path), tgt)
+ }
+ if strings.HasPrefix(tgt, runner.Container.OutputPath+"/") && !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
+ // Absolute symlink to container output path, adjust it to host output path.
+ tgt = filepath.Join(runner.HostOutputDir, tgt[len(runner.Container.OutputPath):])
+ }
+ if !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
+ // After dereferencing, symlink target must either be
+ // within output directory, or must point to a
+ // collection mount.
+ err = NotInOutputDirError
+ return
+ }
+
+ info, err = os.Lstat(tgt)
+ if err != nil {
+ // tgt
+ err = fmt.Errorf("Symlink in output %q points to invalid location %q: %v",
+ path[len(runner.HostOutputDir):], readlinktgt, err)
+ return
+ }
+
+ nextlink = tgt
+ }
+
+ return
+}
+
+// UploadFile uploads files within the output directory, with special handling
+// for symlinks. If the symlink leads to a keep mount, copy the manifest text
+// from the keep mount into the output manifestText. Ensure that whether
+// symlinks are relative or absolute, they must remain within the output
+// directory.
+//
+// Assumes initial value of "path" is absolute, and located within runner.HostOutputDir.
+func (runner *ContainerRunner) UploadOutputFile(
+ path string,
+ info os.FileInfo,
+ infoerr error,
+ binds []string,
+ walkUpload *WalkUpload,
+ relocateFrom string,
+ relocateTo string,
+ followed int) (manifestText string, err error) {
+
+ if info.Mode().IsDir() {
+ return
+ }
+
+ if infoerr != nil {
+ return "", infoerr
+ }
+
+ if followed >= 8 {
+ // Got stuck in a loop or just a pathological number of
+ // directory links, give up.
+ err = fmt.Errorf("Followed too many symlinks from path %q", path)
+ return
+ }
+
+ // When following symlinks, the source path may need to be logically
+ // relocated to some other path within the output collection. Remove
+ // the relocateFrom prefix and replace it with relocateTo.
+ relocated := relocateTo + path[len(relocateFrom):]
+
+ tgt, readlinktgt, info, derefErr := runner.derefOutputSymlink(path, info)
+ if derefErr != nil && derefErr != NotInOutputDirError {
+ return "", derefErr
+ }
+
+ // go through mounts and try reverse map to collection reference
+ for _, bind := range binds {
+ mnt := runner.Container.Mounts[bind]
+ if tgt == bind || strings.HasPrefix(tgt, bind+"/") {
+ // get path relative to bind
+ targetSuffix := tgt[len(bind):]
+
+ // Copy mount and adjust the path to add path relative to the bind
+ adjustedMount := mnt
+ adjustedMount.Path = filepath.Join(adjustedMount.Path, targetSuffix)
+
+ // Terminates in this keep mount, so add the
+ // manifest text at appropriate location.
+ outputSuffix := path[len(runner.HostOutputDir):]
+ manifestText, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
+ return
+ }
+ }
+
+ // If target is not a collection mount, it must be located within the
+ // output directory, otherwise it is an error.
+ if derefErr == NotInOutputDirError {
+ err = fmt.Errorf("Symlink in output %q points to invalid location %q, must point to path within the output directory.",
+ path[len(runner.HostOutputDir):], readlinktgt)
+ return
+ }
+
+ if info.Mode().IsRegular() {
+ return "", walkUpload.UploadFile(relocated, tgt)
+ }
+
+ if info.Mode().IsDir() {
+ // Symlink leads to directory. Walk() doesn't follow
+ // directory symlinks, so we walk the target directory
+ // instead. Within the walk, file paths are relocated
+ // so they appear under the original symlink path.
+ err = filepath.Walk(tgt, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
+ var m string
+ m, walkerr = runner.UploadOutputFile(walkpath, walkinfo, walkerr,
+ binds, walkUpload, tgt, relocated, followed+1)
+ if walkerr == nil {
+ manifestText = manifestText + m
+ }
+ return walkerr
+ })
+ return
+ }
+
+ return
+}
+
// HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
func (runner *ContainerRunner) CaptureOutput() error {
if runner.finalState != "Complete" {
return fmt.Errorf("While checking host output path: %v", err)
}
+ // Pre-populate output from the configured mount points
+ var binds []string
+ for bind, mnt := range runner.Container.Mounts {
+ if mnt.Kind == "collection" {
+ binds = append(binds, bind)
+ }
+ }
+ sort.Strings(binds)
+
var manifestText string
collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
_, err = os.Stat(collectionMetafile)
if err != nil {
// Regular directory
+
cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
- manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
+ walkUpload := cw.BeginUpload(runner.HostOutputDir, runner.CrunchLog.Logger)
+
+ var m string
+ err = filepath.Walk(runner.HostOutputDir, func(path string, info os.FileInfo, err error) error {
+ m, err = runner.UploadOutputFile(path, info, err, binds, walkUpload, "", "", 0)
+ if err == nil {
+ manifestText = manifestText + m
+ }
+ return err
+ })
+
+ cw.EndUpload(walkUpload)
+
+ if err != nil {
+ return fmt.Errorf("While uploading output files: %v", err)
+ }
+
+ m, err = cw.ManifestText()
+ manifestText = manifestText + m
if err != nil {
return fmt.Errorf("While uploading output files: %v", err)
}
manifestText = rec.ManifestText
}
- // Pre-populate output from the configured mount points
- var binds []string
- for bind, _ := range runner.Container.Mounts {
- binds = append(binds, bind)
- }
- sort.Strings(binds)
-
for _, bind := range binds {
mnt := runner.Container.Mounts[bind]
// point, but re-open crunch log with ArvClient in case there are any
// other further (such as failing to write the log to Keep!) while
// shutting down
- runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
- "crunch-run", nil})
+ runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
+ UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
if runner.LogsPDH != nil {
// If we have already assigned something to LogsPDH,
// NewArvLogWriter creates an ArvLogWriter
func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
- return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
+ return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name,
+ writeCloser: runner.LogCollection.Open(name + ".txt")}
}
// Run the full container lifecycle.
if err == nil {
err = e
}
+ if runner.finalState == "Complete" {
+ // There was an error in the finalization.
+ runner.finalState = "Cancelled"
+ }
}
// Log the error encountered in Run(), if any
// a new one in case we needed to log anything while
// finalizing.
runner.CrunchLog.Close()
+
+ runner.teardown()
}()
- err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container)
+ err = runner.fetchContainerRecord()
if err != nil {
- err = fmt.Errorf("While getting container record: %v", err)
return
}
// setup signal handling
- runner.SetupSignals()
+ runner.setupSignals()
// check for and/or load image
err = runner.LoadImage()
return
}
+// Fetch the current container record (uuid = runner.Container.UUID)
+// into runner.Container.
+func (runner *ContainerRunner) fetchContainerRecord() error {
+ reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
+ if err != nil {
+ return fmt.Errorf("error fetching container record: %v", err)
+ }
+ defer reader.Close()
+
+ dec := json.NewDecoder(reader)
+ dec.UseNumber()
+ err = dec.Decode(&runner.Container)
+ if err != nil {
+ return fmt.Errorf("error decoding container record: %v", err)
+ }
+ return nil
+}
+
// NewContainerRunner creates a new container runner.
func NewContainerRunner(api IArvadosClient,
kc IKeepClient,
cr.Container.UUID = containerUUID
cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
+
+ loadLogThrottleParams(api)
+
return cr
}
networkMode := flag.String("container-network-mode", "default",
`Set networking mode for container. Corresponds to Docker network mode (--net).
`)
+ memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
flag.Parse()
containerId := flag.Arg(0)
if err != nil {
log.Fatalf("%s: %v", containerId, err)
}
+ kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
kc.Retries = 4
var docker *dockerclient.Client
cr.expectCgroupParent = p
}
- err = cr.Run()
- if err != nil {
- log.Fatalf("%s: %v", containerId, err)
+ runerr := cr.Run()
+
+ if *memprofile != "" {
+ f, err := os.Create(*memprofile)
+ if err != nil {
+ log.Printf("could not create memory profile: ", err)
+ }
+ runtime.GC() // get up-to-date statistics
+ if err := pprof.WriteHeapProfile(f); err != nil {
+ log.Printf("could not write memory profile: ", err)
+ }
+ closeerr := f.Close()
+ if closeerr != nil {
+ log.Printf("closing memprofile file: ", err)
+ }
}
+ if runerr != nil {
+ log.Fatalf("%s: %v", containerId, runerr)
+ }
}