ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
+ ContainerInspect(ctx context.Context, id string) (dockertypes.ContainerJSON, error)
ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
cStateLock sync.Mutex
cCancelled bool // StopContainer() invoked
+ cRemoved bool // docker confirmed the container no longer exists
enableNetwork string // one of "default" or "always"
networkMode string // passed through to HostConfig.NetworkMode
arvMountLog *ThrottledLogger
checkContainerd time.Duration
+
+ containerWatchdogInterval time.Duration
}
// setupSignals sets up signal handling to gracefully terminate the underlying
if err != nil {
runner.CrunchLog.Printf("error removing container: %s", err)
}
+ if err == nil || strings.Contains(err.Error(), "No such container: "+runner.ContainerID) {
+ runner.cRemoved = true
+ }
}
var errorBlacklist = []string{
runTimeExceeded = time.After(time.Duration(timeout) * time.Second)
}
+ containerGone := make(chan struct{})
+ go func() {
+ defer close(containerGone)
+ if runner.containerWatchdogInterval < 1 {
+ runner.containerWatchdogInterval = time.Minute
+ }
+ for range time.NewTicker(runner.containerWatchdogInterval).C {
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(runner.containerWatchdogInterval))
+ ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID)
+ cancel()
+ runner.cStateLock.Lock()
+ done := runner.cRemoved || runner.ExitCode != nil
+ runner.cStateLock.Unlock()
+ if done {
+ return
+ } else if err != nil {
+ runner.CrunchLog.Printf("Error inspecting container: %s", err)
+ runner.checkBrokenNode(err)
+ return
+ } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
+ runner.CrunchLog.Printf("Container is not running: State=%v", ctr.State)
+ return
+ }
+ }
+ }()
+
containerdGone := make(chan error)
defer close(containerdGone)
if runner.checkContainerd > 0 {
runner.stop(nil)
runTimeExceeded = nil
+ case <-containerGone:
+ return errors.New("docker client never returned status")
+
case err := <-containerdGone:
return err
}
type TestSuite struct {
client *arvados.Client
docker *TestDockerClient
+ runner *ContainerRunner
}
func (s *TestSuite) SetUpTest(c *C) {
api *ArvTestClient
realTemp string
calledWait bool
+ ctrExited bool
}
func NewTestDockerClient() *TestDockerClient {
return body, err
}
+func (t *TestDockerClient) ContainerInspect(ctx context.Context, id string) (c dockertypes.ContainerJSON, err error) {
+ c.ContainerJSONBase = &dockertypes.ContainerJSONBase{}
+ c.ID = "abcde"
+ if t.ctrExited {
+ c.State = &dockertypes.ContainerState{Status: "exited", Dead: true}
+ } else {
+ c.State = &dockertypes.ContainerState{Status: "running", Pid: 1234, Running: true}
+ }
+ return
+}
+
func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
if t.exitCode == 2 {
return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
defer kc.Close()
cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
+ s.runner = cr
cr.statInterval = 100 * time.Millisecond
+ cr.containerWatchdogInterval = time.Second
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*maximum run time exceeded.*")
}
+func (s *TestSuite) TestContainerWaitFails(c *C) {
+ api, _, _ := s.fullRunHelper(c, `{
+ "command": ["sleep", "3"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1
+}`, nil, 0, func(t *TestDockerClient) {
+ t.ctrExited = true
+ time.Sleep(10 * time.Second)
+ t.logWriter.Close()
+ })
+
+ c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Container is not running.*")
+}
+
func (s *TestSuite) TestCrunchstat(c *C) {
api, _, _ := s.fullRunHelper(c, `{
"command": ["sleep", "1"],