1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.arvados.org/arvados.git/lib/config"
19 "git.arvados.org/arvados.git/sdk/go/arvados"
20 "git.arvados.org/arvados.git/sdk/go/arvadostest"
21 "git.arvados.org/arvados.git/sdk/go/ctxlog"
22 "github.com/prometheus/client_golang/prometheus"
26 func Test(t *testing.T) {
30 var _ = check.Suite(&suite{})
34 crTooBig arvados.ContainerRequest
35 crPending arvados.ContainerRequest
36 crCUDARequest arvados.ContainerRequest
37 crMaxRunTime arvados.ContainerRequest
// TearDownTest is the gocheck per-test teardown hook. It restores the
// test database to its fixture state via arvadostest.ResetDB.
40 func (s *suite) TearDownTest(c *check.C) {
41 arvadostest.ResetDB(c)
// SetUpTest is the gocheck per-test setup hook. It resets the test
// database, loads the cluster configuration and applies test-specific
// overrides, builds the dispatcher under test (s.disp) with a command
// stub that always fails, and creates four committed container
// requests through the API: crTooBig, crPending, crCUDARequest and
// crMaxRunTime. Any error along the way aborts the test via c.Assert.
44 func (s *suite) SetUpTest(c *check.C) {
45 arvadostest.ResetDB(c)
// Load the cluster configuration, then override the settings this
// suite depends on.
47 cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
48 c.Assert(err, check.IsNil)
49 cluster, err := cfg.GetCluster("")
50 c.Assert(err, check.IsNil)
// 256 MiB of extra RAM reserved per container.
51 cluster.Containers.ReserveExtraRAM = 256 << 20
// Poll and retry every 250ms.
52 cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second / 4)
53 cluster.Containers.MinRetryPeriod = arvados.Duration(time.Second / 4)
// Instance types available to the dispatcher: a plain node and a
// GPU-equipped node, both with 100 GiB of RAM and scratch.
54 cluster.InstanceTypes = arvados.InstanceTypeMap{
55 "biggest_available_node": arvados.InstanceType{
56 RAM: 100 << 30, // 100 GiB
58 IncludedScratch: 100 << 30,
61 "biggest_available_node_with_gpu": arvados.InstanceType{
62 RAM: 100 << 30, // 100 GiB
64 IncludedScratch: 100 << 30,
66 GPU: arvados.GPUFeatures{
68 DriverVersion: "11.0",
69 HardwareTarget: "8.0",
// newHandler's result is asserted to the concrete *dispatcher type so
// the tests can reach its internals (lsfcli, lsfqueue, ...).
74 s.disp = newHandler(context.Background(), cluster, arvadostest.SystemRootToken, prometheus.NewRegistry()).(*dispatcher)
// Default command stub: every invocation prints "unimplemented stub"
// on stderr and exits non-zero. TestSubmit replaces it with an
// lsfstub-based stub.
75 s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
76 return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
// crTooBig: per the expectations in TestSubmit, a request that is
// bigger than any configured instance type.
78 err = arvados.NewClientFromEnv().RequestAndDecode(&s.crTooBig, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
79 "container_request": map[string]interface{}{
80 "runtime_constraints": arvados.RuntimeConstraints{
84 "container_image": arvadostest.DockerImage112PDH,
85 "command": []string{"sleep", "1"},
86 "mounts": map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
87 "output_path": "/mnt/out",
88 "state": arvados.ContainerRequestStateCommitted,
90 "container_count_max": 1,
93 c.Assert(err, check.IsNil)
// crPending: a request (with an 8 GiB keep cache disk) whose job the
// lsfstub bjobs emulation reports as PEND.
95 err = arvados.NewClientFromEnv().RequestAndDecode(&s.crPending, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
96 "container_request": map[string]interface{}{
97 "runtime_constraints": arvados.RuntimeConstraints{
100 KeepCacheDisk: 8 << 30,
102 "container_image": arvadostest.DockerImage112PDH,
103 "command": []string{"sleep", "1"},
104 "mounts": map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
105 "output_path": "/mnt/out",
106 "state": arvados.ContainerRequestStateCommitted,
108 "container_count_max": 1,
111 c.Assert(err, check.IsNil)
// crCUDARequest: a request with GPU runtime constraints (driver
// version 11.0, hardware target 8.0).
113 err = arvados.NewClientFromEnv().RequestAndDecode(&s.crCUDARequest, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
114 "container_request": map[string]interface{}{
115 "runtime_constraints": arvados.RuntimeConstraints{
118 GPU: arvados.GPURuntimeConstraints{
121 DriverVersion: "11.0",
122 HardwareTarget: []string{"8.0"},
126 "container_image": arvadostest.DockerImage112PDH,
127 "command": []string{"sleep", "1"},
128 "mounts": map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
129 "output_path": "/mnt/out",
130 "state": arvados.ContainerRequestStateCommitted,
132 "container_count_max": 1,
135 c.Assert(err, check.IsNil)
// crMaxRunTime: a request that also sets scheduling parameters; the
// lsfstub bsub emulation expects a "-We" run-time estimate for it.
137 err = arvados.NewClientFromEnv().RequestAndDecode(&s.crMaxRunTime, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
138 "container_request": map[string]interface{}{
139 "runtime_constraints": arvados.RuntimeConstraints{
143 "scheduling_parameters": arvados.SchedulingParameters{
146 "container_image": arvadostest.DockerImage112PDH,
147 "command": []string{"sleep", "123"},
148 "mounts": map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
149 "output_path": "/mnt/out",
150 "state": arvados.ContainerRequestStateCommitted,
152 "container_count_max": 1,
155 c.Assert(err, check.IsNil)
158 type lsfstub struct {
// stubCommand returns a function suitable for the dispatcher's
// lsfcli.stubCommand hook. Instead of running real LSF commands, the
// returned function checks the arguments it is given against the
// values expected for each known container, tracks submitted jobs in
// an in-memory queue (fakejobq), and returns a shell command whose
// output imitates the corresponding LSF tool. Argument mismatches are
// reported on c; s supplies the container requests created by
// SetUpTest.
163 func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...string) *exec.Cmd {
// fakejobq maps a fake job ID to the container UUID the job was
// submitted for.
166 fakejobq := map[int]string{}
167 return func(prog string, args ...string) *exec.Cmd {
168 c.Logf("stubCommand: %q %q", prog, args)
// Fail a random fraction (stub.errorRate) of invocations.
169 if rand.Float64() < stub.errorRate {
170 return exec.Command("bash", "-c", "echo >&2 'stub random failure' && false")
// When a sudo user is configured and appears in args[2], the real
// program is args[3] and its own arguments follow it.
172 if stub.sudoUser != "" && len(args) > 3 &&
176 args[2] == stub.sudoUser {
177 prog, args = args[3], args[4:]
// Job submission: verify the submitted arguments for each known
// container and record the job in fakejobq.
181 c.Assert(len(args) > 5, check.Equals, true)
182 // %%J must have been rewritten to %J
183 c.Check(args[1], check.Equals, "/tmp/crunch-run.%J.out")
186 case arvadostest.LockedContainerUUID:
187 c.Check(args, check.DeepEquals, []string{
188 "-J", arvadostest.LockedContainerUUID,
191 "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]",
192 "-R", "select[mem>=11701MB]",
193 "-R", "select[tmp>=0MB]",
194 "-R", "select[ncpus>=4]"})
196 fakejobq[nextjobid] = args[1]
199 case arvadostest.QueuedContainerUUID:
200 c.Check(args, check.DeepEquals, []string{
201 "-J", arvadostest.QueuedContainerUUID,
204 "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]",
205 "-R", "select[mem>=11701MB]",
206 "-R", "select[tmp>=45777MB]",
207 "-R", "select[ncpus>=4]"})
209 fakejobq[nextjobid] = args[1]
212 case s.crPending.ContainerUUID:
213 c.Check(args, check.DeepEquals, []string{
214 "-J", s.crPending.ContainerUUID,
217 "-R", "rusage[mem=352MB:tmp=8448MB] span[hosts=1]",
218 "-R", "select[mem>=352MB]",
219 "-R", "select[tmp>=8448MB]",
220 "-R", "select[ncpus>=2]"})
222 fakejobq[nextjobid] = args[1]
225 case s.crCUDARequest.ContainerUUID:
226 c.Check(args, check.DeepEquals, []string{
227 "-J", s.crCUDARequest.ContainerUUID,
230 "-R", "rusage[mem=15515MB:tmp=15515MB] span[hosts=1]",
231 "-R", "select[mem>=15515MB]",
232 "-R", "select[tmp>=15515MB]",
233 "-R", "select[ncpus>=4]",
236 fakejobq[nextjobid] = args[1]
239 case s.crMaxRunTime.ContainerUUID:
240 c.Check(args, check.DeepEquals, []string{
241 "-J", s.crMaxRunTime.ContainerUUID,
244 "-R", "rusage[mem=257MB:tmp=2304MB] span[hosts=1]",
245 "-R", "select[mem>=257MB]",
246 "-R", "select[tmp>=2304MB]",
247 "-R", "select[ncpus>=1]",
248 "-We", "8", // 124s + 5m overhead + roundup = 8m
251 fakejobq[nextjobid] = args[1]
// A UUID that matches none of the known containers is a test failure.
255 c.Errorf("unexpected uuid passed to bsub: args %q", args)
256 return exec.Command("false")
258 return exec.Command("echo", "submitted job")
// Queue listing: emit one JSON record per job in fakejobq. Every job
// is reported as RUN except crPending, which is reported as PEND with
// a pend reason.
260 c.Check(args, check.DeepEquals, []string{"-u", "all", "-o", "jobid stat job_name pend_reason", "-json"})
261 var records []map[string]interface{}
262 for jobid, uuid := range fakejobq {
263 stat, reason := "RUN", ""
264 if uuid == s.crPending.ContainerUUID {
265 // The real bjobs output includes a trailing ';' here:
266 stat, reason = "PEND", "There are no suitable hosts for the job;"
268 records = append(records, map[string]interface{}{
269 "JOBID": fmt.Sprintf("%d", jobid),
272 "PEND_REASON": reason,
275 out, err := json.Marshal(map[string]interface{}{
277 "JOBS": len(fakejobq),
283 c.Logf("bjobs out: %s", out)
// NOTE(review): the JSON is passed as printf's format argument, so
// any '%' or backslash sequence in it would be interpreted by printf.
284 return exec.Command("printf", string(out))
// Job kill: args[0] is the job ID to remove from fakejobq.
// NOTE(review): the Atoi error is discarded, so a non-numeric args[0]
// is treated as job ID 0.
286 killid, _ := strconv.Atoi(args[0])
287 if uuid, ok := fakejobq[killid]; !ok {
288 return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: No matching job found\n'", killid))
289 } else if uuid == "" {
290 return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: Job has already finished\n'", killid))
293 time.Sleep(time.Millisecond)
295 delete(fakejobq, killid)
298 return exec.Command("bash", "-c", fmt.Sprintf("printf 'Job <%d> is being terminated\n'", killid))
// Any program not handled above is reported on stderr as not found.
301 return exec.Command("bash", "-c", fmt.Sprintf("echo >&2 'stub: command not found: %+q'", prog))
// TestSubmit installs an lsfstub-based command stub on the dispatcher
// and then checks, once per second with a 20-second deadline, whether
// the LSF queue and the Arvados container records have reached the
// expected state:
//
//   - crTooBig is never submitted to LSF, and its container ends up
//     Cancelled with a "constraints not satisfiable" runtime error;
//   - the "queued" fixture container is present in the LSF queue;
//   - crPending is present in the LSF queue;
//   - the "locked" fixture container is absent from the LSF queue and
//     its Arvados record is back in the Queued state.
306 func (s *suite) TestSubmit(c *check.C) {
307 s.disp.lsfcli.stubCommand = lsfstub{
309 sudoUser: s.disp.Cluster.Containers.LSF.BsubSudoUser,
313 deadline := time.Now().Add(20 * time.Second)
// NOTE(review): no handle to this ticker is kept, so it is never
// stopped; consider holding it in a variable and deferring Stop().
314 for range time.NewTicker(time.Second).C {
315 if time.Now().After(deadline) {
319 // "crTooBig" should never be submitted to lsf because
320 // it is bigger than any configured instance type
321 if ent, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
322 c.Errorf("Lookup(crTooBig) == true, ent = %#v", ent)
325 // "queuedcontainer" should be running
326 if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok {
327 c.Log("Lookup(queuedcontainer) == false")
330 // "crPending" should be pending
331 if ent, ok := s.disp.lsfqueue.Lookup(s.crPending.ContainerUUID); !ok {
332 c.Logf("Lookup(crPending) == false, ent = %#v", ent)
335 // "lockedcontainer" should be cancelled because it
336 // has priority 0 (no matching container requests)
337 if ent, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
338 c.Logf("Lookup(lockedcontainer) == true, ent = %#v", ent)
// The queue looks right; now confirm the Arvados records agree.
341 var ctr arvados.Container
342 if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
343 c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
345 } else if ctr.State != arvados.ContainerStateQueued {
346 c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
350 if err := s.disp.arvDispatcher.Arv.Get("containers", s.crTooBig.ContainerUUID, nil, &ctr); err != nil {
351 c.Logf("error getting container state for %s: %s", s.crTooBig.ContainerUUID, err)
353 } else if ctr.State != arvados.ContainerStateCancelled {
354 c.Logf("container %s is not in the LSF queue but its arvados record has not been updated to state==Cancelled (state is %q)", s.crTooBig.ContainerUUID, ctr.State)
357 c.Check(ctr.RuntimeStatus["error"], check.Equals, "constraints not satisfiable by any configured instance type")
359 c.Log("reached desired state")