import (
"context"
+ "encoding/json"
"fmt"
"math/rand"
"os/exec"
var _ = check.Suite(&suite{})
type suite struct {
- disp *dispatcher
+ disp *dispatcher
+ crTooBig arvados.ContainerRequest
+ crPending arvados.ContainerRequest
+ crCUDARequest arvados.ContainerRequest
}
func (s *suite) TearDownTest(c *check.C) {
c.Assert(err, check.IsNil)
cluster, err := cfg.GetCluster("")
c.Assert(err, check.IsNil)
- cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second)
+ cluster.Containers.ReserveExtraRAM = 256 << 20
+ cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second / 4)
+ cluster.Containers.MinRetryPeriod = arvados.Duration(time.Second / 4)
+ cluster.InstanceTypes = arvados.InstanceTypeMap{
+ "biggest_available_node": arvados.InstanceType{
+ RAM: 100 << 30, // 100 GiB
+ VCPUs: 4,
+ IncludedScratch: 100 << 30,
+ Scratch: 100 << 30,
+ }}
s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher)
s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
}
+ err = arvados.NewClientFromEnv().RequestAndDecode(&s.crTooBig, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
+ "container_request": map[string]interface{}{
+ "runtime_constraints": arvados.RuntimeConstraints{
+ RAM: 1000000000000,
+ VCPUs: 1,
+ },
+ "container_image": arvadostest.DockerImage112PDH,
+ "command": []string{"sleep", "1"},
+ "mounts": map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
+ "output_path": "/mnt/out",
+ "state": arvados.ContainerRequestStateCommitted,
+ "priority": 1,
+ "container_count_max": 1,
+ },
+ })
+ c.Assert(err, check.IsNil)
+
+ err = arvados.NewClientFromEnv().RequestAndDecode(&s.crPending, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
+ "container_request": map[string]interface{}{
+ "runtime_constraints": arvados.RuntimeConstraints{
+ RAM: 100000000,
+ VCPUs: 2,
+ KeepCacheDisk: 8 << 30,
+ },
+ "container_image": arvadostest.DockerImage112PDH,
+ "command": []string{"sleep", "1"},
+ "mounts": map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
+ "output_path": "/mnt/out",
+ "state": arvados.ContainerRequestStateCommitted,
+ "priority": 1,
+ "container_count_max": 1,
+ },
+ })
+ c.Assert(err, check.IsNil)
+
+ err = arvados.NewClientFromEnv().RequestAndDecode(&s.crCUDARequest, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
+ "container_request": map[string]interface{}{
+ "runtime_constraints": arvados.RuntimeConstraints{
+ RAM: 16000000,
+ VCPUs: 1,
+ CUDA: arvados.CUDARuntimeConstraints{
+ DeviceCount: 1,
+ DriverVersion: "11.0",
+ HardwareCapability: "8.0",
+ },
+ },
+ "container_image": arvadostest.DockerImage112PDH,
+ "command": []string{"sleep", "1"},
+ "mounts": map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
+ "output_path": "/mnt/out",
+ "state": arvados.ContainerRequestStateCommitted,
+ "priority": 1,
+ "container_count_max": 1,
+ },
+ })
+ c.Assert(err, check.IsNil)
+
}
type lsfstub struct {
switch prog {
case "bsub":
defaultArgs := s.disp.Cluster.Containers.LSF.BsubArgumentsList
- c.Assert(args, check.HasLen, 4+len(defaultArgs))
- c.Check(args[:len(defaultArgs)], check.DeepEquals, defaultArgs)
- args = args[len(defaultArgs):]
-
- c.Check(args[0], check.Equals, "-J")
+ if args[5] == s.crCUDARequest.ContainerUUID {
+ c.Assert(len(args), check.Equals, len(defaultArgs)+len(s.disp.Cluster.Containers.LSF.BsubCUDAArguments))
+ } else {
+ c.Assert(len(args), check.Equals, len(defaultArgs))
+ }
+ // %%J must have been rewritten to %J
+ c.Check(args[1], check.Equals, "/tmp/crunch-run.%J.out")
+ args = args[4:]
switch args[1] {
case arvadostest.LockedContainerUUID:
- c.Check(args, check.DeepEquals, []string{"-J", arvadostest.LockedContainerUUID, "-R", "rusage[mem=11701MB:tmp=0MB] affinity[core(4)]"})
+ c.Check(args, check.DeepEquals, []string{
+ "-J", arvadostest.LockedContainerUUID,
+ "-n", "4",
+ "-D", "11701MB",
+ "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]",
+ "-R", "select[mem>=11701MB]",
+ "-R", "select[tmp>=0MB]",
+ "-R", "select[ncpus>=4]"})
mtx.Lock()
fakejobq[nextjobid] = args[1]
nextjobid++
mtx.Unlock()
case arvadostest.QueuedContainerUUID:
- c.Check(args, check.DeepEquals, []string{"-J", arvadostest.QueuedContainerUUID, "-R", "rusage[mem=11701MB:tmp=45777MB] affinity[core(4)]"})
+ c.Check(args, check.DeepEquals, []string{
+ "-J", arvadostest.QueuedContainerUUID,
+ "-n", "4",
+ "-D", "11701MB",
+ "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]",
+ "-R", "select[mem>=11701MB]",
+ "-R", "select[tmp>=45777MB]",
+ "-R", "select[ncpus>=4]"})
+ mtx.Lock()
+ fakejobq[nextjobid] = args[1]
+ nextjobid++
+ mtx.Unlock()
+ case s.crPending.ContainerUUID:
+ c.Check(args, check.DeepEquals, []string{
+ "-J", s.crPending.ContainerUUID,
+ "-n", "2",
+ "-D", "352MB",
+ "-R", "rusage[mem=352MB:tmp=8448MB] span[hosts=1]",
+ "-R", "select[mem>=352MB]",
+ "-R", "select[tmp>=8448MB]",
+ "-R", "select[ncpus>=2]"})
+ mtx.Lock()
+ fakejobq[nextjobid] = args[1]
+ nextjobid++
+ mtx.Unlock()
+ case s.crCUDARequest.ContainerUUID:
+ c.Check(args, check.DeepEquals, []string{
+ "-J", s.crCUDARequest.ContainerUUID,
+ "-n", "1",
+ "-D", "528MB",
+ "-R", "rusage[mem=528MB:tmp=256MB] span[hosts=1]",
+ "-R", "select[mem>=528MB]",
+ "-R", "select[tmp>=256MB]",
+ "-R", "select[ncpus>=1]",
+ "-gpu", "num=1"})
mtx.Lock()
fakejobq[nextjobid] = args[1]
nextjobid++
// Release the queue lock before returning, matching the other
// bsub cases above; without this, the next stubbed bsub call
// would deadlock on mtx.Lock().
mtx.Unlock()
}
return exec.Command("echo", "submitted job")
case "bjobs":
- c.Check(args, check.DeepEquals, []string{"-u", "all", "-noheader", "-o", "jobid stat job_name:30"})
- out := ""
+ c.Check(args, check.DeepEquals, []string{"-u", "all", "-o", "jobid stat job_name pend_reason", "-json"})
+ var records []map[string]interface{}
for jobid, uuid := range fakejobq {
- out += fmt.Sprintf(`%d %s %s\n`, jobid, "RUN", uuid)
+ stat, reason := "RUN", ""
+ if uuid == s.crPending.ContainerUUID {
+ // The real bjobs output includes a trailing ';' here:
+ stat, reason = "PEND", "There are no suitable hosts for the job;"
+ }
+ records = append(records, map[string]interface{}{
+ "JOBID": fmt.Sprintf("%d", jobid),
+ "STAT": stat,
+ "JOB_NAME": uuid,
+ "PEND_REASON": reason,
+ })
+ }
+ out, err := json.Marshal(map[string]interface{}{
+ "COMMAND": "bjobs",
+ "JOBS": len(fakejobq),
+ "RECORDS": records,
+ })
+ if err != nil {
+ panic(err)
}
- c.Logf("bjobs out: %q", out)
- return exec.Command("printf", out)
+ c.Logf("bjobs out: %s", out)
+ return exec.Command("printf", string(out))
case "bkill":
killid, _ := strconv.Atoi(args[0])
if uuid, ok := fakejobq[killid]; !ok {
sudoUser: s.disp.Cluster.Containers.LSF.BsubSudoUser,
}.stubCommand(s, c)
s.disp.Start()
+
deadline := time.Now().Add(20 * time.Second)
for range time.NewTicker(time.Second).C {
if time.Now().After(deadline) {
c.Error("timed out")
break
}
+ // "crTooBig" should never be submitted to lsf because
+ // it is bigger than any configured instance type
+ if ent, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
+ c.Errorf("Lookup(crTooBig) == true, ent = %#v", ent)
+ break
+ }
// "queuedcontainer" should be running
- if _, ok := s.disp.lsfqueue.JobID(arvadostest.QueuedContainerUUID); !ok {
+ if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok {
+ c.Log("Lookup(queuedcontainer) == false")
+ continue
+ }
+ // "crPending" should be pending
+ if ent, ok := s.disp.lsfqueue.Lookup(s.crPending.ContainerUUID); !ok {
+ // Include ent via %#v: the original Logf passed ent with no
+ // formatting directive, which go vet flags and which garbles
+ // the log line with "%!(EXTRA ...)".
+ c.Logf("Lookup(crPending) == false, ent = %#v", ent)
continue
}
// "lockedcontainer" should be cancelled because it
// has priority 0 (no matching container requests)
- if _, ok := s.disp.lsfqueue.JobID(arvadostest.LockedContainerUUID); ok {
+ if ent, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
+ c.Logf("Lookup(lockedcontainer) == true, ent = %#v", ent)
continue
}
var ctr arvados.Container
if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
continue
- }
- if ctr.State != arvados.ContainerStateQueued {
+ } else if ctr.State != arvados.ContainerStateQueued {
c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
continue
}
+
+ if err := s.disp.arvDispatcher.Arv.Get("containers", s.crTooBig.ContainerUUID, nil, &ctr); err != nil {
+ c.Logf("error getting container state for %s: %s", s.crTooBig.ContainerUUID, err)
+ continue
+ } else if ctr.State != arvados.ContainerStateCancelled {
+ c.Logf("container %s is not in the LSF queue but its arvados record has not been updated to state==Cancelled (state is %q)", s.crTooBig.ContainerUUID, ctr.State)
+ continue
+ } else {
+ c.Check(ctr.RuntimeStatus["error"], check.Equals, "constraints not satisfiable by any configured instance type")
+ }
c.Log("reached desired state")
break
}