Merge branch '14328-watch-docker-ps'
author Tom Clegg <tclegg@veritasgenetics.com>
Fri, 26 Oct 2018 17:14:38 +0000 (13:14 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Fri, 26 Oct 2018 17:14:38 +0000 (13:14 -0400)
refs #14328

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go

index 44560b80a0973e78488b2bf2dbb898877d98891f..27136b45227e01598cf51b0d80ab8d03ab12e4d4 100644 (file)
@@ -79,6 +79,7 @@ type ThinDockerClient interface {
        ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
        ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
        ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
+       ContainerInspect(ctx context.Context, id string) (dockertypes.ContainerJSON, error)
        ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
        ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
        ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
@@ -149,11 +150,14 @@ type ContainerRunner struct {
 
        cStateLock sync.Mutex
        cCancelled bool // StopContainer() invoked
+       cRemoved   bool // docker confirmed the container no longer exists
 
        enableNetwork   string // one of "default" or "always"
        networkMode     string // passed through to HostConfig.NetworkMode
        arvMountLog     *ThrottledLogger
        checkContainerd time.Duration
+
+       containerWatchdogInterval time.Duration
 }
 
 // setupSignals sets up signal handling to gracefully terminate the underlying
@@ -187,6 +191,9 @@ func (runner *ContainerRunner) stop(sig os.Signal) {
        if err != nil {
                runner.CrunchLog.Printf("error removing container: %s", err)
        }
+       if err == nil || strings.Contains(err.Error(), "No such container: "+runner.ContainerID) {
+               runner.cRemoved = true
+       }
 }
 
 var errorBlacklist = []string{
@@ -1124,6 +1131,32 @@ func (runner *ContainerRunner) WaitFinish() error {
                runTimeExceeded = time.After(time.Duration(timeout) * time.Second)
        }
 
+       containerGone := make(chan struct{})
+       go func() {
+               defer close(containerGone)
+               if runner.containerWatchdogInterval < 1 {
+                       runner.containerWatchdogInterval = time.Minute
+               }
+               for range time.NewTicker(runner.containerWatchdogInterval).C {
+                       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(runner.containerWatchdogInterval))
+                       ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID)
+                       cancel()
+                       runner.cStateLock.Lock()
+                       done := runner.cRemoved || runner.ExitCode != nil
+                       runner.cStateLock.Unlock()
+                       if done {
+                               return
+                       } else if err != nil {
+                               runner.CrunchLog.Printf("Error inspecting container: %s", err)
+                               runner.checkBrokenNode(err)
+                               return
+                       } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
+                               runner.CrunchLog.Printf("Container is not running: State=%v", ctr.State)
+                               return
+                       }
+               }
+       }()
+
        containerdGone := make(chan error)
        defer close(containerdGone)
        if runner.checkContainerd > 0 {
@@ -1171,6 +1204,9 @@ func (runner *ContainerRunner) WaitFinish() error {
                        runner.stop(nil)
                        runTimeExceeded = nil
 
+               case <-containerGone:
+                       return errors.New("docker client never returned status")
+
                case err := <-containerdGone:
                        return err
                }
index 217d4236ba6728619f20e6158589c9129d145e68..eb4f220e227d399b31b0669b15ba77c75449e557 100644 (file)
@@ -47,6 +47,7 @@ var _ = Suite(&TestSuite{})
 // TestSuite holds the fixtures used by the test methods declared on it.
 type TestSuite struct {
        client *arvados.Client   // passed to NewContainerRunner by fullRunHelper
        docker *TestDockerClient // fake docker client handed to the runner under test
+       runner *ContainerRunner // set by fullRunHelper to the runner it creates
 }
 
 func (s *TestSuite) SetUpTest(c *C) {
@@ -103,6 +104,7 @@ type TestDockerClient struct {
        api         *ArvTestClient
        realTemp    string
        calledWait  bool
+       ctrExited   bool
 }
 
 func NewTestDockerClient() *TestDockerClient {
@@ -176,6 +178,17 @@ func (t *TestDockerClient) ContainerWait(ctx context.Context, container string,
        return body, err
 }
 
+// ContainerInspect is a stub: it ignores ctx and id and reports a container
+// "abcde" whose state is "exited" when t.ctrExited is set, else "running".
+func (t *TestDockerClient) ContainerInspect(ctx context.Context, id string) (c dockertypes.ContainerJSON, err error) {
+       state := &dockertypes.ContainerState{Status: "running", Pid: 1234, Running: true}
+       if t.ctrExited {
+               state = &dockertypes.ContainerState{Status: "exited", Dead: true}
+       }
+       c.ContainerJSONBase = &dockertypes.ContainerJSONBase{ID: "abcde", State: state}
+       return c, nil
+}
+
 func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
        if t.exitCode == 2 {
                return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
@@ -736,7 +749,9 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exi
        defer kc.Close()
        cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
+       s.runner = cr
        cr.statInterval = 100 * time.Millisecond
+       cr.containerWatchdogInterval = time.Second
        am := &ArvMountCmdLine{}
        cr.RunArvMount = am.ArvMountTest
 
@@ -830,6 +845,24 @@ func (s *TestSuite) TestRunTimeExceeded(c *C) {
        c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*maximum run time exceeded.*")
 }
 
+// TestContainerWaitFails runs a container whose fake docker client reports it
+// as exited via ContainerInspect, then checks that the run is recorded as
+// Cancelled and that crunch-run logged "Container is not running".
+func (s *TestSuite) TestContainerWaitFails(c *C) {
+       const record = `{
+    "command": ["sleep", "3"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "mounts": {"/tmp": {"kind": "tmp"} },
+    "output_path": "/tmp",
+    "priority": 1
+}`
+       // Flag the container as exited first, then stall for much longer than
+       // the one-second watchdog interval fullRunHelper configures, and only
+       // afterwards close the log stream.
+       exitEarly := func(t *TestDockerClient) {
+               t.ctrExited = true
+               time.Sleep(10 * time.Second)
+               t.logWriter.Close()
+       }
+       api, _, _ := s.fullRunHelper(c, record, nil, 0, exitEarly)
+
+       c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Container is not running.*")
+}
+
 func (s *TestSuite) TestCrunchstat(c *C) {
        api, _, _ := s.fullRunHelper(c, `{
                "command": ["sleep", "1"],