package main
import (
+ "context"
"encoding/json"
"errors"
"flag"
"fmt"
- "git.curoverse.com/arvados.git/lib/crunchstat"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/manifest"
- "github.com/curoverse/dockerclient"
"io"
"io/ioutil"
"log"
"sync"
"syscall"
"time"
+
+ "git.curoverse.com/arvados.git/lib/crunchstat"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+
+ dockertypes "github.com/docker/docker/api/types"
+ dockercontainer "github.com/docker/docker/api/types/container"
+ dockernetwork "github.com/docker/docker/api/types/network"
+ dockerclient "github.com/docker/docker/client"
)
// IArvadosClient is the minimal Arvados API methods used by crunch-run.
Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error
+ CallRaw(method string, resourceType string, uuid string, action string, parameters arvadosclient.Dict) (reader io.ReadCloser, err error)
Discovery(key string) (interface{}, error)
}
// ThinDockerClient is the minimal Docker client interface used by crunch-run.
type ThinDockerClient interface {
- StopContainer(id string, timeout int) error
- InspectImage(id string) (*dockerclient.ImageInfo, error)
- LoadImage(reader io.Reader) error
- CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
- StartContainer(id string, config *dockerclient.HostConfig) error
- AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
- Wait(id string) <-chan dockerclient.WaitResult
- RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
+ ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error)
+ ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
+ networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
+ ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
+ ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
+ ContainerWait(ctx context.Context, container string) (int64, error)
+ ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
+ ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
+ ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
+}
+
+// ThinDockerClientProxy is a proxy implementation of ThinDockerClient
+// that executes the docker requests on dockerclient.Client
+type ThinDockerClientProxy struct {
+	// Docker is the real Docker SDK client; every ThinDockerClient
+	// method simply forwards to the corresponding method here.
+	Docker *dockerclient.Client
+}
+
+// ContainerAttach invokes dockerclient.Client.ContainerAttach on the
+// wrapped client.
+func (proxy ThinDockerClientProxy) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
+	return proxy.Docker.ContainerAttach(ctx, container, options)
+}
+
+// ContainerCreate invokes dockerclient.Client.ContainerCreate on the
+// wrapped client.
+func (proxy ThinDockerClientProxy) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
+	networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
+	return proxy.Docker.ContainerCreate(ctx, config, hostConfig, networkingConfig, containerName)
+}
+
+// ContainerStart invokes dockerclient.Client.ContainerStart on the
+// wrapped client.
+func (proxy ThinDockerClientProxy) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
+	return proxy.Docker.ContainerStart(ctx, container, options)
+}
+
+// ContainerStop invokes dockerclient.Client.ContainerStop on the
+// wrapped client.
+func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
+	return proxy.Docker.ContainerStop(ctx, container, timeout)
+}
+
+// ContainerWait invokes dockerclient.Client.ContainerWait on the
+// wrapped client.
+func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string) (int64, error) {
+	return proxy.Docker.ContainerWait(ctx, container)
+}
+
+// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
+// on the wrapped client.
+func (proxy ThinDockerClientProxy) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
+	return proxy.Docker.ImageInspectWithRaw(ctx, image)
+}
+
+// ImageLoad invokes dockerclient.Client.ImageLoad on the wrapped
+// client. The caller is responsible for closing the response body.
+func (proxy ThinDockerClientProxy) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
+	return proxy.Docker.ImageLoad(ctx, input, quiet)
+}
+
+// ImageRemove invokes dockerclient.Client.ImageRemove on the wrapped
+// client.
+func (proxy ThinDockerClientProxy) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
+	return proxy.Docker.ImageRemove(ctx, image, options)
}
// ContainerRunner is the main stateful struct used for a single execution of a
ArvClient IArvadosClient
Kc IKeepClient
arvados.Container
- dockerclient.ContainerConfig
- dockerclient.HostConfig
+ ContainerConfig dockercontainer.Config
+ dockercontainer.HostConfig
token string
ContainerID string
ExitCode *int
CleanupTempDir []string
Binds []string
OutputPDH *string
- CancelLock sync.Mutex
- Cancelled bool
SigChan chan os.Signal
ArvMountExit chan error
finalState string
// parent to be X" feature even on sites where the "specify
// cgroup parent" feature breaks.
setCgroupParent string
+
+ cStateLock sync.Mutex
+ cStarted bool // StartContainer() succeeded
+ cCancelled bool // StopContainer() invoked
+
+ enableNetwork string // one of "default" or "always"
+ networkMode string // passed through to HostConfig.NetworkMode
}
// SetupSignals sets up signal handling to gracefully terminate the underlying
signal.Notify(runner.SigChan, syscall.SIGINT)
signal.Notify(runner.SigChan, syscall.SIGQUIT)
- go func(sig <-chan os.Signal) {
- for range sig {
- if !runner.Cancelled {
- runner.CancelLock.Lock()
- runner.Cancelled = true
- if runner.ContainerID != "" {
- runner.Docker.StopContainer(runner.ContainerID, 10)
- }
- runner.CancelLock.Unlock()
- }
- }
+ go func(sig chan os.Signal) {
+ <-sig
+ runner.stop()
+ signal.Stop(sig)
}(runner.SigChan)
}
+// stop the underlying Docker container, if it has been started.
+// Safe to call more than once; the first call wins and sets
+// cCancelled so later callers (and StartContainer) bail out.
+func (runner *ContainerRunner) stop() {
+	runner.cStateLock.Lock()
+	defer runner.cStateLock.Unlock()
+	if runner.cCancelled {
+		return
+	}
+	runner.cCancelled = true
+	if runner.cStarted {
+		// Give the container 10 seconds to shut down cleanly
+		// before Docker kills it. Note time.Duration(10) would
+		// be 10 nanoseconds, which the Docker client truncates
+		// to a 0-second grace period (immediate kill) -- the
+		// old dockerclient call passed 10 (seconds) here.
+		timeout := 10 * time.Second
+		err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &timeout)
+		if err != nil {
+			log.Printf("StopContainer failed: %s", err)
+		}
+	}
+}
+
// LoadImage determines the docker image id from the container record and
// checks if it is available in the local Docker image store. If not, it loads
// the image from Keep.
runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
- _, err = runner.Docker.InspectImage(imageID)
+ _, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID)
if err != nil {
runner.CrunchLog.Print("Loading Docker image from keep")
return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
}
- err = runner.Docker.LoadImage(readCloser)
+ response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, false)
if err != nil {
return fmt.Errorf("While loading container image into Docker: %v", err)
}
+ response.Body.Close()
} else {
runner.CrunchLog.Print("Docker image is available")
}
runner.statReporter.Start()
}
+// infoCommand pairs a human-readable heading with the argv of a
+// diagnostic command whose output is copied to the node-info log.
+type infoCommand struct {
+	label string   // heading written before the command output
+	cmd   []string // command and arguments, exec.Command style
+}
+
+// LogNodeInfo runs a fixed set of diagnostic commands (kernel, CPU,
+// memory, disk) and copies their combined output to the node-info
+// log, for debugging purposes. It returns an error if any command
+// fails or the log writer cannot be closed.
+func (runner *ContainerRunner) LogNodeInfo() (err error) {
+	w := runner.NewLogWriter("node-info")
+	logger := log.New(w, "node-info", 0)
+
+	// gofmt -s: element type is implied by the slice literal.
+	commands := []infoCommand{
+		{
+			label: "Host Information",
+			cmd:   []string{"uname", "-a"},
+		},
+		{
+			label: "CPU Information",
+			cmd:   []string{"cat", "/proc/cpuinfo"},
+		},
+		{
+			label: "Memory Information",
+			cmd:   []string{"cat", "/proc/meminfo"},
+		},
+		{
+			label: "Disk Space",
+			cmd:   []string{"df", "-m", "/", os.TempDir()},
+		},
+		{
+			label: "Disk INodes",
+			cmd:   []string{"df", "-i", "/", os.TempDir()},
+		},
+	}
+
+	// Run commands with informational output to be logged.
+	var out []byte
+	for _, command := range commands {
+		out, err = exec.Command(command.cmd[0], command.cmd[1:]...).CombinedOutput()
+		if err != nil {
+			return fmt.Errorf("While running command %q: %v",
+				command.cmd, err)
+		}
+		logger.Println(command.label)
+		for _, line := range strings.Split(string(out), "\n") {
+			logger.Println(" ", line)
+		}
+	}
+
+	err = w.Close()
+	if err != nil {
+		return fmt.Errorf("While closing node-info logs: %v", err)
+	}
+	return nil
+}
+
+// LogContainerRecord fetches the raw JSON container record from the
+// API server and saves a pretty-printed copy as container.json in the
+// log collection, for debugging.
+func (runner *ContainerRunner) LogContainerRecord() (err error) {
+	w := &ArvLogWriter{
+		runner.ArvClient,
+		runner.Container.UUID,
+		"container",
+		runner.LogCollection.Open("container.json"),
+	}
+	// Get Container record JSON from the API Server
+	reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
+	if err != nil {
+		return fmt.Errorf("While retrieving container record from the API server: %v", err)
+	}
+	defer reader.Close()
+	// Read the API server response as []byte
+	// (Go naming: MixedCaps, not json_bytes.)
+	jsonBytes, err := ioutil.ReadAll(reader)
+	if err != nil {
+		return fmt.Errorf("While reading container record API server response: %v", err)
+	}
+	// Decode the JSON []byte
+	var cr map[string]interface{}
+	if err = json.Unmarshal(jsonBytes, &cr); err != nil {
+		return fmt.Errorf("While decoding the container record JSON response: %v", err)
+	}
+	// Re-encode it using indentation to improve readability
+	enc := json.NewEncoder(w)
+	enc.SetIndent("", "    ")
+	if err = enc.Encode(cr); err != nil {
+		return fmt.Errorf("While logging the JSON container record: %v", err)
+	}
+	err = w.Close()
+	if err != nil {
+		return fmt.Errorf("While closing container.json log: %v", err)
+	}
+	return nil
+}
+
// AttachLogs connects the docker container stdout and stderr logs to the
// Arvados logger which logs to Keep and the API server logs table.
func (runner *ContainerRunner) AttachStreams() (err error) {
runner.CrunchLog.Print("Attaching container streams")
- var containerReader io.Reader
- containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
- &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
+ response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
+ dockertypes.ContainerAttachOptions{Stream: true, Stdout: true, Stderr: true})
if err != nil {
return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
}
}
runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
- go runner.ProcessDockerAttach(containerReader)
+ go runner.ProcessDockerAttach(response.Reader)
return nil
}
for k, v := range runner.Container.Environment {
runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
}
+
+ runner.HostConfig = dockercontainer.HostConfig{
+ Binds: runner.Binds,
+ Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent),
+ LogConfig: dockercontainer.LogConfig{
+ Type: "none",
+ },
+ }
+
if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
tok, err := runner.ContainerToken()
if err != nil {
"ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
"ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
)
- runner.ContainerConfig.NetworkDisabled = false
+ runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
} else {
- runner.ContainerConfig.NetworkDisabled = true
+ if runner.enableNetwork == "always" {
+ runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
+ } else {
+ runner.HostConfig.NetworkMode = dockercontainer.NetworkMode("none")
+ }
}
- var err error
- runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
+ createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
if err != nil {
return fmt.Errorf("While creating container: %v", err)
}
- runner.HostConfig = dockerclient.HostConfig{
- Binds: runner.Binds,
- CgroupParent: runner.setCgroupParent,
- LogConfig: dockerclient.LogConfig{
- Type: "none",
- },
- }
+ runner.ContainerID = createdBody.ID
return runner.AttachStreams()
}
// StartContainer starts the docker container created by CreateContainer.
func (runner *ContainerRunner) StartContainer() error {
	runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
-	err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig)
+	// Hold cStateLock so stop() cannot run between the cancel
+	// check and setting cStarted; a cancelled run must never
+	// leave an unstoppable container behind.
+	runner.cStateLock.Lock()
+	defer runner.cStateLock.Unlock()
+	if runner.cCancelled {
+		return ErrCancelled
+	}
+	err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
+		dockertypes.ContainerStartOptions{})
	if err != nil {
		return fmt.Errorf("could not start container: %v", err)
	}
+	// Tell stop() there is now a running container to stop.
+	runner.cStarted = true
	return nil
}
func (runner *ContainerRunner) WaitFinish() error {
runner.CrunchLog.Print("Waiting for container to finish")
- result := runner.Docker.Wait(runner.ContainerID)
- wr := <-result
- if wr.Error != nil {
- return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
+ waitDocker, err := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID)
+ if err != nil {
+ return fmt.Errorf("container wait: %v", err)
+ }
+
+ runner.CrunchLog.Printf("Container exited with code: %v", waitDocker)
+ code := int(waitDocker)
+ runner.ExitCode = &code
+
+ waitMount := runner.ArvMountExit
+ select {
+ case err := <-waitMount:
+ runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
+ waitMount = nil
+ runner.stop()
+ default:
}
- runner.ExitCode = &wr.ExitCode
// wait for stdout/stderr to complete
<-runner.loggingDone
_, err = os.Stat(collectionMetafile)
if err != nil {
// Regular directory
- cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
+ cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
if err != nil {
return fmt.Errorf("While uploading output files: %v", err)
manifestText = manifest.Extract(".", ".").Text
err = runner.ArvClient.Create("collections",
arvadosclient.Dict{
+ "ensure_unique_name": true,
"collection": arvadosclient.Dict{
"is_trashed": true,
"name": "output for " + runner.Container.UUID,
var response arvados.Collection
err = runner.ArvClient.Create("collections",
arvadosclient.Dict{
+ "ensure_unique_name": true,
"collection": arvadosclient.Dict{
"is_trashed": true,
"name": "logs for " + runner.Container.UUID,
// UpdateContainerRunning updates the container state to "Running"
func (runner *ContainerRunner) UpdateContainerRunning() error {
- runner.CancelLock.Lock()
- defer runner.CancelLock.Unlock()
- if runner.Cancelled {
+ runner.cStateLock.Lock()
+ defer runner.cStateLock.Unlock()
+ if runner.cCancelled {
return ErrCancelled
}
return runner.ArvClient.Update("containers", runner.Container.UUID,
// IsCancelled returns the value of Cancelled, with goroutine safety.
func (runner *ContainerRunner) IsCancelled() bool {
-	runner.CancelLock.Lock()
-	defer runner.CancelLock.Unlock()
-	return runner.Cancelled
+	// cStateLock guards cCancelled against a concurrent stop().
+	runner.cStateLock.Lock()
+	defer runner.cStateLock.Unlock()
+	return runner.cCancelled
}
// NewArvLogWriter creates an ArvLogWriter
return
}
+ // Gather and record node information
+ err = runner.LogNodeInfo()
+ if err != nil {
+ return
+ }
+ // Save container.json record on log collection
+ err = runner.LogContainerRecord()
+ if err != nil {
+ return
+ }
+
runner.StartCrunchstat()
if runner.IsCancelled() {
cr.NewLogWriter = cr.NewArvLogWriter
cr.RunArvMount = cr.ArvMountCmd
cr.MkTempDir = ioutil.TempDir
- cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
+ cr.LogCollection = &CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
cr.Container.UUID = containerUUID
cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
+ enableNetwork := flag.String("container-enable-networking", "default",
+ `Specify if networking should be enabled for container. One of 'default', 'always':
+ default: only enable networking if container requests it.
+ always: containers always have networking enabled
+ `)
+ networkMode := flag.String("container-network-mode", "default",
+ `Set networking mode for container. Corresponds to Docker network mode (--net).
+ `)
flag.Parse()
containerId := flag.Arg(0)
}
kc.Retries = 4
- var docker *dockerclient.DockerClient
- docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+ var docker *dockerclient.Client
+ // API version 1.21 corresponds to Docker 1.9, which is currently the
+ // minimum version we want to support.
+ docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
if err != nil {
log.Fatalf("%s: %v", containerId, err)
}
- cr := NewContainerRunner(api, kc, docker, containerId)
+ dockerClientProxy := ThinDockerClientProxy{Docker: docker}
+
+ cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
cr.statInterval = *statInterval
cr.cgroupRoot = *cgroupRoot
cr.expectCgroupParent = *cgroupParent
+ cr.enableNetwork = *enableNetwork
+ cr.networkMode = *networkMode
if *cgroupParentSubsystem != "" {
p := findCgroup(*cgroupParentSubsystem)
cr.setCgroupParent = p