import (
"bytes"
- "context"
"encoding/json"
"errors"
"flag"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
+ "golang.org/x/net/context"
dockertypes "github.com/docker/docker/api/types"
dockercontainer "github.com/docker/docker/api/types/container"
ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
- ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
+ ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
}
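// With the ContainerStop method dropped, stop() below relies on
// ContainerRemove with Force set instead. Deleting the proxy type works
// because *dockerclient.Client satisfies ThinDockerClient directly; a
// compile-time assertion could verify that (a sketch, assuming the
// dockerclient import used in main() below; not part of the change itself):
//
//	var _ ThinDockerClient = (*dockerclient.Client)(nil)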
-// ThinDockerClientProxy is a proxy implementation of ThinDockerClient
-// that executes the docker requests on dockerclient.Client
-type ThinDockerClientProxy struct {
- Docker *dockerclient.Client
-}
-
-// ContainerAttach invokes dockerclient.Client.ContainerAttach
-func (proxy ThinDockerClientProxy) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
- return proxy.Docker.ContainerAttach(ctx, container, options)
-}
-
-// ContainerCreate invokes dockerclient.Client.ContainerCreate
-func (proxy ThinDockerClientProxy) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
- networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
- return proxy.Docker.ContainerCreate(ctx, config, hostConfig, networkingConfig, containerName)
-}
-
-// ContainerStart invokes dockerclient.Client.ContainerStart
-func (proxy ThinDockerClientProxy) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
- return proxy.Docker.ContainerStart(ctx, container, options)
-}
-
-// ContainerStop invokes dockerclient.Client.ContainerStop
-func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
- return proxy.Docker.ContainerStop(ctx, container, timeout)
-}
-
-// ContainerWait invokes dockerclient.Client.ContainerWait
-func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
- return proxy.Docker.ContainerWait(ctx, container, condition)
-}
-
-// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
-func (proxy ThinDockerClientProxy) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
- return proxy.Docker.ImageInspectWithRaw(ctx, image)
-}
-
-// ImageLoad invokes dockerclient.Client.ImageLoad
-func (proxy ThinDockerClientProxy) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
- return proxy.Docker.ImageLoad(ctx, input, quiet)
-}
-
-// ImageRemove invokes dockerclient.Client.ImageRemove
-func (proxy ThinDockerClientProxy) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
- return proxy.Docker.ImageRemove(ctx, image, options)
-}
-
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
setCgroupParent string
cStateLock sync.Mutex
- cStarted bool // StartContainer() succeeded
cCancelled bool // StopContainer() invoked
enableNetwork string // one of "default" or "always"
signal.Notify(runner.SigChan, syscall.SIGQUIT)
go func(sig chan os.Signal) {
- s := <-sig
- if s != nil {
- runner.CrunchLog.Printf("Caught signal %v", s)
+ for s := range sig {
+ runner.CrunchLog.Printf("caught signal: %v", s)
+ runner.stop()
}
- runner.stop()
}(runner.SigChan)
}
func (runner *ContainerRunner) stop() {
runner.cStateLock.Lock()
defer runner.cStateLock.Unlock()
- if runner.cCancelled {
+ if runner.ContainerID == "" {
return
}
runner.cCancelled = true
- if runner.cStarted {
- timeout := time.Duration(10)
- err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout))
- if err != nil {
- runner.CrunchLog.Printf("StopContainer failed: %s", err)
- }
- // Suppress multiple calls to stop()
- runner.cStarted = false
+ runner.CrunchLog.Printf("removing container")
+ err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
+ if err != nil {
+ runner.CrunchLog.Printf("error removing container: %s", err)
}
}
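// ContainerRemove with Force: true kills a still-running container and
// deletes it in one call, like `docker rm -f`, so the old two-step
// stop-then-remove is no longer needed. Note the removed ContainerStop call
// passed time.Duration(10), which is 10 nanoseconds, not 10 seconds. If a
// deadline were wanted here, a sketch (illustrative only; the code above
// uses context.TODO() because no caller context is available):
//
//	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
//	defer cancel()
//	err := runner.Docker.ContainerRemove(ctx, runner.ContainerID,
//		dockertypes.ContainerRemoveOptions{Force: true})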
func (runner *ContainerRunner) stopSignals() {
if runner.SigChan != nil {
signal.Stop(runner.SigChan)
- close(runner.SigChan) // no longer needed; see note below
}
}
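// The close(runner.SigChan) is gone because the old handler did a one-shot
// receive and relied on the close to unblock it (a receive from a closed
// channel yields the zero value, hence the old s != nil check). The new
// for/range handler needs no such sentinel: after signal.Stop it simply
// blocks on the open channel. The lifecycle in isolation (a sketch with
// illustrative names):
//
//	sig := make(chan os.Signal, 1)
//	signal.Notify(sig, syscall.SIGQUIT)
//	go func() {
//		for s := range sig { // parks forever once Stop is called
//			log.Printf("caught %v", s)
//		}
//	}()
//	signal.Stop(sig) // unregister; no close needed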
}
}
- runner.loggingDone <- true
+ close(runner.loggingDone)
return
}
cmd []string
}
-// LogNodeInfo gathers node information and store it on the log for debugging
-// purposes.
-func (runner *ContainerRunner) LogNodeInfo() (err error) {
+// LogHostInfo logs info about the current host, for debugging and
+// accounting purposes. Although it's logged as "node-info", this is
+// about the environment where crunch-run is actually running, which
+// might differ from what's described in the node record (see
+// LogNodeRecord).
+func (runner *ContainerRunner) LogHostInfo() (err error) {
w := runner.NewLogWriter("node-info")
- logger := log.New(w, "node-info", 0)
commands := []infoCommand{
{
}
// Run commands with informational output to be logged.
- var out []byte
for _, command := range commands {
- out, err = exec.Command(command.cmd[0], command.cmd[1:]...).CombinedOutput()
- if err != nil {
- return fmt.Errorf("While running command %q: %v",
- command.cmd, err)
- }
- logger.Println(command.label)
- for _, line := range strings.Split(string(out), "\n") {
- logger.Println(" ", line)
+ fmt.Fprintln(w, command.label)
+ cmd := exec.Command(command.cmd[0], command.cmd[1:]...)
+ cmd.Stdout = w
+ cmd.Stderr = w
+ if err := cmd.Run(); err != nil {
+ err = fmt.Errorf("While running command %q: %v", command.cmd, err)
+ fmt.Fprintln(w, err)
+ return err
}
+ fmt.Fprintln(w, "")
}
err = w.Close()
}
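// The rewrite streams each command's output directly into the log writer
// instead of buffering CombinedOutput and re-printing it line by line;
// assigning the same io.Writer to Stdout and Stderr keeps the two streams
// interleaved roughly as they are produced. The pattern in isolation
// ("uname" is an illustrative stand-in for the infoCommand entries elided
// above):
//
//	cmd := exec.Command("uname", "-a")
//	cmd.Stdout = w
//	cmd.Stderr = w
//	if err := cmd.Run(); err != nil {
//		return fmt.Errorf("While running command %q: %v", cmd.Args, err)
//	}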
// LogContainerRecord gets and saves the raw JSON container record from the API server
-func (runner *ContainerRunner) LogContainerRecord() (err error) {
+func (runner *ContainerRunner) LogContainerRecord() error {
+ logged, err := runner.logAPIResponse("container", "containers", map[string]interface{}{"filters": [][]string{{"uuid", "=", runner.Container.UUID}}}, nil)
+ if !logged && err == nil {
+ err = fmt.Errorf("error: no container record found for %s", runner.Container.UUID)
+ }
+ return err
+}
+
+// LogNodeRecord logs the arvados#node record corresponding to the current host.
+func (runner *ContainerRunner) LogNodeRecord() error {
+ hostname := os.Getenv("SLURMD_NODENAME")
+ if hostname == "" {
+ hostname, _ = os.Hostname()
+ }
+ _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
+ // The "info" field has admin-only info when obtained
+ // with a privileged token, and should not be logged.
+ node, ok := resp.(map[string]interface{})
+ if ok {
+ delete(node, "info")
+ }
+ })
+ return err
+}
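// The munge hook lets a caller scrub fields before the record is logged;
// LogNodeRecord uses it to drop the admin-only "info" hash. Its shape in
// isolation (a sketch):
//
//	munge := func(resp interface{}) {
//		if node, ok := resp.(map[string]interface{}); ok {
//			delete(node, "info") // admin-only; keep it out of the log
//		}
//	}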
+
+func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) {
w := &ArvLogWriter{
ArvClient: runner.ArvClient,
UUID: runner.Container.UUID,
- loggingStream: "container",
- writeCloser: runner.LogCollection.Open("container.json"),
+ loggingStream: label,
+ writeCloser: runner.LogCollection.Open(label + ".json"),
}
- // Get Container record JSON from the API Server
- reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
+ reader, err := runner.ArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
if err != nil {
- return fmt.Errorf("While retrieving container record from the API server: %v", err)
+ return false, fmt.Errorf("error getting %s record: %v", label, err)
}
defer reader.Close()
dec := json.NewDecoder(reader)
dec.UseNumber()
- var cr map[string]interface{}
- if err = dec.Decode(&cr); err != nil {
- return fmt.Errorf("While decoding the container record JSON response: %v", err)
+ var resp map[string]interface{}
+ if err = dec.Decode(&resp); err != nil {
+ return false, fmt.Errorf("error decoding %s list response: %v", label, err)
+ }
+ items, ok := resp["items"].([]interface{})
+ if !ok {
+ return false, fmt.Errorf("error decoding %s list response: no \"items\" key in API list response", label)
+ } else if len(items) < 1 {
+ return false, nil
+ }
+ if munge != nil {
+ munge(items[0])
}
// Re-encode it using indentation to improve readability
enc := json.NewEncoder(w)
enc.SetIndent("", " ")
- if err = enc.Encode(cr); err != nil {
- return fmt.Errorf("While logging the JSON container record: %v", err)
+ if err = enc.Encode(items[0]); err != nil {
+ return false, fmt.Errorf("error logging %s record: %v", label, err)
}
err = w.Close()
if err != nil {
- return fmt.Errorf("While closing container.json log: %v", err)
+ return false, fmt.Errorf("error closing %s.json in log collection: %v", label, err)
}
- return nil
+ return true, nil
}
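// The (logged, err) pair distinguishes a failed request from one that
// succeeded but matched no records, and the two callers above react
// differently to the latter. The contract, sketched:
//
//	logged, err := runner.logAPIResponse(label, path, params, munge)
//	switch {
//	case err != nil: // API call, JSON decode, or log write failed
//	case !logged: // matched nothing: fatal for LogContainerRecord,
//		// ignored by LogNodeRecord
//	}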
// AttachStreams connects the docker container stdin, stdout and stderr logs
}
return fmt.Errorf("could not start container: %v%s", err, advice)
}
- runner.cStarted = true
return nil
}
// WaitFinish waits for the container to terminate, captures the exit code, and
// closes the stdout/stderr logging.
-func (runner *ContainerRunner) WaitFinish() (err error) {
+func (runner *ContainerRunner) WaitFinish() error {
runner.CrunchLog.Print("Waiting for container to finish")
- waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running")
+ waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
+ arvMountExit := runner.ArvMountExit
+ for {
+ select {
+ case waitBody := <-waitOk:
+ runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
+ code := int(waitBody.StatusCode)
+ runner.ExitCode = &code
+
+ // wait for stdout/stderr to complete
+ <-runner.loggingDone
+ return nil
- go func() {
- <-runner.ArvMountExit
- if runner.cStarted {
+ case err := <-waitErr:
+ return fmt.Errorf("container wait: %v", err)
+
+ case <-arvMountExit:
runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.")
runner.stop()
+ // arvMountExit is closed, so it would be
+ // ready on every pass; a nil channel blocks
+ // forever, disarming this case from now on.
+ arvMountExit = nil
}
- }()
-
- var waitBody dockercontainer.ContainerWaitOKBody
- select {
- case waitBody = <-waitOk:
- case err = <-waitErr:
- }
-
- // Container isn't running any more
- runner.cStarted = false
-
- if err != nil {
- return fmt.Errorf("container wait: %v", err)
}
-
- runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
- code := int(waitBody.StatusCode)
- runner.ExitCode = &code
-
- // wait for stdout/stderr to complete
- <-runner.loggingDone
-
- return nil
}
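// Setting arvMountExit to nil is the usual Go idiom for disarming a select
// case: a closed channel is always ready, but a receive from a nil channel
// blocks forever, so the remaining iterations wait only on waitOk/waitErr.
// The idiom in isolation (self-contained sketch):
//
//	done := make(chan struct{})
//	close(done)
//	for i := 0; i < 3; i++ {
//		select {
//		case <-done:
//			done = nil // fires once; never selected again
//		case <-time.After(10 * time.Millisecond):
//			fmt.Println("other cases still run")
//		}
//	}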
var ErrNotInOutputDir = fmt.Errorf("Must point to path within the output directory")
relocateTo string,
followed int) (manifestText string, err error) {
- if info.Mode().IsDir() {
- return
- }
-
if infoerr != nil {
return "", infoerr
}
+ if info.Mode().IsDir() {
+ // If the directory is empty, we may need a .keep
+ // file so it still appears in the output collection.
+ dir, direrr := os.Open(path)
+ if direrr != nil {
+ return "", direrr
+ }
+ defer dir.Close()
+ names, eof := dir.Readdirnames(1)
+ if len(names) == 0 && eof == io.EOF && path != runner.HostOutputDir {
+ containerPath := runner.OutputPath + path[len(runner.HostOutputDir):]
+ for _, bind := range binds {
+ mnt := runner.Container.Mounts[bind]
+ // If a non-empty collection is mounted at or above this
+ // directory, the mount supplies content, so no .keep is needed.
+ if (containerPath == bind || strings.HasPrefix(containerPath, bind+"/")) && mnt.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
+ return
+ }
+ }
+ outputSuffix := path[len(runner.HostOutputDir)+1:]
+ return fmt.Sprintf("./%v d41d8cd98f00b204e9800998ecf8427e+0 0:0:.keep\n", outputSuffix), nil
+ }
+ return
+ }
+
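// "d41d8cd98f00b204e9800998ecf8427e+0" is the Keep locator of the
// zero-length block: the MD5 digest of the empty string plus a "+0" size
// suffix. The generated manifest line therefore gives the empty directory
// a zero-byte .keep file. The digest can be checked directly (a sketch):
//
//	fmt.Printf("%x\n", md5.Sum(nil)) // d41d8cd98f00b204e9800998ecf8427e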
if followed >= limitFollowSymlinks {
// Got stuck in a loop or just a pathological number of
// directory links, give up.
return
}
- // When following symlinks, the source path may need to be logically
- // relocated to some other path within the output collection. Remove
- // the relocateFrom prefix and replace it with relocateTo.
+ // "path" is the actual path we are visiting
+ // "tgt" is the target of "path" (a non-symlink) after following symlinks
+ // "relocated" is the path in the output manifest where the file should be placed,
+ // but has HostOutputDir as a prefix.
+
+ // The destination path in the output manifest may need to be
+ // logically relocated to some other path in order to appear
+ // in the correct location as a result of following a symlink.
+ // Remove the relocateFrom prefix and replace it with
+ // relocateTo.
relocated := relocateTo + path[len(relocateFrom):]
tgt, readlinktgt, info, derefErr := runner.derefOutputSymlink(path, info)
// Terminates in this keep mount, so add the
// manifest text at appropriate location.
- outputSuffix := path[len(runner.HostOutputDir):]
+ outputSuffix := relocated[len(runner.HostOutputDir):]
manifestText, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
return
}
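// A worked example of the relocation above, with hypothetical paths: if
// relocateFrom = "/host/out/real" (a symlink's target) and
// relocateTo = "/host/out/link" (the symlink itself), then visiting
// path = "/host/out/real/a.txt" gives
//
//	relocated := relocateTo + path[len(relocateFrom):] // "/host/out/link/a.txt"
//
// so outputSuffix reflects where the file should appear in the output
// manifest, not where the walk actually found it.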
// CaptureOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
func (runner *ContainerRunner) CaptureOutput() error {
- if runner.finalState != "Complete" {
- return nil
- }
-
if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
// Output may have been set directly by the container, so
// refresh the container record to check.
if err != nil {
return
}
-
- // Gather and record node information
- err = runner.LogNodeInfo()
+ err = runner.LogHostInfo()
+ if err != nil {
+ return
+ }
+ err = runner.LogNodeRecord()
if err != nil {
return
}
- // Save container.json record on log collection
err = runner.LogContainerRecord()
if err != nil {
return
}
err = runner.WaitFinish()
- if err == nil {
+ if err == nil && !runner.IsCancelled() {
runner.finalState = "Complete"
}
return
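// With the IsCancelled check added, a run whose container happens to exit
// cleanly after a cancellation request is no longer marked "Complete".
// The outcome of the checks above, sketched as a table (the cancellation
// path that sets finalState is elided here):
//
//	// WaitFinish err | IsCancelled | finalState
//	// nil            | false       | "Complete"
//	// nil            | true        | left as previously set
//	// non-nil        | (any)       | left as previously set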
// API version 1.21 corresponds to Docker 1.9, which is currently the
// minimum version we want to support.
docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
- dockerClientProxy := ThinDockerClientProxy{Docker: docker}
-
- cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
+ cr := NewContainerRunner(api, kc, docker, containerId)
if dockererr != nil {
cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
cr.checkBrokenNode(dockererr)