1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.arvados.org/arvados.git/lib/config"
19 "git.arvados.org/arvados.git/sdk/go/arvados"
20 "git.arvados.org/arvados.git/sdk/go/arvadostest"
21 "git.arvados.org/arvados.git/sdk/go/ctxlog"
22 "github.com/prometheus/client_golang/prometheus"
// Test is the `go test` entry point for this file. Its body is not visible
// here; presumably it hands the registered gocheck suites to check.TestingT.
26 func Test(t *testing.T) {
// Register suite with gocheck so its Test* methods run with the
// SetUpTest/TearDownTest fixtures.
30 var _ = check.Suite(&suite{})
// crTooBig is created in SetUpTest. The LSF stub reports its job as PEND with
// "There are no suitable hosts for the job;", so TestSubmit expects it to be
// removed from the LSF queue and cancelled.
34 crTooBig arvados.ContainerRequest
// crCUDARequest is created in SetUpTest with CUDA runtime constraints. The
// bsub stub expects its argv to include the extra BsubCUDAArguments.
35 crCUDARequest arvados.ContainerRequest
// TearDownTest runs after each test. It POSTs to the API server's
// "database/reset" endpoint (client configured from the environment) so that
// records created by SetUpTest do not carry over into the next test.
38 func (s *suite) TearDownTest(c *check.C) {
// NOTE(review): the error from RequestAndDecode is discarded. A failed reset
// would go unnoticed and could leave state behind for the next test.
39 arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
// SetUpTest runs before each test. It:
//   - loads the cluster config;
//   - shortens the poll and retry intervals;
//   - builds a dispatcher whose LSF CLI stub fails every command by default;
//   - creates two committed container requests, crTooBig and crCUDARequest,
//     through the API server.
42 func (s *suite) SetUpTest(c *check.C) {
43 cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
44 c.Assert(err, check.IsNil)
45 cluster, err := cfg.GetCluster("")
46 c.Assert(err, check.IsNil)
// Poll and retry every 250ms, presumably to keep test wall time short.
47 cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second / 4)
48 cluster.Containers.MinRetryPeriod = arvados.Duration(time.Second / 4)
49 s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher)
// Default stub: every LSF command prints an error to stderr and exits
// non-zero. Tests that need working LSF commands (e.g. TestSubmit) replace
// it with lsfstub{...}.stubCommand.
50 s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
51 return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
// Create crTooBig. The bsub expectations in lsfstub.stubCommand require
// 954187MB of memory for this container.
53 err = arvados.NewClientFromEnv().RequestAndDecode(&s.crTooBig, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
54 "container_request": map[string]interface{}{
55 "runtime_constraints": arvados.RuntimeConstraints{
59 "container_image": arvadostest.DockerImage112PDH,
60 "command": []string{"sleep", "1"},
61 "mounts": map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
62 "output_path": "/mnt/out",
63 "state": arvados.ContainerRequestStateCommitted,
65 "container_count_max": 1,
68 c.Assert(err, check.IsNil)
// Create crCUDARequest with CUDA constraints (driver 11.0, hardware
// capability 8.0). bsub should receive BsubCUDAArguments for it.
70 err = arvados.NewClientFromEnv().RequestAndDecode(&s.crCUDARequest, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
71 "container_request": map[string]interface{}{
72 "runtime_constraints": arvados.RuntimeConstraints{
75 CUDA: arvados.CUDARuntimeConstraints{
77 DriverVersion: "11.0",
78 HardwareCapability: "8.0",
81 "container_image": arvadostest.DockerImage112PDH,
82 "command": []string{"sleep", "1"},
83 "mounts": map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
84 "output_path": "/mnt/out",
85 "state": arvados.ContainerRequestStateCommitted,
87 "container_count_max": 1,
90 c.Assert(err, check.IsNil)
// stubCommand returns a fake replacement for the LSF command-line tools.
// Each invocation:
//   - logs its program and arguments;
//   - fails at random with probability stub.errorRate;
//   - unwraps a sudo-style prefix when stub.sudoUser is set.
// It then emulates bsub, bjobs, or (judging by its "is being terminated"
// output) bkill against the in-memory job table fakejobq.
99 func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...string) *exec.Cmd {
// fakejobq maps an LSF job id to the container UUID submitted under it. The
// kill branch treats an empty UUID as "job has already finished".
102 fakejobq := map[int]string{}
103 return func(prog string, args ...string) *exec.Cmd {
104 c.Logf("stubCommand: %q %q", prog, args)
// Inject random failures so the dispatcher's retry paths are exercised.
105 if rand.Float64() < stub.errorRate {
106 return exec.Command("bash", "-c", "echo >&2 'stub random failure' && false")
// If the command is run on behalf of stub.sudoUser, strip the wrapper so the
// code below sees the underlying program (args[3]) and its arguments.
108 if stub.sudoUser != "" && len(args) > 3 &&
112 args[2] == stub.sudoUser {
113 prog, args = args[3], args[4:]
// bsub: the argv length must equal the configured BsubArgumentsList, plus
// BsubCUDAArguments for the CUDA request.
117 defaultArgs := s.disp.Cluster.Containers.LSF.BsubArgumentsList
// NOTE(review): args[5] is indexed before the length assertions below. A
// short argv would panic instead of failing the assertion cleanly.
118 if args[5] == s.crCUDARequest.ContainerUUID {
119 c.Assert(len(args), check.Equals, len(defaultArgs)+len(s.disp.Cluster.Containers.LSF.BsubCUDAArguments))
121 c.Assert(len(args), check.Equals, len(defaultArgs))
123 // %%J must have been rewritten to %J
124 c.Check(args[1], check.Equals, "/tmp/crunch-run.%J.out")
// Each known container must be submitted with exactly the arguments listed.
// It is then recorded in fakejobq under nextjobid.
127 case arvadostest.LockedContainerUUID:
128 c.Check(args, check.DeepEquals, []string{
129 "-J", arvadostest.LockedContainerUUID,
132 "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]",
133 "-R", "select[mem>=11701MB]",
134 "-R", "select[tmp>=0MB]",
135 "-R", "select[ncpus>=4]"})
137 fakejobq[nextjobid] = args[1]
140 case arvadostest.QueuedContainerUUID:
141 c.Check(args, check.DeepEquals, []string{
142 "-J", arvadostest.QueuedContainerUUID,
145 "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]",
146 "-R", "select[mem>=11701MB]",
147 "-R", "select[tmp>=45777MB]",
148 "-R", "select[ncpus>=4]"})
150 fakejobq[nextjobid] = args[1]
153 case s.crTooBig.ContainerUUID:
154 c.Check(args, check.DeepEquals, []string{
155 "-J", s.crTooBig.ContainerUUID,
158 "-R", "rusage[mem=954187MB:tmp=256MB] span[hosts=1]",
159 "-R", "select[mem>=954187MB]",
160 "-R", "select[tmp>=256MB]",
161 "-R", "select[ncpus>=1]"})
163 fakejobq[nextjobid] = args[1]
166 case s.crCUDARequest.ContainerUUID:
167 c.Check(args, check.DeepEquals, []string{
168 "-J", s.crCUDARequest.ContainerUUID,
171 "-R", "rusage[mem=528MB:tmp=256MB] span[hosts=1]",
172 "-R", "select[mem>=528MB]",
173 "-R", "select[tmp>=256MB]",
174 "-R", "select[ncpus>=1]",
177 fakejobq[nextjobid] = args[1]
// Any other UUID fails the test and the command.
181 c.Errorf("unexpected uuid passed to bsub: args %q", args)
182 return exec.Command("false")
184 return exec.Command("echo", "submitted job")
// bjobs: expect the exact query the dispatcher issues, then report every job
// in fakejobq as RUN. The exception is crTooBig, which is reported as PEND.
186 c.Check(args, check.DeepEquals, []string{"-u", "all", "-o", "jobid stat job_name pend_reason", "-json"})
187 var records []map[string]interface{}
188 for jobid, uuid := range fakejobq {
189 stat, reason := "RUN", ""
190 if uuid == s.crTooBig.ContainerUUID {
191 // The real bjobs output includes a trailing ';' here:
192 stat, reason = "PEND", "There are no suitable hosts for the job;"
194 records = append(records, map[string]interface{}{
195 "JOBID": fmt.Sprintf("%d", jobid),
198 "PEND_REASON": reason,
201 out, err := json.Marshal(map[string]interface{}{
203 "JOBS": len(fakejobq),
209 c.Logf("bjobs out: %s", out)
// NOTE(review): the JSON is passed as printf's *format* string, so any '%'
// or backslash sequence in job names or reasons would be interpreted.
// `printf '%s' "$json"` or `echo` would pass it through verbatim.
210 return exec.Command("printf", string(out))
// Kill: report errors for unknown or already-finished jobs. Otherwise drop
// the job from fakejobq after a 1ms delay and print the termination notice.
// NOTE(review): the Atoi error is ignored, so a non-numeric id becomes 0.
212 killid, _ := strconv.Atoi(args[0])
213 if uuid, ok := fakejobq[killid]; !ok {
214 return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: No matching job found\n'", killid))
215 } else if uuid == "" {
216 return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: Job has already finished\n'", killid))
219 time.Sleep(time.Millisecond)
221 delete(fakejobq, killid)
224 return exec.Command("bash", "-c", fmt.Sprintf("printf 'Job <%d> is being terminated\n'", killid))
// Any other program is reported as not found.
227 return exec.Command("bash", "-c", fmt.Sprintf("echo >&2 'stub: command not found: %+q'", prog))
// TestSubmit runs the dispatcher against lsfstub (using the cluster's
// BsubSudoUser). It polls once per second, for at most 20 seconds, until all
// of the following hold:
//   - queuedcontainer is in the LSF queue;
//   - lockedcontainer and crTooBig are no longer in the LSF queue;
//   - lockedcontainer's Arvados record is back to Queued;
//   - crTooBig's record is Cancelled, with a runtime_status error equal to
//     the stub's PEND reason.
232 func (s *suite) TestSubmit(c *check.C) {
233 s.disp.lsfcli.stubCommand = lsfstub{
235 sudoUser: s.disp.Cluster.Containers.LSF.BsubSudoUser,
239 deadline := time.Now().Add(20 * time.Second)
// NOTE(review): the ticker is never bound to a variable, so it can never be
// stopped. This is harmless in a short test, but
// `ticker := time.NewTicker(...); defer ticker.Stop()` would be tidier.
240 for range time.NewTicker(time.Second).C {
241 if time.Now().After(deadline) {
245 // "queuedcontainer" should be running
246 if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok {
247 c.Log("Lookup(queuedcontainer) == false")
250 // "lockedcontainer" should be removed from the LSF queue because it
251 // has priority 0 (no matching container requests)
252 if ent, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
253 c.Logf("Lookup(lockedcontainer) == true, ent = %#v", ent)
256 // "crTooBig" should be cancelled because the LSF stub
257 // reports there are no suitable hosts for the job
258 if ent, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
259 c.Logf("Lookup(crTooBig) == true, ent = %#v", ent)
// Once lockedcontainer has left the LSF queue, its Arvados record should
// have been returned to Queued.
262 var ctr arvados.Container
263 if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
264 c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
266 } else if ctr.State != arvados.ContainerStateQueued {
267 c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
// crTooBig's record should be Cancelled, with the bjobs PEND reason
// (including its trailing ';') copied into runtime_status["error"].
// NOTE(review): ctr is reused from the lockedcontainer lookup above. If Get
// decodes into the existing value (encoding/json merges into non-nil maps),
// a stale RuntimeStatus entry could survive. A fresh variable would be safer.
271 if err := s.disp.arvDispatcher.Arv.Get("containers", s.crTooBig.ContainerUUID, nil, &ctr); err != nil {
272 c.Logf("error getting container state for %s: %s", s.crTooBig.ContainerUUID, err)
274 } else if ctr.State != arvados.ContainerStateCancelled {
275 c.Logf("container %s is not in the LSF queue but its arvados record has not been updated to state==Cancelled (state is %q)", s.crTooBig.ContainerUUID, ctr.State)
278 c.Check(ctr.RuntimeStatus["error"], check.Equals, "There are no suitable hosts for the job;")
280 c.Log("reached desired state")