import (
"bytes"
- "context"
"encoding/json"
"errors"
"flag"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
+ "golang.org/x/net/context"
dockertypes "github.com/docker/docker/api/types"
dockercontainer "github.com/docker/docker/api/types/container"
ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
- ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
+ ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
}
-// ThinDockerClientProxy is a proxy implementation of ThinDockerClient
-// that executes the docker requests on dockerclient.Client
-type ThinDockerClientProxy struct {
- Docker *dockerclient.Client
-}
-
-// ContainerAttach invokes dockerclient.Client.ContainerAttach
-func (proxy ThinDockerClientProxy) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
- return proxy.Docker.ContainerAttach(ctx, container, options)
-}
-
-// ContainerCreate invokes dockerclient.Client.ContainerCreate
-func (proxy ThinDockerClientProxy) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
- networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
- return proxy.Docker.ContainerCreate(ctx, config, hostConfig, networkingConfig, containerName)
-}
-
-// ContainerStart invokes dockerclient.Client.ContainerStart
-func (proxy ThinDockerClientProxy) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
- return proxy.Docker.ContainerStart(ctx, container, options)
-}
-
-// ContainerStop invokes dockerclient.Client.ContainerStop
-func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
- return proxy.Docker.ContainerStop(ctx, container, timeout)
-}
-
-// ContainerWait invokes dockerclient.Client.ContainerWait
-func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
- return proxy.Docker.ContainerWait(ctx, container, condition)
-}
-
-// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
-func (proxy ThinDockerClientProxy) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
- return proxy.Docker.ImageInspectWithRaw(ctx, image)
-}
-
-// ImageLoad invokes dockerclient.Client.ImageLoad
-func (proxy ThinDockerClientProxy) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
- return proxy.Docker.ImageLoad(ctx, input, quiet)
-}
-
-// ImageRemove invokes dockerclient.Client.ImageRemove
-func (proxy ThinDockerClientProxy) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
- return proxy.Docker.ImageRemove(ctx, image, options)
-}
-
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
ArvMountExit chan error
finalState string
- statLogger io.WriteCloser
- statReporter *crunchstat.Reporter
- statInterval time.Duration
- cgroupRoot string
+ statLogger io.WriteCloser
+ statReporter *crunchstat.Reporter
+ hoststatLogger io.WriteCloser
+ hoststatReporter *crunchstat.Reporter
+ statInterval time.Duration
+ cgroupRoot string
// What we expect the container's cgroup parent to be.
expectCgroupParent string
// What we tell docker to use as the container's cgroup
setCgroupParent string
cStateLock sync.Mutex
- cStarted bool // StartContainer() succeeded
cCancelled bool // StopContainer() invoked
enableNetwork string // one of "default" or "always"
signal.Notify(runner.SigChan, syscall.SIGQUIT)
go func(sig chan os.Signal) {
- s := <-sig
- if s != nil {
- runner.CrunchLog.Printf("Caught signal %v", s)
+ for s := range sig {
+ runner.CrunchLog.Printf("caught signal: %v", s)
+ runner.stop()
}
- runner.stop()
}(runner.SigChan)
}
func (runner *ContainerRunner) stop() {
runner.cStateLock.Lock()
defer runner.cStateLock.Unlock()
- if runner.cCancelled {
+ if runner.ContainerID == "" {
return
}
runner.cCancelled = true
- if runner.cStarted {
- timeout := time.Duration(10)
- err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout))
- if err != nil {
- runner.CrunchLog.Printf("StopContainer failed: %s", err)
- }
- // Suppress multiple calls to stop()
- runner.cStarted = false
+ runner.CrunchLog.Printf("removing container")
+ err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
+ if err != nil {
+ runner.CrunchLog.Printf("error removing container: %s", err)
}
}
func (runner *ContainerRunner) stopSignals() {
if runner.SigChan != nil {
signal.Stop(runner.SigChan)
- close(runner.SigChan)
}
}
return
}
+func copyfile(src string, dst string) (err error) {
+ srcfile, err := os.Open(src)
+ if err != nil {
+ return
+ }
+
+ os.MkdirAll(path.Dir(dst), 0770)
+
+ dstfile, err := os.Create(dst)
+ if err != nil {
+ return
+ }
+ _, err = io.Copy(dstfile, srcfile)
+ if err != nil {
+ return
+ }
+
+ err = srcfile.Close()
+ err2 := dstfile.Close()
+
+ if err != nil {
+ return
+ }
+
+ if err2 != nil {
+ return err2
+ }
+
+ return nil
+}
+
func (runner *ContainerRunner) SetupMounts() (err error) {
err = runner.SetupArvMountPoint("keep")
if err != nil {
runner.Binds = nil
runner.Volumes = make(map[string]struct{})
needCertMount := true
+ type copyFile struct {
+ src string
+ bind string
+ }
+ var copyFiles []copyFile
var binds []string
for bind := range runner.Container.Mounts {
pdhOnly = false
src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
} else if mnt.PortableDataHash != "" {
- if mnt.Writable {
+ if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
return fmt.Errorf("Can never write to a collection specified by portable data hash")
}
idx := strings.Index(mnt.PortableDataHash, "/")
if mnt.Writable {
if bind == runner.Container.OutputPath {
runner.HostOutputDir = src
+ runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
} else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
- return fmt.Errorf("Writable mount points are not permitted underneath the output_path: %v", bind)
+ copyFiles = append(copyFiles, copyFile{src, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
+ } else {
+ runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
}
- runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
} else {
runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
}
}
}
+ for _, cp := range copyFiles {
+ dir, err := os.Stat(cp.src)
+ if err != nil {
+ return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+ }
+ if dir.IsDir() {
+ err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
+ if walkerr != nil {
+ return walkerr
+ }
+ if walkinfo.Mode().IsRegular() {
+ return copyfile(walkpath, path.Join(cp.bind, walkpath[len(cp.src):]))
+ } else if walkinfo.Mode().IsDir() {
+ return os.MkdirAll(path.Join(cp.bind, walkpath[len(cp.src):]), 0770)
+ } else {
+ return fmt.Errorf("Source %q is not a regular file or directory", cp.src)
+ }
+ })
+ } else {
+ err = copyfile(cp.src, cp.bind)
+ }
+ if err != nil {
+ return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+ }
+ }
+
return nil
}
func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
// Handle docker log protocol
// https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
+ defer close(runner.loggingDone)
header := make([]byte, 8)
- for {
- _, readerr := io.ReadAtLeast(containerReader, header, 8)
-
- if readerr == nil {
- readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
- if header[0] == 1 {
- // stdout
- _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
- } else {
- // stderr
- _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
+ var err error
+ for err == nil {
+ _, err = io.ReadAtLeast(containerReader, header, 8)
+ if err != nil {
+ if err == io.EOF {
+ err = nil
}
+ break
}
+ readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
+ if header[0] == 1 {
+ // stdout
+ _, err = io.CopyN(runner.Stdout, containerReader, readsize)
+ } else {
+ // stderr
+ _, err = io.CopyN(runner.Stderr, containerReader, readsize)
+ }
+ }
- if readerr != nil {
- if readerr != io.EOF {
- runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
- }
-
- closeerr := runner.Stdout.Close()
- if closeerr != nil {
- runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
- }
+ if err != nil {
+ runner.CrunchLog.Printf("error reading docker logs: %v", err)
+ }
- closeerr = runner.Stderr.Close()
- if closeerr != nil {
- runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
- }
+ err = runner.Stdout.Close()
+ if err != nil {
+ runner.CrunchLog.Printf("error closing stdout logs: %v", err)
+ }
- if runner.statReporter != nil {
- runner.statReporter.Stop()
- closeerr = runner.statLogger.Close()
- if closeerr != nil {
- runner.CrunchLog.Printf("While closing crunchstat logs: %v", closeerr)
- }
- }
+ err = runner.Stderr.Close()
+ if err != nil {
+ runner.CrunchLog.Printf("error closing stderr logs: %v", err)
+ }
- runner.loggingDone <- true
- close(runner.loggingDone)
- return
+ if runner.statReporter != nil {
+ runner.statReporter.Stop()
+ err = runner.statLogger.Close()
+ if err != nil {
+ runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
}
}
}
-func (runner *ContainerRunner) StartCrunchstat() {
+func (runner *ContainerRunner) stopHoststat() error {
+ if runner.hoststatReporter == nil {
+ return nil
+ }
+ runner.hoststatReporter.Stop()
+ err := runner.hoststatLogger.Close()
+ if err != nil {
+ return fmt.Errorf("error closing hoststat logs: %v", err)
+ }
+ return nil
+}
+
+func (runner *ContainerRunner) startHoststat() {
+ runner.hoststatLogger = NewThrottledLogger(runner.NewLogWriter("hoststat"))
+ runner.hoststatReporter = &crunchstat.Reporter{
+ Logger: log.New(runner.hoststatLogger, "", 0),
+ CgroupRoot: runner.cgroupRoot,
+ PollPeriod: runner.statInterval,
+ }
+ runner.hoststatReporter.Start()
+}
+
+func (runner *ContainerRunner) startCrunchstat() {
runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
runner.statReporter = &crunchstat.Reporter{
CID: runner.ContainerID,
writeCloser: runner.LogCollection.Open(label + ".json"),
}
- // Get Container record JSON from the API Server
- reader, err := runner.ArvClient.CallRaw("GET", path, "", "", nil)
+ reader, err := runner.ArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
if err != nil {
return false, fmt.Errorf("error getting %s record: %v", label, err)
}
}
return fmt.Errorf("could not start container: %v%s", err, advice)
}
- runner.cStarted = true
return nil
}
// WaitFinish waits for the container to terminate, capture the exit code, and
// close the stdout/stderr logging.
-func (runner *ContainerRunner) WaitFinish() (err error) {
+func (runner *ContainerRunner) WaitFinish() error {
runner.CrunchLog.Print("Waiting for container to finish")
- waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running")
+ waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
+ arvMountExit := runner.ArvMountExit
+ for {
+ select {
+ case waitBody := <-waitOk:
+ runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
+ code := int(waitBody.StatusCode)
+ runner.ExitCode = &code
+
+ // wait for stdout/stderr to complete
+ <-runner.loggingDone
+ return nil
- go func() {
- <-runner.ArvMountExit
- if runner.cStarted {
+ case err := <-waitErr:
+ return fmt.Errorf("container wait: %v", err)
+
+ case <-arvMountExit:
runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.")
runner.stop()
+ // arvMountExit will always be ready now that
+ // it's closed, but that doesn't interest us.
+ arvMountExit = nil
}
- }()
-
- var waitBody dockercontainer.ContainerWaitOKBody
- select {
- case waitBody = <-waitOk:
- case err = <-waitErr:
- }
-
- // Container isn't running any more
- runner.cStarted = false
-
- if err != nil {
- return fmt.Errorf("container wait: %v", err)
}
-
- runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
- code := int(waitBody.StatusCode)
- runner.ExitCode = &code
-
- // wait for stdout/stderr to complete
- <-runner.loggingDone
-
- return nil
}
var ErrNotInOutputDir = fmt.Errorf("Must point to path within the output directory")
relocateTo string,
followed int) (manifestText string, err error) {
- if info.Mode().IsDir() {
- return
- }
-
if infoerr != nil {
return "", infoerr
}
+ if info.Mode().IsDir() {
+ // if empty, need to create a .keep file
+ dir, direrr := os.Open(path)
+ if direrr != nil {
+ return "", direrr
+ }
+ defer dir.Close()
+ names, eof := dir.Readdirnames(1)
+ if len(names) == 0 && eof == io.EOF && path != runner.HostOutputDir {
+ containerPath := runner.OutputPath + path[len(runner.HostOutputDir):]
+ for _, bind := range binds {
+ mnt := runner.Container.Mounts[bind]
+ // Check if there is a bind for this
+ // directory, in which case assume we don't need .keep
+ if (containerPath == bind || strings.HasPrefix(containerPath, bind+"/")) && mnt.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
+ return
+ }
+ }
+ outputSuffix := path[len(runner.HostOutputDir)+1:]
+ return fmt.Sprintf("./%v d41d8cd98f00b204e9800998ecf8427e+0 0:0:.keep\n", outputSuffix), nil
+ }
+ return
+ }
+
if followed >= limitFollowSymlinks {
// Got stuck in a loop or just a pathological number of
// directory links, give up.
return
}
- // When following symlinks, the source path may need to be logically
- // relocated to some other path within the output collection. Remove
- // the relocateFrom prefix and replace it with relocateTo.
+ // "path" is the actual path we are visiting
+ // "tgt" is the target of "path" (a non-symlink) after following symlinks
+ // "relocated" is the path in the output manifest where the file should be placed,
+ // but has HostOutputDir as a prefix.
+
+ // The destination path in the output manifest may need to be
+ // logically relocated to some other path in order to appear
+ // in the correct location as a result of following a symlink.
+ // Remove the relocateFrom prefix and replace it with
+ // relocateTo.
relocated := relocateTo + path[len(relocateFrom):]
tgt, readlinktgt, info, derefErr := runner.derefOutputSymlink(path, info)
// go through mounts and try reverse map to collection reference
for _, bind := range binds {
mnt := runner.Container.Mounts[bind]
- if tgt == bind || strings.HasPrefix(tgt, bind+"/") {
+ if (tgt == bind || strings.HasPrefix(tgt, bind+"/")) && !mnt.Writable {
// get path relative to bind
targetSuffix := tgt[len(bind):]
// Terminates in this keep mount, so add the
// manifest text at appropriate location.
- outputSuffix := path[len(runner.HostOutputDir):]
+ outputSuffix := relocated[len(runner.HostOutputDir):]
manifestText, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
return
}
// HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
func (runner *ContainerRunner) CaptureOutput() error {
- if runner.finalState != "Complete" {
- return nil
- }
-
if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
// Output may have been set directly by the container, so
// refresh the container record to check.
continue
}
- if mnt.ExcludeFromOutput == true {
+ if mnt.ExcludeFromOutput == true || mnt.Writable {
continue
}
}
checkErr(runner.CaptureOutput())
+ checkErr(runner.stopHoststat())
checkErr(runner.CommitLogs())
checkErr(runner.UpdateContainerFinal())
}()
if err != nil {
return
}
-
- // setup signal handling
runner.setupSignals()
+ runner.startHoststat()
// check for and/or load image
err = runner.LoadImage()
}
runner.finalState = "Cancelled"
- runner.StartCrunchstat()
+ runner.startCrunchstat()
err = runner.StartContainer()
if err != nil {
}
err = runner.WaitFinish()
- if err == nil {
+ if err == nil && !runner.IsCancelled() {
runner.finalState = "Complete"
}
return
// API version 1.21 corresponds to Docker 1.9, which is currently the
// minimum version we want to support.
docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
- dockerClientProxy := ThinDockerClientProxy{Docker: docker}
-
- cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
+ cr := NewContainerRunner(api, kc, docker, containerId)
if dockererr != nil {
cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
cr.checkBrokenNode(dockererr)