Merge branch 'master' of git.curoverse.com:arvados into 11876-r-sdk
authorFuad Muhic <fmuhic@capeannenterprises.com>
Thu, 25 Jan 2018 14:15:04 +0000 (15:15 +0100)
committerFuad Muhic <fmuhic@capeannenterprises.com>
Thu, 25 Jan 2018 14:15:04 +0000 (15:15 +0100)
Arvados-DCO-1.1-Signed-off-by: Fuad Muhic <fmuhic@capeannenterprises.com>

services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
services/crunch-dispatch-slurm/slurm.go [new file with mode: 0644]
services/crunch-dispatch-slurm/squeue.go
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go

index 3c89103f38dbc1cd094047d173c10bfe0b28f08e..ae2ca58421d3f3a58f0a4a0f28cbdc2beb56e6af 100644 (file)
@@ -7,14 +7,12 @@ package main
 // Dispatcher service for Crunch that submits containers to the slurm queue.
 
 import (
-       "bytes"
        "context"
        "flag"
        "fmt"
        "log"
        "math"
        "os"
-       "os/exec"
        "regexp"
        "strings"
        "time"
@@ -43,9 +41,12 @@ type Config struct {
 
        // Minimum time between two attempts to run the same container
        MinRetryPeriod arvados.Duration
+
+       slurm Slurm
 }
 
 func main() {
+       theConfig.slurm = &slurmCLI{}
        err := doMain()
        if err != nil {
                log.Fatal(err)
@@ -175,8 +176,7 @@ func niceness(priority int) int {
        return (1000 - priority) * 10
 }
 
-// sbatchCmd
-func sbatchFunc(container arvados.Container) *exec.Cmd {
+func sbatchArgs(container arvados.Container) []string {
        mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
 
        var disk int64
@@ -198,61 +198,22 @@ func sbatchFunc(container arvados.Container) *exec.Cmd {
                sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
        }
 
-       return exec.Command("sbatch", sbatchArgs...)
-}
-
-// scancelCmd
-func scancelFunc(container arvados.Container) *exec.Cmd {
-       return exec.Command("scancel", "--name="+container.UUID)
-}
-
-// scontrolCmd
-func scontrolFunc(container arvados.Container) *exec.Cmd {
-       return exec.Command("scontrol", "update", "JobName="+container.UUID, fmt.Sprintf("Nice=%d", niceness(container.Priority)))
+       return sbatchArgs
 }
 
-// Wrap these so that they can be overridden by tests
-var sbatchCmd = sbatchFunc
-var scancelCmd = scancelFunc
-var scontrolCmd = scontrolFunc
-
-// Submit job to slurm using sbatch.
 func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
-       cmd := sbatchCmd(container)
-
-       // Send a tiny script on stdin to execute the crunch-run
-       // command (slurm requires this to be a #! script)
-
        // append() here avoids modifying crunchRunCommand's
        // underlying array, which is shared with other goroutines.
-       args := append([]string(nil), crunchRunCommand...)
-       args = append(args, container.UUID)
-       cmd.Stdin = strings.NewReader(execScript(args))
-
-       var stdout, stderr bytes.Buffer
-       cmd.Stdout = &stdout
-       cmd.Stderr = &stderr
+       crArgs := append([]string(nil), crunchRunCommand...)
+       crArgs = append(crArgs, container.UUID)
+       crScript := strings.NewReader(execScript(crArgs))
 
-       // Mutex between squeue sync and running sbatch or scancel.
        sqCheck.L.Lock()
        defer sqCheck.L.Unlock()
 
-       log.Printf("exec sbatch %+q", cmd.Args)
-       err := cmd.Run()
-
-       switch err.(type) {
-       case nil:
-               log.Printf("sbatch succeeded: %q", strings.TrimSpace(stdout.String()))
-               return nil
-
-       case *exec.ExitError:
-               dispatcher.Unlock(container.UUID)
-               return fmt.Errorf("sbatch %+q failed: %v (stderr: %q)", cmd.Args, err, stderr.Bytes())
-
-       default:
-               dispatcher.Unlock(container.UUID)
-               return fmt.Errorf("exec failed: %v", err)
-       }
+       sbArgs := sbatchArgs(container)
+       log.Printf("running sbatch %+q", sbArgs)
+       return theConfig.slurm.Batch(crScript, sbArgs)
 }
 
 // Submit a container to the slurm queue (or resume monitoring if it's
@@ -313,10 +274,8 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
                        } else if updated.Priority == 0 {
                                log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
                                scancel(ctr)
-                       } else if niceness(updated.Priority) != sqCheck.GetNiceness(ctr.UUID) && sqCheck.GetNiceness(ctr.UUID) != -1 {
-                               // dynamically adjust priority
-                               log.Printf("Container priority %v != %v", niceness(updated.Priority), sqCheck.GetNiceness(ctr.UUID))
-                               scontrolUpdate(updated)
+                       } else {
+                               renice(updated)
                        }
                }
        }
@@ -324,12 +283,11 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
 
 func scancel(ctr arvados.Container) {
        sqCheck.L.Lock()
-       cmd := scancelCmd(ctr)
-       msg, err := cmd.CombinedOutput()
+       err := theConfig.slurm.Cancel(ctr.UUID)
        sqCheck.L.Unlock()
 
        if err != nil {
-               log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+               log.Printf("scancel: %s", err)
                time.Sleep(time.Second)
        } else if sqCheck.HasUUID(ctr.UUID) {
                log.Printf("container %s is still in squeue after scancel", ctr.UUID)
@@ -337,17 +295,24 @@ func scancel(ctr arvados.Container) {
        }
 }
 
-func scontrolUpdate(ctr arvados.Container) {
+func renice(ctr arvados.Container) {
+       nice := niceness(ctr.Priority)
+       oldnice := sqCheck.GetNiceness(ctr.UUID)
+       if nice == oldnice || oldnice == -1 {
+               return
+       }
+       log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
        sqCheck.L.Lock()
-       cmd := scontrolCmd(ctr)
-       msg, err := cmd.CombinedOutput()
+       err := theConfig.slurm.Renice(ctr.UUID, nice)
        sqCheck.L.Unlock()
 
        if err != nil {
-               log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+               log.Printf("renice: %s", err)
                time.Sleep(time.Second)
-       } else if sqCheck.HasUUID(ctr.UUID) {
-               log.Printf("Container %s priority is now %v, niceness is now %v",
+               return
+       }
+       if sqCheck.HasUUID(ctr.UUID) {
+               log.Printf("container %s has arvados priority %d, slurm nice %d",
                        ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
        }
 }
index a823755379574155420e465b2ec2b72f234a3024..830976d66bdd9e3dbce855936a241b7b83ec8e7b 100644 (file)
@@ -7,6 +7,7 @@ package main
 import (
        "bytes"
        "context"
+       "errors"
        "fmt"
        "io"
        "io/ioutil"
@@ -64,51 +65,55 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
        arvadostest.ResetEnv()
 }
 
-func (s *TestSuite) integrationTest(c *C,
-       newSqueueCmd func() *exec.Cmd,
-       newScancelCmd func(arvados.Container) *exec.Cmd,
-       newSbatchCmd func(arvados.Container) *exec.Cmd,
-       newScontrolCmd func(arvados.Container) *exec.Cmd,
-       sbatchCmdComps []string,
-       runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
-       arvadostest.ResetEnv()
+type slurmFake struct {
+       didBatch  [][]string
+       didCancel []string
+       didRenice [][]string
+       queue     string
+       // If non-nil, run this func during the 2nd+ call to Cancel()
+       onCancel func()
+       // Error returned by Batch()
+       errBatch error
+}
 
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, IsNil)
+func (sf *slurmFake) Batch(script io.Reader, args []string) error {
+       sf.didBatch = append(sf.didBatch, args)
+       return sf.errBatch
+}
 
-       var sbatchCmdLine []string
+func (sf *slurmFake) QueueCommand(args []string) *exec.Cmd {
+       return exec.Command("echo", sf.queue)
+}
 
-       // Override sbatchCmd
-       defer func(orig func(arvados.Container) *exec.Cmd) {
-               sbatchCmd = orig
-       }(sbatchCmd)
+func (sf *slurmFake) Renice(name string, nice int) error {
+       sf.didRenice = append(sf.didRenice, []string{name, fmt.Sprintf("%d", nice)})
+       return nil
+}
 
-       if newSbatchCmd != nil {
-               sbatchCmd = newSbatchCmd
-       } else {
-               sbatchCmd = func(container arvados.Container) *exec.Cmd {
-                       sbatchCmdLine = sbatchFunc(container).Args
-                       return exec.Command("sh")
-               }
+func (sf *slurmFake) Cancel(name string) error {
+       sf.didCancel = append(sf.didCancel, name)
+       if len(sf.didCancel) == 1 {
+               // simulate error on first attempt
+               return errors.New("something terrible happened")
+       }
+       if sf.onCancel != nil {
+               sf.onCancel()
        }
+       return nil
+}
 
-       // Override squeueCmd
-       defer func(orig func() *exec.Cmd) {
-               squeueCmd = orig
-       }(squeueCmd)
-       squeueCmd = newSqueueCmd
+func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
+       expectBatch [][]string,
+       runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
+       arvadostest.ResetEnv()
 
-       // Override scancel
-       defer func(orig func(arvados.Container) *exec.Cmd) {
-               scancelCmd = orig
-       }(scancelCmd)
-       scancelCmd = newScancelCmd
+       arv, err := arvadosclient.MakeArvadosClient()
+       c.Assert(err, IsNil)
 
-       // Override scontrol
-       defer func(orig func(arvados.Container) *exec.Cmd) {
-               scontrolCmd = orig
-       }(scontrolCmd)
-       scontrolCmd = newScontrolCmd
+       defer func(orig Slurm) {
+               theConfig.slurm = orig
+       }(theConfig.slurm)
+       theConfig.slurm = slurm
 
        // There should be one queued container
        params := arvadosclient.Dict{
@@ -130,6 +135,7 @@ func (s *TestSuite) integrationTest(c *C,
                RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
                        go func() {
                                runContainer(disp, ctr)
+                               slurm.queue = ""
                                doneRun <- struct{}{}
                        }()
                        run(disp, ctr, status)
@@ -145,7 +151,7 @@ func (s *TestSuite) integrationTest(c *C,
 
        sqCheck.Stop()
 
-       c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
+       c.Check(slurm.didBatch, DeepEquals, expectBatch)
 
        // There should be no queued containers now
        err = arv.List("containers", params, &containers)
@@ -160,77 +166,47 @@ func (s *TestSuite) integrationTest(c *C,
 }
 
 func (s *TestSuite) TestIntegrationNormal(c *C) {
-       done := false
        container := s.integrationTest(c,
-               func() *exec.Cmd {
-                       if done {
-                               return exec.Command("true")
-                       } else {
-                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
-                       }
-               },
-               nil,
+               &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"},
                nil,
-               nil,
-               []string(nil),
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
                        time.Sleep(3 * time.Second)
                        dispatcher.UpdateState(container.UUID, dispatch.Complete)
-                       done = true
                })
        c.Check(container.State, Equals, arvados.ContainerStateComplete)
 }
 
 func (s *TestSuite) TestIntegrationCancel(c *C) {
-       var cmd *exec.Cmd
-       var scancelCmdLine []string
-       attempt := 0
-
+       slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+       readyToCancel := make(chan bool)
+       slurm.onCancel = func() { <-readyToCancel }
        container := s.integrationTest(c,
-               func() *exec.Cmd {
-                       if cmd != nil && cmd.ProcessState != nil {
-                               return exec.Command("true")
-                       } else {
-                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
-                       }
-               },
-               func(container arvados.Container) *exec.Cmd {
-                       if attempt++; attempt == 1 {
-                               return exec.Command("false")
-                       } else {
-                               scancelCmdLine = scancelFunc(container).Args
-                               cmd = exec.Command("echo")
-                               return cmd
-                       }
-               },
-               nil,
+               slurm,
                nil,
-               []string(nil),
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
-                       time.Sleep(1 * time.Second)
+                       time.Sleep(time.Second)
                        dispatcher.Arv.Update("containers", container.UUID,
                                arvadosclient.Dict{
                                        "container": arvadosclient.Dict{"priority": 0}},
                                nil)
+                       readyToCancel <- true
+                       close(readyToCancel)
                })
        c.Check(container.State, Equals, arvados.ContainerStateCancelled)
-       c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
+       c.Check(len(slurm.didCancel) > 1, Equals, true)
+       c.Check(slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
 }
 
 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
-       container := s.integrationTest(c,
-               func() *exec.Cmd { return exec.Command("echo") },
-               nil,
-               nil,
-               nil,
-               []string{"sbatch",
+       container := s.integrationTest(c, &slurmFake{},
+               [][]string{{
                        fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
                        fmt.Sprintf("--mem=%d", 11445),
                        fmt.Sprintf("--cpus-per-task=%d", 4),
                        fmt.Sprintf("--tmp=%d", 45777),
-                       fmt.Sprintf("--nice=%d", 9990)},
+                       fmt.Sprintf("--nice=%d", 9990)}},
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
                        time.Sleep(3 * time.Second)
@@ -241,13 +217,8 @@ func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
 
 func (s *TestSuite) TestSbatchFail(c *C) {
        container := s.integrationTest(c,
-               func() *exec.Cmd { return exec.Command("echo") },
-               nil,
-               func(container arvados.Container) *exec.Cmd {
-                       return exec.Command("false")
-               },
-               nil,
-               []string(nil),
+               &slurmFake{errBatch: errors.New("something terrible happened")},
+               [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=9990"}},
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
                        dispatcher.UpdateState(container.UUID, dispatch.Complete)
@@ -387,71 +358,47 @@ func (s *MockArvadosServerSuite) TestSbatchFuncWithConfigArgs(c *C) {
 }
 
 func testSbatchFuncWithArgs(c *C, args []string) {
+       defer func() { theConfig.SbatchArguments = nil }()
        theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...)
 
        container := arvados.Container{
                UUID:               "123",
                RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
                Priority:           1}
-       sbatchCmd := sbatchFunc(container)
 
        var expected []string
-       expected = append(expected, "sbatch")
        expected = append(expected, theConfig.SbatchArguments...)
        expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990")
-
-       c.Check(sbatchCmd.Args, DeepEquals, expected)
+       c.Check(sbatchArgs(container), DeepEquals, expected)
 }
 
 func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
-       theConfig.SbatchArguments = nil
        container := arvados.Container{
                UUID:                 "123",
                RuntimeConstraints:   arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1},
                SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}},
                Priority:             1}
-       sbatchCmd := sbatchFunc(container)
 
-       var expected []string
-       expected = append(expected, "sbatch")
-       expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990", "--partition=blurb,b2")
-
-       c.Check(sbatchCmd.Args, DeepEquals, expected)
+       c.Check(sbatchArgs(container), DeepEquals, []string{
+               "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990",
+               "--partition=blurb,b2",
+       })
 }
 
 func (s *TestSuite) TestIntegrationChangePriority(c *C) {
-       var scontrolCmdLine []string
-       step := 0
-
-       container := s.integrationTest(c,
-               func() *exec.Cmd {
-                       if step == 0 {
-                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
-                       } else if step == 1 {
-                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer 4000 100")
-                       } else {
-                               return exec.Command("echo")
-                       }
-               },
-               func(arvados.Container) *exec.Cmd { return exec.Command("true") },
-               nil,
-               func(container arvados.Container) *exec.Cmd {
-                       scontrolCmdLine = scontrolFunc(container).Args
-                       step = 1
-                       return exec.Command("true")
-               },
-               []string(nil),
+       slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+       container := s.integrationTest(c, slurm, nil,
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
-                       time.Sleep(1 * time.Second)
+                       time.Sleep(time.Second)
                        dispatcher.Arv.Update("containers", container.UUID,
                                arvadosclient.Dict{
                                        "container": arvadosclient.Dict{"priority": 600}},
                                nil)
-                       time.Sleep(1 * time.Second)
-                       step = 2
+                       time.Sleep(time.Second)
                        dispatcher.UpdateState(container.UUID, dispatch.Complete)
                })
        c.Check(container.State, Equals, arvados.ContainerStateComplete)
-       c.Check(scontrolCmdLine, DeepEquals, []string{"scontrol", "update", "JobName=zzzzz-dz642-queuedcontainer", "Nice=4000"})
+       c.Assert(len(slurm.didRenice), Not(Equals), 0)
+       c.Check(slurm.didRenice[len(slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
 }
diff --git a/services/crunch-dispatch-slurm/slurm.go b/services/crunch-dispatch-slurm/slurm.go
new file mode 100644 (file)
index 0000000..bd19377
--- /dev/null
@@ -0,0 +1,73 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "fmt"
+       "io"
+       "log"
+       "os/exec"
+       "strings"
+)
+
+type Slurm interface {
+       Cancel(name string) error
+       Renice(name string, nice int) error
+       QueueCommand(args []string) *exec.Cmd
+       Batch(script io.Reader, args []string) error
+}
+
+type slurmCLI struct{}
+
+func (scli *slurmCLI) Batch(script io.Reader, args []string) error {
+       return scli.run(script, "sbatch", args)
+}
+
+func (scli *slurmCLI) Cancel(name string) error {
+       for _, args := range [][]string{
+               // If the slurm job hasn't started yet, remove it from
+               // the queue.
+               {"--state=pending"},
+               // If the slurm job has started, send SIGTERM. If we
+               // cancel a running job without a --signal argument,
+               // slurm will send SIGTERM and then (after some
+               // site-configured interval) SIGKILL. This would kill
+               // crunch-run without stopping the container, which we
+               // don't want.
+               {"--batch", "--signal=TERM", "--state=running"},
+               {"--batch", "--signal=TERM", "--state=suspended"},
+       } {
+               err := scli.run(nil, "scancel", append([]string{"--name=" + name}, args...))
+               if err != nil {
+                       // scancel exits 0 if no job matches the given
+                       // name and state. Any error from scancel here
+                       // really indicates something is wrong.
+                       return err
+               }
+       }
+       return nil
+}
+
+func (scli *slurmCLI) QueueCommand(args []string) *exec.Cmd {
+       return exec.Command("squeue", args...)
+}
+
+func (scli *slurmCLI) Renice(name string, nice int) error {
+       return scli.run(nil, "scontrol", []string{"update", "JobName=" + name, fmt.Sprintf("Nice=%d", nice)})
+}
+
+func (scli *slurmCLI) run(stdin io.Reader, prog string, args []string) error {
+       cmd := exec.Command(prog, args...)
+       cmd.Stdin = stdin
+       out, err := cmd.CombinedOutput()
+       outTrim := strings.TrimSpace(string(out))
+       if err != nil || len(out) > 0 {
+               log.Printf("%q %q: %q", cmd.Path, cmd.Args, outTrim)
+       }
+       if err != nil {
+               err = fmt.Errorf("%s: %s (%q)", cmd.Path, err, outTrim)
+       }
+       return err
+}
index 819c2d2510f992ab75c18249d8b46d6c13828d6d..5ecfe8ff2fc049201eb27c8e58c7a02eb84cbbb4 100644 (file)
@@ -8,7 +8,6 @@ import (
        "bytes"
        "fmt"
        "log"
-       "os/exec"
        "strings"
        "sync"
        "time"
@@ -29,12 +28,6 @@ type SqueueChecker struct {
        sync.Cond
 }
 
-func squeueFunc() *exec.Cmd {
-       return exec.Command("squeue", "--all", "--format=%j %y %Q")
-}
-
-var squeueCmd = squeueFunc
-
 // HasUUID checks if a given container UUID is in the slurm queue.
 // This does not run squeue directly, but instead blocks until woken
 // up by next successful update of squeue.
@@ -84,7 +77,7 @@ func (sqc *SqueueChecker) check() {
        sqc.L.Lock()
        defer sqc.L.Unlock()
 
-       cmd := squeueCmd()
+       cmd := theConfig.slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
        stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
        cmd.Stdout, cmd.Stderr = stdout, stderr
        if err := cmd.Run(); err != nil {
index b480c068c597ecc178b08be7278af2edbd72bce9..7eefb1aaaded35be0b4ab99a03f4d82e30076c06 100644 (file)
@@ -6,7 +6,6 @@ package main
 
 import (
        "bytes"
-       "context"
        "encoding/json"
        "errors"
        "flag"
@@ -33,6 +32,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
+       "golang.org/x/net/context"
 
        dockertypes "github.com/docker/docker/api/types"
        dockercontainer "github.com/docker/docker/api/types/container"
@@ -75,60 +75,13 @@ type ThinDockerClient interface {
        ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
                networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
        ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
-       ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
+       ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
        ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
        ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
        ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
        ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
 }
 
-// ThinDockerClientProxy is a proxy implementation of ThinDockerClient
-// that executes the docker requests on dockerclient.Client
-type ThinDockerClientProxy struct {
-       Docker *dockerclient.Client
-}
-
-// ContainerAttach invokes dockerclient.Client.ContainerAttach
-func (proxy ThinDockerClientProxy) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
-       return proxy.Docker.ContainerAttach(ctx, container, options)
-}
-
-// ContainerCreate invokes dockerclient.Client.ContainerCreate
-func (proxy ThinDockerClientProxy) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
-       networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
-       return proxy.Docker.ContainerCreate(ctx, config, hostConfig, networkingConfig, containerName)
-}
-
-// ContainerStart invokes dockerclient.Client.ContainerStart
-func (proxy ThinDockerClientProxy) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
-       return proxy.Docker.ContainerStart(ctx, container, options)
-}
-
-// ContainerStop invokes dockerclient.Client.ContainerStop
-func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
-       return proxy.Docker.ContainerStop(ctx, container, timeout)
-}
-
-// ContainerWait invokes dockerclient.Client.ContainerWait
-func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
-       return proxy.Docker.ContainerWait(ctx, container, condition)
-}
-
-// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
-func (proxy ThinDockerClientProxy) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
-       return proxy.Docker.ImageInspectWithRaw(ctx, image)
-}
-
-// ImageLoad invokes dockerclient.Client.ImageLoad
-func (proxy ThinDockerClientProxy) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
-       return proxy.Docker.ImageLoad(ctx, input, quiet)
-}
-
-// ImageRemove invokes dockerclient.Client.ImageRemove
-func (proxy ThinDockerClientProxy) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
-       return proxy.Docker.ImageRemove(ctx, image, options)
-}
-
 // ContainerRunner is the main stateful struct used for a single execution of a
 // container.
 type ContainerRunner struct {
@@ -180,7 +133,6 @@ type ContainerRunner struct {
        setCgroupParent string
 
        cStateLock sync.Mutex
-       cStarted   bool // StartContainer() succeeded
        cCancelled bool // StopContainer() invoked
 
        enableNetwork string // one of "default" or "always"
@@ -197,11 +149,10 @@ func (runner *ContainerRunner) setupSignals() {
        signal.Notify(runner.SigChan, syscall.SIGQUIT)
 
        go func(sig chan os.Signal) {
-               s := <-sig
-               if s != nil {
-                       runner.CrunchLog.Printf("Caught signal %v", s)
+               for s := range sig {
+                       runner.CrunchLog.Printf("caught signal: %v", s)
+                       runner.stop()
                }
-               runner.stop()
        }(runner.SigChan)
 }
 
@@ -209,25 +160,20 @@ func (runner *ContainerRunner) setupSignals() {
 func (runner *ContainerRunner) stop() {
        runner.cStateLock.Lock()
        defer runner.cStateLock.Unlock()
-       if runner.cCancelled {
+       if runner.ContainerID == "" {
                return
        }
        runner.cCancelled = true
-       if runner.cStarted {
-               timeout := time.Duration(10)
-               err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout))
-               if err != nil {
-                       runner.CrunchLog.Printf("StopContainer failed: %s", err)
-               }
-               // Suppress multiple calls to stop()
-               runner.cStarted = false
+       runner.CrunchLog.Printf("removing container")
+       err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
+       if err != nil {
+               runner.CrunchLog.Printf("error removing container: %s", err)
        }
 }
 
 func (runner *ContainerRunner) stopSignals() {
        if runner.SigChan != nil {
                signal.Stop(runner.SigChan)
-               close(runner.SigChan)
        }
 }
 
@@ -636,7 +582,6 @@ func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
                                }
                        }
 
-                       runner.loggingDone <- true
                        close(runner.loggingDone)
                        return
                }
@@ -973,46 +918,38 @@ func (runner *ContainerRunner) StartContainer() error {
                }
                return fmt.Errorf("could not start container: %v%s", err, advice)
        }
-       runner.cStarted = true
        return nil
 }
 
 // WaitFinish waits for the container to terminate, capture the exit code, and
 // close the stdout/stderr logging.
-func (runner *ContainerRunner) WaitFinish() (err error) {
+func (runner *ContainerRunner) WaitFinish() error {
        runner.CrunchLog.Print("Waiting for container to finish")
 
-       waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running")
+       waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
+       arvMountExit := runner.ArvMountExit
+       for {
+               select {
+               case waitBody := <-waitOk:
+                       runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
+                       code := int(waitBody.StatusCode)
+                       runner.ExitCode = &code
+
+                       // wait for stdout/stderr to complete
+                       <-runner.loggingDone
+                       return nil
+
+               case err := <-waitErr:
+                       return fmt.Errorf("container wait: %v", err)
 
-       go func() {
-               <-runner.ArvMountExit
-               if runner.cStarted {
+               case <-arvMountExit:
                        runner.CrunchLog.Printf("arv-mount exited while container is still running.  Stopping container.")
                        runner.stop()
+                       // arvMountExit will always be ready now that
+                       // it's closed, but that doesn't interest us.
+                       arvMountExit = nil
                }
-       }()
-
-       var waitBody dockercontainer.ContainerWaitOKBody
-       select {
-       case waitBody = <-waitOk:
-       case err = <-waitErr:
        }
-
-       // Container isn't running any more
-       runner.cStarted = false
-
-       if err != nil {
-               return fmt.Errorf("container wait: %v", err)
-       }
-
-       runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
-       code := int(waitBody.StatusCode)
-       runner.ExitCode = &code
-
-       // wait for stdout/stderr to complete
-       <-runner.loggingDone
-
-       return nil
 }
 
 var ErrNotInOutputDir = fmt.Errorf("Must point to path within the output directory")
@@ -1191,10 +1128,6 @@ func (runner *ContainerRunner) UploadOutputFile(
 
 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
 func (runner *ContainerRunner) CaptureOutput() error {
-       if runner.finalState != "Complete" {
-               return nil
-       }
-
        if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
                // Output may have been set directly by the container, so
                // refresh the container record to check.
@@ -1647,7 +1580,7 @@ func (runner *ContainerRunner) Run() (err error) {
        }
 
        err = runner.WaitFinish()
-       if err == nil {
+       if err == nil && !runner.IsCancelled() {
                runner.finalState = "Complete"
        }
        return
@@ -1739,10 +1672,8 @@ func main() {
        // API version 1.21 corresponds to Docker 1.9, which is currently the
        // minimum version we want to support.
        docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
-       dockerClientProxy := ThinDockerClientProxy{Docker: docker}
-
-       cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
 
+       cr := NewContainerRunner(api, kc, docker, containerId)
        if dockererr != nil {
                cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
                cr.checkBrokenNode(dockererr)
index 4979cf8a0c801ee5bc56e48933e97f736dc3d00d..22989bb2ece510e6f239151b267968fde25f1203 100644 (file)
@@ -7,7 +7,6 @@ package main
 import (
        "bufio"
        "bytes"
-       "context"
        "crypto/md5"
        "encoding/json"
        "errors"
@@ -30,6 +29,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
+       "golang.org/x/net/context"
 
        dockertypes "github.com/docker/docker/api/types"
        dockercontainer "github.com/docker/docker/api/types/container"
@@ -42,11 +42,17 @@ func TestCrunchExec(t *testing.T) {
        TestingT(t)
 }
 
-type TestSuite struct{}
-
 // Gocheck boilerplate
 var _ = Suite(&TestSuite{})
 
+type TestSuite struct {
+       docker *TestDockerClient
+}
+
+func (s *TestSuite) SetUpTest(c *C) {
+       s.docker = NewTestDockerClient()
+}
+
 type ArvTestClient struct {
        Total   int64
        Calls   int
@@ -88,18 +94,18 @@ type TestDockerClient struct {
        logReader   io.ReadCloser
        logWriter   io.WriteCloser
        fn          func(t *TestDockerClient)
-       finish      int
+       exitCode    int
        stop        chan bool
        cwd         string
        env         []string
        api         *ArvTestClient
        realTemp    string
+       calledWait  bool
 }
 
-func NewTestDockerClient(exitCode int) *TestDockerClient {
+func NewTestDockerClient() *TestDockerClient {
        t := &TestDockerClient{}
        t.logReader, t.logWriter = io.Pipe()
-       t.finish = exitCode
        t.stop = make(chan bool, 1)
        t.cwd = "/"
        return t
@@ -131,16 +137,16 @@ func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockerco
 }
 
 func (t *TestDockerClient) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
-       if t.finish == 3 {
+       if t.exitCode == 3 {
                return errors.New(`Error response from daemon: oci runtime error: container_linux.go:247: starting container process caused "process_linux.go:359: container init caused \"rootfs_linux.go:54: mounting \\\"/tmp/keep453790790/by_id/99999999999999999999999999999999+99999/myGenome\\\" to rootfs \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged\\\" at \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged/keep/99999999999999999999999999999999+99999/myGenome\\\" caused \\\"no such file or directory\\\"\""`)
        }
-       if t.finish == 4 {
+       if t.exitCode == 4 {
                return errors.New(`panic: standard_init_linux.go:175: exec user process caused "no such file or directory"`)
        }
-       if t.finish == 5 {
+       if t.exitCode == 5 {
                return errors.New(`Error response from daemon: Cannot start container 41f26cbc43bcc1280f4323efb1830a394ba8660c9d1c2b564ba42bf7f7694845: [8] System error: no such file or directory`)
        }
-       if t.finish == 6 {
+       if t.exitCode == 6 {
                return errors.New(`Error response from daemon: Cannot start container 58099cd76c834f3dc2a4fb76c8028f049ae6d4fdf0ec373e1f2cfea030670c2d: [8] System error: exec: "foobar": executable file not found in $PATH`)
        }
 
@@ -152,25 +158,24 @@ func (t *TestDockerClient) ContainerStart(ctx context.Context, container string,
        }
 }
 
-func (t *TestDockerClient) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
+func (t *TestDockerClient) ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error {
        t.stop <- true
        return nil
 }
 
 func (t *TestDockerClient) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
-       body := make(chan dockercontainer.ContainerWaitOKBody)
+       t.calledWait = true
+       body := make(chan dockercontainer.ContainerWaitOKBody, 1)
        err := make(chan error)
        go func() {
                t.fn(t)
-               body <- dockercontainer.ContainerWaitOKBody{StatusCode: int64(t.finish)}
-               close(body)
-               close(err)
+               body <- dockercontainer.ContainerWaitOKBody{StatusCode: int64(t.exitCode)}
        }()
        return body, err
 }
 
 func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
-       if t.finish == 2 {
+       if t.exitCode == 2 {
                return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
        }
 
@@ -182,7 +187,7 @@ func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string
 }
 
 func (t *TestDockerClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
-       if t.finish == 2 {
+       if t.exitCode == 2 {
                return dockertypes.ImageLoadResponse{}, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
        }
        _, err := io.Copy(ioutil.Discard, input)
@@ -396,8 +401,7 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
 
 func (s *TestSuite) TestLoadImage(c *C) {
        kc := &KeepTestClient{}
-       docker := NewTestDockerClient(0)
-       cr := NewContainerRunner(&ArvTestClient{}, kc, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr := NewContainerRunner(&ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 
        _, err := cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
 
@@ -512,8 +516,7 @@ func (s *TestSuite) TestLoadImageArvError(c *C) {
 
 func (s *TestSuite) TestLoadImageKeepError(c *C) {
        // (2) Keep error
-       docker := NewTestDockerClient(0)
-       cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.Container.ContainerImage = hwPDH
 
        err := cr.LoadImage()
@@ -531,8 +534,7 @@ func (s *TestSuite) TestLoadImageCollectionError(c *C) {
 
 func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
        // (4) Collection doesn't contain image
-       docker := NewTestDockerClient(0)
-       cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.Container.ContainerImage = hwPDH
 
        err := cr.LoadImage()
@@ -572,12 +574,11 @@ func dockerLog(fd byte, msg string) []byte {
 }
 
 func (s *TestSuite) TestRunContainer(c *C) {
-       docker := NewTestDockerClient(0)
-       docker.fn = func(t *TestDockerClient) {
+       s.docker.fn = func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, "Hello world\n"))
                t.logWriter.Close()
        }
-       cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 
        var logs TestLogs
        cr.NewLogWriter = logs.NewTestLoggingWriter
@@ -667,18 +668,18 @@ func (s *TestSuite) TestUpdateContainerCancelled(c *C) {
 
 // Used by the TestFullRun*() test below to DRY up boilerplate setup to do full
 // dress rehearsal of the Run() function, starting from a JSON container record.
-func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) {
+func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) {
        rec := arvados.Container{}
        err := json.Unmarshal([]byte(record), &rec)
        c.Check(err, IsNil)
 
-       docker := NewTestDockerClient(exitCode)
-       docker.fn = fn
-       docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
+       s.docker.exitCode = exitCode
+       s.docker.fn = fn
+       s.docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
 
        api = &ArvTestClient{Container: rec}
-       docker.api = api
-       cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       s.docker.api = api
+       cr = NewContainerRunner(api, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.statInterval = 100 * time.Millisecond
        am := &ArvMountCmdLine{}
        cr.RunArvMount = am.ArvMountTest
@@ -687,7 +688,7 @@ func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn f
        c.Assert(err, IsNil)
        defer os.RemoveAll(realTemp)
 
-       docker.realTemp = realTemp
+       s.docker.realTemp = realTemp
 
        tempcount := 0
        cr.MkTempDir = func(_ string, prefix string) (string, error) {
@@ -730,7 +731,7 @@ func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn f
 }
 
 func (s *TestSuite) TestFullRunHello(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -751,7 +752,7 @@ func (s *TestSuite) TestFullRunHello(c *C) {
 }
 
 func (s *TestSuite) TestCrunchstat(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
                "command": ["sleep", "1"],
                "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
                "cwd": ".",
@@ -784,7 +785,7 @@ func (s *TestSuite) TestCrunchstat(c *C) {
 
 func (s *TestSuite) TestNodeInfoLog(c *C) {
        os.Setenv("SLURMD_NODENAME", "compute2")
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
                "command": ["sleep", "1"],
                "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
                "cwd": ".",
@@ -818,7 +819,7 @@ func (s *TestSuite) TestNodeInfoLog(c *C) {
 }
 
 func (s *TestSuite) TestContainerRecordLog(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
                "command": ["sleep", "1"],
                "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
                "cwd": ".",
@@ -841,7 +842,7 @@ func (s *TestSuite) TestContainerRecordLog(c *C) {
 }
 
 func (s *TestSuite) TestFullRunStderr(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo hello ; echo world 1>&2 ; exit 1"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -866,7 +867,7 @@ func (s *TestSuite) TestFullRunStderr(c *C) {
 }
 
 func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["pwd"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -887,7 +888,7 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
 }
 
 func (s *TestSuite) TestFullRunSetCwd(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["pwd"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": "/bin",
@@ -909,7 +910,7 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) {
 func (s *TestSuite) TestStopOnSignal(c *C) {
        s.testStopContainer(c, func(cr *ContainerRunner) {
                go func() {
-                       for !cr.cStarted {
+                       for !s.docker.calledWait {
                                time.Sleep(time.Millisecond)
                        }
                        cr.SigChan <- syscall.SIGINT
@@ -943,16 +944,15 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) {
        err := json.Unmarshal([]byte(record), &rec)
        c.Check(err, IsNil)
 
-       docker := NewTestDockerClient(0)
-       docker.fn = func(t *TestDockerClient) {
+       s.docker.fn = func(t *TestDockerClient) {
                <-t.stop
                t.logWriter.Write(dockerLog(1, "foo\n"))
                t.logWriter.Close()
        }
-       docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
+       s.docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
 
        api := &ArvTestClient{Container: rec}
-       cr := NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr := NewContainerRunner(api, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil }
        setup(cr)
 
@@ -978,7 +978,7 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) {
 }
 
 func (s *TestSuite) TestFullRunSetEnv(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo $FROBIZ"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": "/bin",
@@ -1359,7 +1359,7 @@ func (s *TestSuite) TestStdout(c *C) {
                "runtime_constraints": {}
        }`
 
-       api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
@@ -1370,17 +1370,16 @@ func (s *TestSuite) TestStdout(c *C) {
 }
 
 // Used by the TestStdoutWithWrongPath*()
-func StdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, err error) {
+func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, err error) {
        rec := arvados.Container{}
        err = json.Unmarshal([]byte(record), &rec)
        c.Check(err, IsNil)
 
-       docker := NewTestDockerClient(0)
-       docker.fn = fn
-       docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
+       s.docker.fn = fn
+       s.docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
 
        api = &ArvTestClient{Container: rec}
-       cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr = NewContainerRunner(api, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        am := &ArvMountCmdLine{}
        cr.RunArvMount = am.ArvMountTest
 
@@ -1389,7 +1388,7 @@ func StdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (ap
 }
 
 func (s *TestSuite) TestStdoutWithWrongPath(c *C) {
-       _, _, err := StdoutErrorRunHelper(c, `{
+       _, _, err := s.stdoutErrorRunHelper(c, `{
     "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path":"/tmpa.out"} },
     "output_path": "/tmp"
 }`, func(t *TestDockerClient) {})
@@ -1399,7 +1398,7 @@ func (s *TestSuite) TestStdoutWithWrongPath(c *C) {
 }
 
 func (s *TestSuite) TestStdoutWithWrongKindTmp(c *C) {
-       _, _, err := StdoutErrorRunHelper(c, `{
+       _, _, err := s.stdoutErrorRunHelper(c, `{
     "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "tmp", "path":"/tmp/a.out"} },
     "output_path": "/tmp"
 }`, func(t *TestDockerClient) {})
@@ -1409,7 +1408,7 @@ func (s *TestSuite) TestStdoutWithWrongKindTmp(c *C) {
 }
 
 func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) {
-       _, _, err := StdoutErrorRunHelper(c, `{
+       _, _, err := s.stdoutErrorRunHelper(c, `{
     "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "collection", "path":"/tmp/a.out"} },
     "output_path": "/tmp"
 }`, func(t *TestDockerClient) {})
@@ -1421,7 +1420,7 @@ func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) {
 func (s *TestSuite) TestFullRunWithAPI(c *C) {
        os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
        defer os.Unsetenv("ARVADOS_API_HOST")
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": "/bin",
@@ -1444,7 +1443,7 @@ func (s *TestSuite) TestFullRunWithAPI(c *C) {
 func (s *TestSuite) TestFullRunSetOutput(c *C) {
        os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
        defer os.Unsetenv("ARVADOS_API_HOST")
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": "/bin",
@@ -1484,7 +1483,7 @@ func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C
 
        extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
@@ -1519,7 +1518,7 @@ func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) {
                "a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt",
        }
 
-       api, runner, realtemp := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+       api, runner, realtemp := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
@@ -1571,7 +1570,7 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(
                "b0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
        }
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
@@ -1612,7 +1611,7 @@ func (s *TestSuite) TestOutputSymlinkToInput(c *C) {
                "a0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
        }
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                os.Symlink("/keep/foo/sub1file2", t.realTemp+"/2/baz")
                os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/2/baz2")
                os.Symlink("/keep/foo2/subdir1", t.realTemp+"/2/baz3")
@@ -1654,7 +1653,7 @@ func (s *TestSuite) TestOutputError(c *C) {
 
        extraMounts := []string{}
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                os.Symlink("/etc/hosts", t.realTemp+"/2/baz")
                t.logWriter.Close()
        })
@@ -1678,7 +1677,7 @@ func (s *TestSuite) TestOutputSymlinkToOutput(c *C) {
 
        extraMounts := []string{}
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                rf, _ := os.Create(t.realTemp + "/2/realfile")
                rf.Write([]byte("foo"))
                rf.Close()
@@ -1735,7 +1734,7 @@ func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
                "b0def87f80dd594d4675809e83bd4f15+367/file1_in_main.txt",
        }
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
@@ -1770,7 +1769,7 @@ func (s *TestSuite) TestStdinJsonMountPoint(c *C) {
                "runtime_constraints": {}
        }`
 
-       api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
@@ -1790,7 +1789,7 @@ func (s *TestSuite) TestStdinJsonMountPoint(c *C) {
 }
 
 func (s *TestSuite) TestStderrMount(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo hello;exit 1"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -1887,7 +1886,7 @@ exec echo killme
        ech := tf.Name()
        brokenNodeHook = &ech
 
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -1912,7 +1911,7 @@ func (s *TestSuite) TestFullBrokenDocker2(c *C) {
        ech := ""
        brokenNodeHook = &ech
 
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -1935,7 +1934,7 @@ func (s *TestSuite) TestFullBrokenDocker3(c *C) {
        ech := ""
        brokenNodeHook = &ech
 
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -1957,7 +1956,7 @@ func (s *TestSuite) TestBadCommand1(c *C) {
        ech := ""
        brokenNodeHook = &ech
 
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -1979,7 +1978,7 @@ func (s *TestSuite) TestBadCommand2(c *C) {
        ech := ""
        brokenNodeHook = &ech
 
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -2001,7 +2000,7 @@ func (s *TestSuite) TestBadCommand3(c *C) {
        ech := ""
        brokenNodeHook = &ech
 
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",