"Dispatch Crunch containers on the local system"
package_go_binary services/crunch-dispatch-slurm crunch-dispatch-slurm \
"Dispatch Crunch containers to a SLURM cluster"
-package_go_binary services/crunch-run crunch-run \
+package_go_binary cmd/arvados-server crunch-run \
"Supervise a single Crunch container"
package_go_binary services/crunchstat crunchstat \
"Gather cpu/memory/network statistics of running Crunch jobs"
lib/controller/router
lib/controller/rpc
lib/crunchstat
+lib/crunch-run
lib/cloud
lib/cloud/azure
lib/cloud/cloudtest
services/login-sync
services/nodemanager
services/nodemanager_integration
-services/crunch-run
services/crunch-dispatch-local
services/crunch-dispatch-slurm
services/ws
"git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/controller"
+ "git.arvados.org/arvados.git/lib/crunchrun"
"git.arvados.org/arvados.git/lib/dispatchcloud"
)
"config-dump": config.DumpCommand,
"config-defaults": config.DumpDefaultsCommand,
"controller": controller.Command,
+ "crunch-run": crunchrun.Command,
"dispatch-cloud": dispatchcloud.Command,
})
)
type versionCommand struct{}
+// String returns the release version plus the Go runtime version,
+// e.g. "dev (go1.13)", satisfying fmt.Stringer.
+func (versionCommand) String() string {
+ return fmt.Sprintf("%s (%s)", version, runtime.Version())
+}
+
func (versionCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
prog = regexp.MustCompile(` -*version$`).ReplaceAllLiteralString(prog, "")
fmt.Fprintf(stdout, "%s %s (%s)\n", prog, version, runtime.Version())
func (m Multi) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
_, basename := filepath.Split(prog)
- basename = strings.TrimPrefix(basename, "arvados-")
- basename = strings.TrimPrefix(basename, "crunch-")
- if cmd, ok := m[basename]; ok {
+ if i := strings.Index(basename, "~"); i >= 0 {
+ // drop "~anything" suffix (arvados-dispatch-cloud's
+ // DeployRunnerBinary feature relies on this)
+ basename = basename[:i]
+ }
+ cmd, ok := m[basename]
+ if !ok {
+ // "controller" command exists, and binary is named "arvados-controller"
+ cmd, ok = m[strings.TrimPrefix(basename, "arvados-")]
+ }
+ if !ok {
+ // "dispatch-slurm" command exists, and binary is named "crunch-dispatch-slurm"
+ cmd, ok = m[strings.TrimPrefix(basename, "crunch-")]
+ }
+ if ok {
return cmd.RunCommand(prog, args, stdin, stdout, stderr)
} else if len(args) < 1 {
fmt.Fprintf(stderr, "usage: %s command [args]\n", prog)
# Worker VM image ID.
ImageID: ""
+ # An executable file (located on the dispatcher host) to be
+ # copied to cloud instances at runtime and used as the
+ # container runner/supervisor. The default value is the
+ # dispatcher program itself.
+ #
+ # Use the empty string to disable this step: nothing will be
+ # copied, and cloud instances are assumed to have a suitable
+ # version of crunch-run installed.
+ DeployRunnerBinary: "/proc/self/exe"
+
# Tags to add on all resources (VMs, NICs, disks) created by
# the container dispatcher. (Arvados's own tags --
# InstanceType, IdleBehavior, and InstanceSecret -- will also
# Worker VM image ID.
ImageID: ""
+ # An executable file (located on the dispatcher host) to be
+ # copied to cloud instances at runtime and used as the
+ # container runner/supervisor. The default value is the
+ # dispatcher program itself.
+ #
+ # Use the empty string to disable this step: nothing will be
+ # copied, and cloud instances are assumed to have a suitable
+ # version of crunch-run installed.
+ DeployRunnerBinary: "/proc/self/exe"
+
# Tags to add on all resources (VMs, NICs, disks) created by
# the container dispatcher. (Arvados's own tags --
# InstanceType, IdleBehavior, and InstanceSecret -- will also
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"encoding/json"
//
// Stdout and stderr in the child process are sent to the systemd
// journal using the systemd-cat program.
+//
+// prog is the name/path this program was invoked as (the child
+// process re-executes it -- see detach); args are the remaining
+// command line arguments, not including prog.
-func Detach(uuid string, args []string, stdout, stderr io.Writer) int {
- return exitcode(stderr, detach(uuid, args, stdout, stderr))
+func Detach(uuid string, prog string, args []string, stdout, stderr io.Writer) int {
+ return exitcode(stderr, detach(uuid, prog, args, stdout, stderr))
}
-func detach(uuid string, args []string, stdout, stderr io.Writer) error {
+func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) error {
lockfile, err := func() (*os.File, error) {
// We must hold the dir-level lock between
// opening/creating the lockfile and acquiring LOCK_EX
defer lockfile.Close()
lockfile.Truncate(0)
- cmd := exec.Command("systemd-cat", append([]string{"--identifier=crunch-run", args[0], "-no-detach"}, args[1:]...)...)
+ execargs := append([]string{"-no-detach"}, args...)
+ if strings.HasSuffix(prog, " crunch-run") {
+ // invoked as "/path/to/arvados-server crunch-run"
+ // (see arvados/lib/cmd.Multi)
+ execargs = append([]string{strings.TrimSuffix(prog, " crunch-run"), "crunch-run"}, execargs...)
+ } else {
+ // invoked as "/path/to/crunch-run"
+ execargs = append([]string{prog}, execargs...)
+ }
+ execargs = append([]string{
+ // Here, if the inner systemd-cat can't exec
+ // crunch-run, it writes an error message to stderr,
+ // and the outer systemd-cat writes it to the journal
+ // where the operator has a chance to discover it. (If
+ // we only used one systemd-cat command, it would be
+ // up to us to report the error -- but we are going to
+ // detach and exit, not wait for something to appear
+ // on stderr.) Note these systemd-cat calls don't
+ // result in additional processes -- they just connect
+ // stderr/stdout to sockets and call exec().
+ "systemd-cat", "--identifier=crunch-run",
+ "systemd-cat", "--identifier=crunch-run",
+ }, execargs...)
+
+ cmd := exec.Command(execargs[0], execargs[1:]...)
// Child inherits lockfile.
cmd.ExtraFiles = []*os.File{lockfile}
// Ensure child isn't interrupted even if we receive signals
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"bytes"
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
. "gopkg.in/check.v1"
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"encoding/json"
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"io"
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"bytes"
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/crunchstat"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
dockerclient "github.com/docker/docker/client"
)
-var version = "dev"
+type command struct{}
+
+var Command = command{}
// IArvadosClient is the minimal Arvados API methods used by crunch-run.
type IArvadosClient interface {
// Run the full container lifecycle.
func (runner *ContainerRunner) Run() (err error) {
- runner.CrunchLog.Printf("crunch-run %s started", version)
+ runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String())
runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
hostname, hosterr := os.Hostname()
return cr, nil
}
-func main() {
- statInterval := flag.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
- cgroupRoot := flag.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
- cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
- cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
- caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
- detach := flag.Bool("detach", false, "Detach from parent process and run in the background")
- stdinEnv := flag.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
- sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)")
- kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
- list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes")
- enableNetwork := flag.String("container-enable-networking", "default",
+func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ flags := flag.NewFlagSet(prog, flag.ContinueOnError)
+ statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
+ cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
+ cgroupParent := flags.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
+ cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
+ caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
+ detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
+ stdinEnv := flags.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
+ sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
+ kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
+ list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
+ enableNetwork := flags.String("container-enable-networking", "default",
`Specify if networking should be enabled for container. One of 'default', 'always':
default: only enable networking if container requests it.
always: containers always have networking enabled
`)
- networkMode := flag.String("container-network-mode", "default",
+ networkMode := flags.String("container-network-mode", "default",
`Set networking mode for container. Corresponds to Docker network mode (--net).
`)
- memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
- getVersion := flag.Bool("version", false, "Print version information and exit.")
- flag.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+ memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
+ flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
ignoreDetachFlag := false
- if len(os.Args) > 1 && os.Args[1] == "-no-detach" {
+ if len(args) > 0 && args[0] == "-no-detach" {
// This process was invoked by a parent process, which
// has passed along its own arguments, including
// -detach, after the leading -no-detach flag. Strip
// the leading -no-detach flag (it's not recognized by
- // flag.Parse()) and ignore the -detach flag that
+ // flags.Parse()) and ignore the -detach flag that
// comes later.
- os.Args = append([]string{os.Args[0]}, os.Args[2:]...)
+ args = args[1:]
ignoreDetachFlag = true
}
- flag.Parse()
+ if err := flags.Parse(args); err == flag.ErrHelp {
+ return 0
+ } else if err != nil {
+ log.Print(err)
+ return 1
+ }
if *stdinEnv && !ignoreDetachFlag {
// Load env vars on stdin if asked (but not in a
// detached child process, in which case stdin is
// /dev/null).
- loadEnv(os.Stdin)
+ err := loadEnv(os.Stdin)
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
}
+ containerId := flags.Arg(0)
+
switch {
case *detach && !ignoreDetachFlag:
- os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr))
+ return Detach(containerId, prog, args, os.Stdout, os.Stderr)
case *kill >= 0:
- os.Exit(KillProcess(flag.Arg(0), syscall.Signal(*kill), os.Stdout, os.Stderr))
+ return KillProcess(containerId, syscall.Signal(*kill), os.Stdout, os.Stderr)
case *list:
- os.Exit(ListProcesses(os.Stdout, os.Stderr))
+ return ListProcesses(os.Stdout, os.Stderr)
}
- // Print version information if requested
- if *getVersion {
- fmt.Printf("crunch-run %s\n", version)
- return
+ if containerId == "" {
+ log.Printf("usage: %s [options] UUID", prog)
+ return 1
}
- log.Printf("crunch-run %s started", version)
+ log.Printf("crunch-run %s started", cmd.Version.String())
time.Sleep(*sleep)
- containerId := flag.Arg(0)
-
if *caCertsPath != "" {
arvadosclient.CertFiles = []string{*caCertsPath}
}
api, err := arvadosclient.MakeArvadosClient()
if err != nil {
- log.Fatalf("%s: %v", containerId, err)
+ log.Printf("%s: %v", containerId, err)
+ return 1
}
api.Retries = 8
kc, kcerr := keepclient.MakeKeepClient(api)
if kcerr != nil {
- log.Fatalf("%s: %v", containerId, kcerr)
+ log.Printf("%s: %v", containerId, kcerr)
+ return 1
}
kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
kc.Retries = 4
cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerId)
if err != nil {
- log.Fatal(err)
+ log.Print(err)
+ return 1
}
if dockererr != nil {
cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
cr.checkBrokenNode(dockererr)
cr.CrunchLog.Close()
- os.Exit(1)
+ return 1
}
parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".")
if tmperr != nil {
- log.Fatalf("%s: %v", containerId, tmperr)
+ log.Printf("%s: %v", containerId, tmperr)
+ return 1
}
cr.parentTemp = parentTemp
}
if runerr != nil {
- log.Fatalf("%s: %v", containerId, runerr)
+ log.Printf("%s: %v", containerId, runerr)
+ return 1
}
+ return 0
}
+// loadEnv reads a JSON-encoded map[string]string from rdr and sets
+// each key/value pair in the process environment with os.Setenv.
+// It returns the first read, decode, or setenv error, or nil on
+// success.
-func loadEnv(rdr io.Reader) {
+func loadEnv(rdr io.Reader) error {
buf, err := ioutil.ReadAll(rdr)
if err != nil {
- log.Fatalf("read stdin: %s", err)
+ return fmt.Errorf("read stdin: %s", err)
}
var env map[string]string
err = json.Unmarshal(buf, &env)
if err != nil {
- log.Fatalf("decode stdin: %s", err)
+ return fmt.Errorf("decode stdin: %s", err)
}
for k, v := range env {
err = os.Setenv(k, v)
if err != nil {
- log.Fatalf("setenv(%q): %s", k, err)
+ return fmt.Errorf("setenv(%q): %s", k, err)
}
}
+ return nil
}
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"bufio"
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"fmt"
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"io/ioutil"
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"bufio"
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package crunchrun
import (
"fmt"
type pool interface {
scheduler.WorkerPool
+ CheckHealth() error
Instances() []worker.InstanceView
SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
KillInstance(id cloud.InstanceID, reason string) error
// CheckHealth implements service.Handler.
func (disp *dispatcher) CheckHealth() error {
disp.Start()
- return nil
+ return disp.pool.CheckHealth()
}
// Stop dispatching containers and release resources. Typically used
package worker
import (
+ "crypto/md5"
"crypto/rand"
"errors"
"fmt"
"io"
+ "io/ioutil"
"sort"
"strings"
"sync"
instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
bootProbeCommand string
+ runnerSource string
imageID cloud.ImageID
instanceTypes map[string]arvados.InstanceType
syncInterval time.Duration
stop chan bool
mtx sync.RWMutex
setupOnce sync.Once
+ runnerData []byte
+ runnerMD5 [md5.Size]byte
+ runnerCmd string
throttleCreate throttle
throttleInstances throttle
instanceType arvados.InstanceType
}
+// CheckHealth reports an error if the pool is unusable: it performs
+// one-time setup if that hasn't happened yet, then fails if the
+// runner program (wp.runnerSource) cannot be loaded.
+func (wp *Pool) CheckHealth() error {
+ wp.setupOnce.Do(wp.setup)
+ if err := wp.loadRunnerData(); err != nil {
+ return fmt.Errorf("error loading runner binary: %s", err)
+ }
+ return nil
+}
+
// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
func (wp *Pool) Create(it arvados.InstanceType) bool {
logger := wp.logger.WithField("InstanceType", it.Name)
wp.setupOnce.Do(wp.setup)
+ if wp.loadRunnerData() != nil {
+ // Boot probe is certain to fail.
+ return false
+ }
wp.mtx.Lock()
defer wp.mtx.Unlock()
if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
wp.exited = map[string]time.Time{}
wp.workers = map[cloud.InstanceID]*worker{}
wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
+ wp.loadRunnerData()
+}
+
+// Load the runner program to be deployed on worker nodes into
+// wp.runnerData, if necessary. Errors are logged.
+//
+// If auto-deploy is disabled, len(wp.runnerData) will be 0.
+//
+// Caller must not have lock.
+func (wp *Pool) loadRunnerData() error {
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ if wp.runnerData != nil {
+ // Already loaded (or deploy explicitly disabled) by an
+ // earlier call.
+ return nil
+ } else if wp.runnerSource == "" {
+ // Auto-deploy disabled: assume workers already have a
+ // suitable "crunch-run" installed.
+ wp.runnerCmd = "crunch-run"
+ wp.runnerData = []byte{}
+ return nil
+ }
+ logger := wp.logger.WithField("source", wp.runnerSource)
+ logger.Debug("loading runner")
+ buf, err := ioutil.ReadFile(wp.runnerSource)
+ if err != nil {
+ logger.WithError(err).Error("failed to load runner program")
+ return err
+ }
+ wp.runnerData = buf
+ wp.runnerMD5 = md5.Sum(buf)
+ // The destination path embeds the content hash; cmd.Multi drops
+ // the "~..." suffix when resolving the command name on the
+ // worker.
+ wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", wp.runnerMD5)
+ return nil
}
func (wp *Pool) notify() {
c.Assert(err, check.IsNil)
newExecutor := func(cloud.Instance) Executor {
- return stubExecutor{
- "crunch-run --list": stubResp{},
- "true": stubResp{},
+ return &stubExecutor{
+ response: map[string]stubResp{
+ "crunch-run --list": stubResp{},
+ "true": stubResp{},
+ },
}
}
type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
pool := &Pool{
logger: logger,
- newExecutor: func(cloud.Instance) Executor { return stubExecutor{} },
+ newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
instanceTypes: arvados.InstanceTypeMap{
type1.Name: type1,
uuid string
executor Executor
envJSON json.RawMessage
+ runnerCmd string
remoteUser string
timeoutTERM time.Duration
timeoutSignal time.Duration
uuid: uuid,
executor: wkr.executor,
envJSON: envJSON,
+ runnerCmd: wkr.wp.runnerCmd,
remoteUser: wkr.instance.RemoteUser(),
timeoutTERM: wkr.wp.timeoutTERM,
timeoutSignal: wkr.wp.timeoutSignal,
// assume the remote process _might_ have started, at least until it
// probes the worker and finds otherwise.
func (rr *remoteRunner) Start() {
- cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
+ cmd := rr.runnerCmd + " --detach --stdin-env '" + rr.uuid + "'"
if rr.remoteUser != "root" {
cmd = "sudo " + cmd
}
func (rr *remoteRunner) kill(sig syscall.Signal) {
logger := rr.logger.WithField("Signal", int(sig))
logger.Info("sending signal")
- cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
+ cmd := fmt.Sprintf(rr.runnerCmd+" --kill %d %s", sig, rr.uuid)
if rr.remoteUser != "root" {
cmd = "sudo " + cmd
}
package worker
import (
+ "bytes"
"fmt"
+ "path/filepath"
"strings"
"sync"
"time"
}
func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
- cmd := "crunch-run --list"
+ cmd := wkr.wp.runnerCmd + " --list"
if u := wkr.instance.RemoteUser(); u != "root" {
cmd = "sudo " + cmd
}
return false, stderr
}
logger.Info("boot probe succeeded")
+ if err = wkr.wp.loadRunnerData(); err != nil {
+ wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
+ return false, stderr
+ } else if len(wkr.wp.runnerData) == 0 {
+ // Assume crunch-run is already installed
+ } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
+ wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
+ return false, stderr2
+ } else {
+ wkr.logger.Info("runner binary OK")
+ stderr = append(stderr, stderr2...)
+ }
return true, stderr
}
+// copyRunnerData ensures the runner binary (wp.runnerData) is
+// installed on the worker at the path wp.runnerCmd. If an md5sum
+// check shows the remote file already matches wp.runnerMD5, the copy
+// is skipped. Remote commands are wrapped in sudo unless the remote
+// user is root.
+func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
+ hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
+ dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
+
+ // Fast path: remote file already has the expected md5sum.
+ stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
+ if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+" "+wkr.wp.runnerCmd+"\n")) {
+ return
+ }
+
+ // Note touch+chmod come before writing data, to avoid the
+ // possibility of md5 being correct while file mode is
+ // incorrect.
+ cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0700 "$dstfile"; cat >"$dstfile"`
+ if wkr.instance.RemoteUser() != "root" {
+ // Single-quote the script for sh -c, escaping embedded
+ // single quotes.
+ cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
+ }
+ stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
+ return
+}
+
// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
if wkr.idleBehavior == IdleBehaviorHold {
package worker
import (
+ "bytes"
+ "crypto/md5"
"errors"
+ "fmt"
"io"
+ "strings"
"time"
"git.arvados.org/arvados.git/lib/cloud"
running int
starting int
respBoot stubResp // zero value is success
+ respDeploy stubResp // zero value is success
respRun stubResp // zero value is success + nothing running
+ respRunDeployed stubResp
+ deployRunner []byte
+ expectStdin []byte
expectState State
expectRunning int
}
errFail := errors.New("failed")
respFail := stubResp{"", "command failed\n", errFail}
respContainerRunning := stubResp{"zzzzz-dz642-abcdefghijklmno\n", "", nil}
- for _, trial := range []trialT{
+ for idx, trial := range []trialT{
{
testCaseComment: "Unknown, probes fail",
state: StateUnknown,
starting: 1,
expectState: StateRunning,
},
+ {
+ testCaseComment: "Booting, boot probe succeeds, deployRunner succeeds, run probe succeeds",
+ state: StateBooting,
+ deployRunner: []byte("ELF"),
+ expectStdin: []byte("ELF"),
+ respRun: respFail,
+ respRunDeployed: respContainerRunning,
+ expectRunning: 1,
+ expectState: StateRunning,
+ },
+ {
+ testCaseComment: "Booting, boot probe succeeds, deployRunner fails",
+ state: StateBooting,
+ deployRunner: []byte("ELF"),
+ respDeploy: respFail,
+ expectStdin: []byte("ELF"),
+ expectState: StateBooting,
+ },
+ {
+ testCaseComment: "Booting, boot probe succeeds, deployRunner skipped, run probe succeeds",
+ state: StateBooting,
+ deployRunner: nil,
+ respDeploy: respFail,
+ expectState: StateIdle,
+ },
} {
- c.Logf("------- %#v", trial)
+ c.Logf("------- trial %d: %#v", idx, trial)
ctime := time.Now().Add(-trial.age)
- exr := stubExecutor{
- "bootprobe": trial.respBoot,
- "crunch-run --list": trial.respRun,
+ exr := &stubExecutor{
+ response: map[string]stubResp{
+ "bootprobe": trial.respBoot,
+ "crunch-run --list": trial.respRun,
+ "{deploy}": trial.respDeploy,
+ },
}
wp := &Pool{
arvClient: ac,
timeoutBooting: bootTimeout,
timeoutProbe: probeTimeout,
exited: map[string]time.Time{},
+ runnerCmd: "crunch-run",
+ runnerData: trial.deployRunner,
+ runnerMD5: md5.Sum(trial.deployRunner),
+ }
+ if trial.deployRunner != nil {
+ svHash := md5.Sum(trial.deployRunner)
+ wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", svHash)
+ exr.response[wp.runnerCmd+" --list"] = trial.respRunDeployed
}
wkr := &worker{
logger: logger,
wkr.probeAndUpdate()
c.Check(wkr.state, check.Equals, trial.expectState)
c.Check(len(wkr.running), check.Equals, trial.expectRunning)
+ c.Check(exr.stdin.String(), check.Equals, string(trial.expectStdin))
}
}
stderr string
err error
}
-type stubExecutor map[string]stubResp
-func (se stubExecutor) SetTarget(cloud.ExecutorTarget) {}
-func (se stubExecutor) Close() {}
-func (se stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
- resp, ok := se[cmd]
+type stubExecutor struct {
+ response map[string]stubResp
+ stdin bytes.Buffer
+}
+
+func (se *stubExecutor) SetTarget(cloud.ExecutorTarget) {}
+func (se *stubExecutor) Close() {}
+func (se *stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
+ if stdin != nil {
+ _, err = io.Copy(&se.stdin, stdin)
+ if err != nil {
+ return nil, []byte(err.Error()), err
+ }
+ }
+ resp, ok := se.response[cmd]
+ if !ok && strings.Contains(cmd, `; cat >"$dstfile"`) {
+ resp, ok = se.response["{deploy}"]
+ }
if !ok {
- return nil, []byte("command not found\n"), errors.New("command not found")
+ return nil, []byte(fmt.Sprintf("%s: command not found\n", cmd)), errors.New("command not found")
}
return []byte(resp.stdout), []byte(resp.stderr), resp.err
}