// Dispatcher service for Crunch that submits containers to the slurm queue.
import (
- "bytes"
"context"
"flag"
"fmt"
"log"
"math"
"os"
- "os/exec"
"regexp"
"strings"
"time"
// Minimum time between two attempts to run the same container
MinRetryPeriod arvados.Duration
+
+ slurm Slurm
}
func main() {
+ theConfig.slurm = &slurmCLI{}
err := doMain()
if err != nil {
log.Fatal(err)
return (1000 - priority) * 10
}
-// sbatchCmd
-func sbatchFunc(container arvados.Container) *exec.Cmd {
+func sbatchArgs(container arvados.Container) []string {
mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
var disk int64
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
}
- return exec.Command("sbatch", sbatchArgs...)
-}
-
-// scancelCmd
-func scancelFunc(container arvados.Container) *exec.Cmd {
- return exec.Command("scancel", "--name="+container.UUID)
-}
-
-// scontrolCmd
-func scontrolFunc(container arvados.Container) *exec.Cmd {
- return exec.Command("scontrol", "update", "JobName="+container.UUID, fmt.Sprintf("Nice=%d", niceness(container.Priority)))
+ return sbatchArgs
}
-// Wrap these so that they can be overridden by tests
-var sbatchCmd = sbatchFunc
-var scancelCmd = scancelFunc
-var scontrolCmd = scontrolFunc
-
-// Submit job to slurm using sbatch.
// submit enqueues a container with slurm. Slurm requires a #! script
// on stdin, so we send a tiny script that execs crunch-run with the
// container UUID, and hand it to sbatch via theConfig.slurm.Batch.
func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
	// append() here avoids modifying crunchRunCommand's
	// underlying array, which is shared with other goroutines.
	crArgs := append([]string(nil), crunchRunCommand...)
	crArgs = append(crArgs, container.UUID)
	crScript := strings.NewReader(execScript(crArgs))

	// Mutex between squeue sync and running sbatch or scancel.
	sqCheck.L.Lock()
	defer sqCheck.L.Unlock()

	sbArgs := sbatchArgs(container)
	log.Printf("running sbatch %+q", sbArgs)
	return theConfig.slurm.Batch(crScript, sbArgs)
}
// Submit a container to the slurm queue (or resume monitoring if it's
} else if updated.Priority == 0 {
log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
scancel(ctr)
- } else if niceness(updated.Priority) != sqCheck.GetNiceness(ctr.UUID) && sqCheck.GetNiceness(ctr.UUID) != -1 {
- // dynamically adjust priority
- log.Printf("Container priority %v != %v", niceness(updated.Priority), sqCheck.GetNiceness(ctr.UUID))
- scontrolUpdate(updated)
+ } else {
+ renice(updated)
}
}
}
// scancel cancels the slurm job for the given container. On error it
// sleeps briefly to avoid hammering slurm with retries; on success it
// logs a warning if the job is still visible in squeue.
func scancel(ctr arvados.Container) {
	// Mutex between squeue sync and running sbatch or scancel.
	sqCheck.L.Lock()
	err := theConfig.slurm.Cancel(ctr.UUID)
	sqCheck.L.Unlock()

	if err != nil {
		log.Printf("scancel: %s", err)
		time.Sleep(time.Second)
	} else if sqCheck.HasUUID(ctr.UUID) {
		log.Printf("container %s is still in squeue after scancel", ctr.UUID)
	}
}
// renice adjusts the slurm nice value of the container's job to match
// its current arvados priority. It is a no-op if the nice value is
// already correct, or if the job's niceness is unknown (-1, i.e. not
// seen in squeue yet).
func renice(ctr arvados.Container) {
	nice := niceness(ctr.Priority)
	oldnice := sqCheck.GetNiceness(ctr.UUID)
	if nice == oldnice || oldnice == -1 {
		return
	}
	log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)

	// Mutex between squeue sync and running scontrol.
	sqCheck.L.Lock()
	err := theConfig.slurm.Renice(ctr.UUID, nice)
	sqCheck.L.Unlock()

	if err != nil {
		log.Printf("renice: %s", err)
		// Back off briefly before the next attempt.
		time.Sleep(time.Second)
		return
	}
	if sqCheck.HasUUID(ctr.UUID) {
		log.Printf("container %s has arvados priority %d, slurm nice %d",
			ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
	}
}
import (
"bytes"
"context"
+ "errors"
"fmt"
"io"
"io/ioutil"
arvadostest.ResetEnv()
}
-func (s *TestSuite) integrationTest(c *C,
- newSqueueCmd func() *exec.Cmd,
- newScancelCmd func(arvados.Container) *exec.Cmd,
- newSbatchCmd func(arvados.Container) *exec.Cmd,
- newScontrolCmd func(arvados.Container) *exec.Cmd,
- sbatchCmdComps []string,
- runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
- arvadostest.ResetEnv()
// slurmFake implements the Slurm interface for tests: instead of
// invoking real slurm commands, it records the arguments of every
// call so tests can assert on what the dispatcher did.
type slurmFake struct {
	didBatch  [][]string
	didCancel []string
	didRenice [][]string
	queue     string
	// If non-nil, run this func during the 2nd+ call to Cancel()
	onCancel func()
	// Error returned by Batch()
	errBatch error
}

// Batch records the sbatch arguments and returns the preset error (if
// any).
func (f *slurmFake) Batch(script io.Reader, args []string) error {
	f.didBatch = append(f.didBatch, args)
	return f.errBatch
}

// QueueCommand stands in for squeue by returning a command that just
// echoes the canned queue contents.
func (f *slurmFake) QueueCommand(args []string) *exec.Cmd {
	return exec.Command("echo", f.queue)
}

// Renice records the (name, nice) pair and always succeeds.
func (f *slurmFake) Renice(name string, nice int) error {
	entry := []string{name, fmt.Sprintf("%d", nice)}
	f.didRenice = append(f.didRenice, entry)
	return nil
}

// Cancel records the cancelled job name. The first call fails so
// tests exercise the dispatcher's retry path; subsequent calls run
// onCancel (if set) and succeed.
func (f *slurmFake) Cancel(name string) error {
	f.didCancel = append(f.didCancel, name)
	switch {
	case len(f.didCancel) == 1:
		// simulate error on first attempt
		return errors.New("something terrible happened")
	case f.onCancel != nil:
		f.onCancel()
	}
	return nil
}
- // Override squeueCmd
- defer func(orig func() *exec.Cmd) {
- squeueCmd = orig
- }(squeueCmd)
- squeueCmd = newSqueueCmd
+func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
+ expectBatch [][]string,
+ runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
+ arvadostest.ResetEnv()
- // Override scancel
- defer func(orig func(arvados.Container) *exec.Cmd) {
- scancelCmd = orig
- }(scancelCmd)
- scancelCmd = newScancelCmd
+ arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, IsNil)
- // Override scontrol
- defer func(orig func(arvados.Container) *exec.Cmd) {
- scontrolCmd = orig
- }(scontrolCmd)
- scontrolCmd = newScontrolCmd
+ defer func(orig Slurm) {
+ theConfig.slurm = orig
+ }(theConfig.slurm)
+ theConfig.slurm = slurm
// There should be one queued container
params := arvadosclient.Dict{
RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
go func() {
runContainer(disp, ctr)
+ slurm.queue = ""
doneRun <- struct{}{}
}()
run(disp, ctr, status)
sqCheck.Stop()
- c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
+ c.Check(slurm.didBatch, DeepEquals, expectBatch)
// There should be no queued containers now
err = arv.List("containers", params, &containers)
}
func (s *TestSuite) TestIntegrationNormal(c *C) {
- done := false
container := s.integrationTest(c,
- func() *exec.Cmd {
- if done {
- return exec.Command("true")
- } else {
- return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
- }
- },
- nil,
+ &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"},
nil,
- nil,
- []string(nil),
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
time.Sleep(3 * time.Second)
dispatcher.UpdateState(container.UUID, dispatch.Complete)
- done = true
})
c.Check(container.State, Equals, arvados.ContainerStateComplete)
}
func (s *TestSuite) TestIntegrationCancel(c *C) {
- var cmd *exec.Cmd
- var scancelCmdLine []string
- attempt := 0
-
+ slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+ readyToCancel := make(chan bool)
+ slurm.onCancel = func() { <-readyToCancel }
container := s.integrationTest(c,
- func() *exec.Cmd {
- if cmd != nil && cmd.ProcessState != nil {
- return exec.Command("true")
- } else {
- return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
- }
- },
- func(container arvados.Container) *exec.Cmd {
- if attempt++; attempt == 1 {
- return exec.Command("false")
- } else {
- scancelCmdLine = scancelFunc(container).Args
- cmd = exec.Command("echo")
- return cmd
- }
- },
- nil,
+ slurm,
nil,
- []string(nil),
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
- time.Sleep(1 * time.Second)
+ time.Sleep(time.Second)
dispatcher.Arv.Update("containers", container.UUID,
arvadosclient.Dict{
"container": arvadosclient.Dict{"priority": 0}},
nil)
+ readyToCancel <- true
+ close(readyToCancel)
})
c.Check(container.State, Equals, arvados.ContainerStateCancelled)
- c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
+ c.Check(len(slurm.didCancel) > 1, Equals, true)
+ c.Check(slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
}
func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
- container := s.integrationTest(c,
- func() *exec.Cmd { return exec.Command("echo") },
- nil,
- nil,
- nil,
- []string{"sbatch",
+ container := s.integrationTest(c, &slurmFake{},
+ [][]string{{
fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
fmt.Sprintf("--mem=%d", 11445),
fmt.Sprintf("--cpus-per-task=%d", 4),
fmt.Sprintf("--tmp=%d", 45777),
- fmt.Sprintf("--nice=%d", 9990)},
+ fmt.Sprintf("--nice=%d", 9990)}},
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
time.Sleep(3 * time.Second)
func (s *TestSuite) TestSbatchFail(c *C) {
container := s.integrationTest(c,
- func() *exec.Cmd { return exec.Command("echo") },
- nil,
- func(container arvados.Container) *exec.Cmd {
- return exec.Command("false")
- },
- nil,
- []string(nil),
+ &slurmFake{errBatch: errors.New("something terrible happened")},
+ [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=9990"}},
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
dispatcher.UpdateState(container.UUID, dispatch.Complete)
}
// testSbatchFuncWithArgs checks that sbatchArgs puts any configured
// SbatchArguments before the per-container arguments. It restores
// theConfig.SbatchArguments to nil on exit so tests don't leak state.
func testSbatchFuncWithArgs(c *C, args []string) {
	defer func() { theConfig.SbatchArguments = nil }()
	theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...)

	container := arvados.Container{
		UUID:               "123",
		RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
		Priority:           1}

	var expected []string
	expected = append(expected, theConfig.SbatchArguments...)
	// 250000000 bytes of RAM rounds up to 239 MiB.
	expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990")
	c.Check(sbatchArgs(container), DeepEquals, expected)
}
// TestSbatchPartition checks that scheduling-parameter partitions are
// joined with commas and appended as a single --partition argument.
func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
	container := arvados.Container{
		UUID:                 "123",
		RuntimeConstraints:   arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1},
		SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}},
		Priority:             1}

	c.Check(sbatchArgs(container), DeepEquals, []string{
		"--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990",
		"--partition=blurb,b2",
	})
}
func (s *TestSuite) TestIntegrationChangePriority(c *C) {
- var scontrolCmdLine []string
- step := 0
-
- container := s.integrationTest(c,
- func() *exec.Cmd {
- if step == 0 {
- return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
- } else if step == 1 {
- return exec.Command("echo", "zzzzz-dz642-queuedcontainer 4000 100")
- } else {
- return exec.Command("echo")
- }
- },
- func(arvados.Container) *exec.Cmd { return exec.Command("true") },
- nil,
- func(container arvados.Container) *exec.Cmd {
- scontrolCmdLine = scontrolFunc(container).Args
- step = 1
- return exec.Command("true")
- },
- []string(nil),
+ slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+ container := s.integrationTest(c, slurm, nil,
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
- time.Sleep(1 * time.Second)
+ time.Sleep(time.Second)
dispatcher.Arv.Update("containers", container.UUID,
arvadosclient.Dict{
"container": arvadosclient.Dict{"priority": 600}},
nil)
- time.Sleep(1 * time.Second)
- step = 2
+ time.Sleep(time.Second)
dispatcher.UpdateState(container.UUID, dispatch.Complete)
})
c.Check(container.State, Equals, arvados.ContainerStateComplete)
- c.Check(scontrolCmdLine, DeepEquals, []string{"scontrol", "update", "JobName=zzzzz-dz642-queuedcontainer", "Nice=4000"})
+ c.Assert(len(slurm.didRenice), Not(Equals), 0)
+ c.Check(slurm.didRenice[len(slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "fmt"
+ "io"
+ "log"
+ "os/exec"
+ "strings"
+)
+
// Slurm is an interface to the slurm job queue. It allows the real
// command-line implementation (slurmCLI) to be replaced by a fake in
// tests.
type Slurm interface {
	// Cancel cancels the job with the given name.
	Cancel(name string) error
	// Renice sets the nice value of the job with the given name.
	Renice(name string, nice int) error
	// QueueCommand returns an exec.Cmd that lists the queue with
	// the given arguments.
	QueueCommand(args []string) *exec.Cmd
	// Batch submits a job script with the given arguments.
	Batch(script io.Reader, args []string) error
}
+
// slurmCLI is the production Slurm implementation: it shells out to
// the slurm command-line tools (sbatch, scancel, scontrol, squeue).
type slurmCLI struct{}

// Batch submits a job by piping script into sbatch with the given
// arguments.
func (scli *slurmCLI) Batch(script io.Reader, args []string) error {
	return scli.run(script, "sbatch", args)
}
+
// Cancel cancels the slurm job with the given name, whatever state it
// is in: pending jobs are removed from the queue, while running or
// suspended batch jobs are sent SIGTERM so crunch-run can shut the
// container down cleanly.
func (scli *slurmCLI) Cancel(name string) error {
	for _, args := range [][]string{
		// If the slurm job hasn't started yet, remove it from
		// the queue.
		{"--state=pending"},
		// If the slurm job has started, send SIGTERM. If we
		// cancel a running job without a --signal argument,
		// slurm will send SIGTERM and then (after some
		// site-configured interval) SIGKILL. This would kill
		// crunch-run without stopping the container, which we
		// don't want.
		{"--batch", "--signal=TERM", "--state=running"},
		{"--batch", "--signal=TERM", "--state=suspended"},
	} {
		err := scli.run(nil, "scancel", append([]string{"--name=" + name}, args...))
		if err != nil {
			// scancel exits 0 if no job matches the given
			// name and state. Any error from scancel here
			// really indicates something is wrong.
			return err
		}
	}
	return nil
}
+
// QueueCommand returns an (unstarted) squeue command with the given
// arguments; the caller runs it and parses its output.
func (scli *slurmCLI) QueueCommand(args []string) *exec.Cmd {
	return exec.Command("squeue", args...)
}

// Renice updates the nice value of the named slurm job via scontrol.
func (scli *slurmCLI) Renice(name string, nice int) error {
	return scli.run(nil, "scontrol", []string{"update", "JobName=" + name, fmt.Sprintf("Nice=%d", nice)})
}
+
+func (scli *slurmCLI) run(stdin io.Reader, prog string, args []string) error {
+ cmd := exec.Command(prog, args...)
+ cmd.Stdin = stdin
+ out, err := cmd.CombinedOutput()
+ outTrim := strings.TrimSpace(string(out))
+ if err != nil || len(out) > 0 {
+ log.Printf("%q %q: %q", cmd.Path, cmd.Args, outTrim)
+ }
+ if err != nil {
+ err = fmt.Errorf("%s: %s (%q)", cmd.Path, err, outTrim)
+ }
+ return err
+}
"bytes"
"fmt"
"log"
- "os/exec"
"strings"
"sync"
"time"
sync.Cond
}
-func squeueFunc() *exec.Cmd {
- return exec.Command("squeue", "--all", "--format=%j %y %Q")
-}
-
-var squeueCmd = squeueFunc
-
// HasUUID checks if a given container UUID is in the slurm queue.
// This does not run squeue directly, but instead blocks until woken
// up by next successful update of squeue.
sqc.L.Lock()
defer sqc.L.Unlock()
- cmd := squeueCmd()
+ cmd := theConfig.slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
cmd.Stdout, cmd.Stderr = stdout, stderr
if err := cmd.Run(); err != nil {
import (
"bytes"
- "context"
"encoding/json"
"errors"
"flag"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
+ "golang.org/x/net/context"
dockertypes "github.com/docker/docker/api/types"
dockercontainer "github.com/docker/docker/api/types/container"
ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
- ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
+ ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
}
-// ThinDockerClientProxy is a proxy implementation of ThinDockerClient
-// that executes the docker requests on dockerclient.Client
-type ThinDockerClientProxy struct {
- Docker *dockerclient.Client
-}
-
-// ContainerAttach invokes dockerclient.Client.ContainerAttach
-func (proxy ThinDockerClientProxy) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
- return proxy.Docker.ContainerAttach(ctx, container, options)
-}
-
-// ContainerCreate invokes dockerclient.Client.ContainerCreate
-func (proxy ThinDockerClientProxy) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
- networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
- return proxy.Docker.ContainerCreate(ctx, config, hostConfig, networkingConfig, containerName)
-}
-
-// ContainerStart invokes dockerclient.Client.ContainerStart
-func (proxy ThinDockerClientProxy) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
- return proxy.Docker.ContainerStart(ctx, container, options)
-}
-
-// ContainerStop invokes dockerclient.Client.ContainerStop
-func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
- return proxy.Docker.ContainerStop(ctx, container, timeout)
-}
-
-// ContainerWait invokes dockerclient.Client.ContainerWait
-func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
- return proxy.Docker.ContainerWait(ctx, container, condition)
-}
-
-// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
-func (proxy ThinDockerClientProxy) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
- return proxy.Docker.ImageInspectWithRaw(ctx, image)
-}
-
-// ImageLoad invokes dockerclient.Client.ImageLoad
-func (proxy ThinDockerClientProxy) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
- return proxy.Docker.ImageLoad(ctx, input, quiet)
-}
-
-// ImageRemove invokes dockerclient.Client.ImageRemove
-func (proxy ThinDockerClientProxy) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
- return proxy.Docker.ImageRemove(ctx, image, options)
-}
-
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
setCgroupParent string
cStateLock sync.Mutex
- cStarted bool // StartContainer() succeeded
cCancelled bool // StopContainer() invoked
enableNetwork string // one of "default" or "always"
signal.Notify(runner.SigChan, syscall.SIGQUIT)
go func(sig chan os.Signal) {
- s := <-sig
- if s != nil {
- runner.CrunchLog.Printf("Caught signal %v", s)
+ for s := range sig {
+ runner.CrunchLog.Printf("caught signal: %v", s)
+ runner.stop()
}
- runner.stop()
}(runner.SigChan)
}
// stop force-removes the docker container in response to cancellation
// or a caught signal. It is a no-op if no container has been created
// yet (ContainerID is empty). Safe to call repeatedly; guarded by
// cStateLock.
func (runner *ContainerRunner) stop() {
	runner.cStateLock.Lock()
	defer runner.cStateLock.Unlock()
	if runner.ContainerID == "" {
		return
	}
	runner.cCancelled = true
	runner.CrunchLog.Printf("removing container")
	err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
	if err != nil {
		runner.CrunchLog.Printf("error removing container: %s", err)
	}
}
// stopSignals stops relaying OS signals to runner.SigChan. The
// channel is deliberately not closed: the signal-handling goroutine
// ranges over it, so closing would make that loop exit via the
// closed-channel path rather than simply going idle.
func (runner *ContainerRunner) stopSignals() {
	if runner.SigChan != nil {
		signal.Stop(runner.SigChan)
	}
}
}
}
- runner.loggingDone <- true
close(runner.loggingDone)
return
}
}
return fmt.Errorf("could not start container: %v%s", err, advice)
}
- runner.cStarted = true
return nil
}
// WaitFinish waits for the container to terminate, capture the exit code, and
// close the stdout/stderr logging.
func (runner *ContainerRunner) WaitFinish() error {
	runner.CrunchLog.Print("Waiting for container to finish")

	waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)

	// Local copy of the channel so we can disable its select case
	// (set it to nil) after the first receive without touching
	// runner.ArvMountExit.
	arvMountExit := runner.ArvMountExit
	for {
		select {
		case waitBody := <-waitOk:
			runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
			code := int(waitBody.StatusCode)
			runner.ExitCode = &code

			// wait for stdout/stderr to complete
			<-runner.loggingDone
			return nil

		case err := <-waitErr:
			return fmt.Errorf("container wait: %v", err)

		case <-arvMountExit:
			runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.")
			runner.stop()
			// arvMountExit will always be ready now that
			// it's closed, but that doesn't interest us.
			arvMountExit = nil
		}
	}
}
var ErrNotInOutputDir = fmt.Errorf("Must point to path within the output directory")
// HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
func (runner *ContainerRunner) CaptureOutput() error {
- if runner.finalState != "Complete" {
- return nil
- }
-
if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
// Output may have been set directly by the container, so
// refresh the container record to check.
}
err = runner.WaitFinish()
- if err == nil {
+ if err == nil && !runner.IsCancelled() {
runner.finalState = "Complete"
}
return
// API version 1.21 corresponds to Docker 1.9, which is currently the
// minimum version we want to support.
docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
- dockerClientProxy := ThinDockerClientProxy{Docker: docker}
-
- cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
+ cr := NewContainerRunner(api, kc, docker, containerId)
if dockererr != nil {
cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
cr.checkBrokenNode(dockererr)
import (
"bufio"
"bytes"
- "context"
"crypto/md5"
"encoding/json"
"errors"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/manifest"
+ "golang.org/x/net/context"
dockertypes "github.com/docker/docker/api/types"
dockercontainer "github.com/docker/docker/api/types/container"
TestingT(t)
}
-type TestSuite struct{}
-
// Gocheck boilerplate
var _ = Suite(&TestSuite{})
+type TestSuite struct {
+ docker *TestDockerClient
+}
+
+func (s *TestSuite) SetUpTest(c *C) {
+ s.docker = NewTestDockerClient()
+}
+
type ArvTestClient struct {
Total int64
Calls int
logReader io.ReadCloser
logWriter io.WriteCloser
fn func(t *TestDockerClient)
- finish int
+ exitCode int
stop chan bool
cwd string
env []string
api *ArvTestClient
realTemp string
+ calledWait bool
}
// NewTestDockerClient returns a TestDockerClient with its log pipe
// and stop channel set up. The stop channel is buffered (capacity 1)
// so ContainerRemove never blocks even if nothing is receiving.
func NewTestDockerClient() *TestDockerClient {
	t := &TestDockerClient{}
	t.logReader, t.logWriter = io.Pipe()
	t.stop = make(chan bool, 1)
	t.cwd = "/"
	return t
}
func (t *TestDockerClient) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
- if t.finish == 3 {
+ if t.exitCode == 3 {
return errors.New(`Error response from daemon: oci runtime error: container_linux.go:247: starting container process caused "process_linux.go:359: container init caused \"rootfs_linux.go:54: mounting \\\"/tmp/keep453790790/by_id/99999999999999999999999999999999+99999/myGenome\\\" to rootfs \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged\\\" at \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged/keep/99999999999999999999999999999999+99999/myGenome\\\" caused \\\"no such file or directory\\\"\""`)
}
- if t.finish == 4 {
+ if t.exitCode == 4 {
return errors.New(`panic: standard_init_linux.go:175: exec user process caused "no such file or directory"`)
}
- if t.finish == 5 {
+ if t.exitCode == 5 {
return errors.New(`Error response from daemon: Cannot start container 41f26cbc43bcc1280f4323efb1830a394ba8660c9d1c2b564ba42bf7f7694845: [8] System error: no such file or directory`)
}
- if t.finish == 6 {
+ if t.exitCode == 6 {
return errors.New(`Error response from daemon: Cannot start container 58099cd76c834f3dc2a4fb76c8028f049ae6d4fdf0ec373e1f2cfea030670c2d: [8] System error: exec: "foobar": executable file not found in $PATH`)
}
}
}
// ContainerRemove signals the fake container loop to stop via the
// (buffered) stop channel; it always succeeds.
func (t *TestDockerClient) ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error {
	t.stop <- true
	return nil
}
// ContainerWait runs the test body (t.fn) in a goroutine and then
// reports the configured exit code on the returned channel. The body
// channel is buffered so the send cannot block if the receiver has
// already gone away. calledWait lets tests detect that the container
// reached the "running" phase.
func (t *TestDockerClient) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
	t.calledWait = true
	body := make(chan dockercontainer.ContainerWaitOKBody, 1)
	err := make(chan error)
	go func() {
		t.fn(t)
		body <- dockercontainer.ContainerWaitOKBody{StatusCode: int64(t.exitCode)}
	}()
	return body, err
}
func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
- if t.finish == 2 {
+ if t.exitCode == 2 {
return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
}
}
func (t *TestDockerClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
- if t.finish == 2 {
+ if t.exitCode == 2 {
return dockertypes.ImageLoadResponse{}, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
}
_, err := io.Copy(ioutil.Discard, input)
func (s *TestSuite) TestLoadImage(c *C) {
kc := &KeepTestClient{}
- docker := NewTestDockerClient(0)
- cr := NewContainerRunner(&ArvTestClient{}, kc, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr := NewContainerRunner(&ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
_, err := cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
func (s *TestSuite) TestLoadImageKeepError(c *C) {
// (2) Keep error
- docker := NewTestDockerClient(0)
- cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.Container.ContainerImage = hwPDH
err := cr.LoadImage()
func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
// (4) Collection doesn't contain image
- docker := NewTestDockerClient(0)
- cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.Container.ContainerImage = hwPDH
err := cr.LoadImage()
}
func (s *TestSuite) TestRunContainer(c *C) {
- docker := NewTestDockerClient(0)
- docker.fn = func(t *TestDockerClient) {
+ s.docker.fn = func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, "Hello world\n"))
t.logWriter.Close()
}
- cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
var logs TestLogs
cr.NewLogWriter = logs.NewTestLoggingWriter
// Used by the TestFullRun*() test below to DRY up boilerplate setup to do full
// dress rehearsal of the Run() function, starting from a JSON container record.
-func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) {
+func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) {
rec := arvados.Container{}
err := json.Unmarshal([]byte(record), &rec)
c.Check(err, IsNil)
- docker := NewTestDockerClient(exitCode)
- docker.fn = fn
- docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
+ s.docker.exitCode = exitCode
+ s.docker.fn = fn
+ s.docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
api = &ArvTestClient{Container: rec}
- docker.api = api
- cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ s.docker.api = api
+ cr = NewContainerRunner(api, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.statInterval = 100 * time.Millisecond
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
c.Assert(err, IsNil)
defer os.RemoveAll(realTemp)
- docker.realTemp = realTemp
+ s.docker.realTemp = realTemp
tempcount := 0
cr.MkTempDir = func(_ string, prefix string) (string, error) {
}
func (s *TestSuite) TestFullRunHello(c *C) {
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["echo", "hello world"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
}
func (s *TestSuite) TestCrunchstat(c *C) {
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["sleep", "1"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
func (s *TestSuite) TestNodeInfoLog(c *C) {
os.Setenv("SLURMD_NODENAME", "compute2")
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["sleep", "1"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
}
func (s *TestSuite) TestContainerRecordLog(c *C) {
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["sleep", "1"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
}
func (s *TestSuite) TestFullRunStderr(c *C) {
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["/bin/sh", "-c", "echo hello ; echo world 1>&2 ; exit 1"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
}
func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["pwd"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
}
func (s *TestSuite) TestFullRunSetCwd(c *C) {
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["pwd"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": "/bin",
func (s *TestSuite) TestStopOnSignal(c *C) {
s.testStopContainer(c, func(cr *ContainerRunner) {
go func() {
- for !cr.cStarted {
+ for !s.docker.calledWait {
time.Sleep(time.Millisecond)
}
cr.SigChan <- syscall.SIGINT
err := json.Unmarshal([]byte(record), &rec)
c.Check(err, IsNil)
- docker := NewTestDockerClient(0)
- docker.fn = func(t *TestDockerClient) {
+ s.docker.fn = func(t *TestDockerClient) {
<-t.stop
t.logWriter.Write(dockerLog(1, "foo\n"))
t.logWriter.Close()
}
- docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
+ s.docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
api := &ArvTestClient{Container: rec}
- cr := NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr := NewContainerRunner(api, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil }
setup(cr)
}
func (s *TestSuite) TestFullRunSetEnv(c *C) {
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["/bin/sh", "-c", "echo $FROBIZ"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": "/bin",
"runtime_constraints": {}
}`
- api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+ api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
})
}
// Used by the TestStdoutWithWrongPath*() tests
-func StdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, err error) {
+func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, err error) {
rec := arvados.Container{}
err = json.Unmarshal([]byte(record), &rec)
c.Check(err, IsNil)
- docker := NewTestDockerClient(0)
- docker.fn = fn
- docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
+ s.docker.fn = fn
+ s.docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
api = &ArvTestClient{Container: rec}
- cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr = NewContainerRunner(api, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
}
func (s *TestSuite) TestStdoutWithWrongPath(c *C) {
- _, _, err := StdoutErrorRunHelper(c, `{
+ _, _, err := s.stdoutErrorRunHelper(c, `{
"mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path":"/tmpa.out"} },
"output_path": "/tmp"
}`, func(t *TestDockerClient) {})
}
func (s *TestSuite) TestStdoutWithWrongKindTmp(c *C) {
- _, _, err := StdoutErrorRunHelper(c, `{
+ _, _, err := s.stdoutErrorRunHelper(c, `{
"mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "tmp", "path":"/tmp/a.out"} },
"output_path": "/tmp"
}`, func(t *TestDockerClient) {})
}
func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) {
- _, _, err := StdoutErrorRunHelper(c, `{
+ _, _, err := s.stdoutErrorRunHelper(c, `{
"mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "collection", "path":"/tmp/a.out"} },
"output_path": "/tmp"
}`, func(t *TestDockerClient) {})
func (s *TestSuite) TestFullRunWithAPI(c *C) {
os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
defer os.Unsetenv("ARVADOS_API_HOST")
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": "/bin",
func (s *TestSuite) TestFullRunSetOutput(c *C) {
os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
defer os.Unsetenv("ARVADOS_API_HOST")
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": "/bin",
extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
- api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+ api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
})
"a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt",
}
- api, runner, realtemp := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+ api, runner, realtemp := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
})
"b0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
}
- api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+ api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
})
"a0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
}
- api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+ api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
os.Symlink("/keep/foo/sub1file2", t.realTemp+"/2/baz")
os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/2/baz2")
os.Symlink("/keep/foo2/subdir1", t.realTemp+"/2/baz3")
extraMounts := []string{}
- api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+ api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
os.Symlink("/etc/hosts", t.realTemp+"/2/baz")
t.logWriter.Close()
})
extraMounts := []string{}
- api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+ api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
rf, _ := os.Create(t.realTemp + "/2/realfile")
rf.Write([]byte("foo"))
rf.Close()
"b0def87f80dd594d4675809e83bd4f15+367/file1_in_main.txt",
}
- api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+ api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
})
"runtime_constraints": {}
}`
- api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+ api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
})
}
func (s *TestSuite) TestStderrMount(c *C) {
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["/bin/sh", "-c", "echo hello;exit 1"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
ech := tf.Name()
brokenNodeHook = &ech
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["echo", "hello world"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
ech := ""
brokenNodeHook = &ech
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["echo", "hello world"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
ech := ""
brokenNodeHook = &ech
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["echo", "hello world"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
ech := ""
brokenNodeHook = &ech
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["echo", "hello world"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
ech := ""
brokenNodeHook = &ech
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["echo", "hello world"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
ech := ""
brokenNodeHook = &ech
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["echo", "hello world"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",