21703: Merge branch 'main' into 21703-collection-update-lock
[arvados.git] / lib / lsf / dispatch_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lsf
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "math/rand"
12         "os/exec"
13         "strconv"
14         "sync"
15         "testing"
16         "time"
17
18         "git.arvados.org/arvados.git/lib/config"
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/arvadostest"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22         "github.com/prometheus/client_golang/prometheus"
23         "gopkg.in/check.v1"
24 )
25
26 func Test(t *testing.T) {
27         check.TestingT(t)
28 }
29
30 var _ = check.Suite(&suite{})
31
32 type suite struct {
33         disp          *dispatcher
34         crTooBig      arvados.ContainerRequest
35         crPending     arvados.ContainerRequest
36         crCUDARequest arvados.ContainerRequest
37         crMaxRunTime  arvados.ContainerRequest
38 }
39
40 func (s *suite) TearDownTest(c *check.C) {
41         arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
42 }
43
44 func (s *suite) SetUpTest(c *check.C) {
45         cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
46         c.Assert(err, check.IsNil)
47         cluster, err := cfg.GetCluster("")
48         c.Assert(err, check.IsNil)
49         cluster.Containers.ReserveExtraRAM = 256 << 20
50         cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second / 4)
51         cluster.Containers.MinRetryPeriod = arvados.Duration(time.Second / 4)
52         cluster.InstanceTypes = arvados.InstanceTypeMap{
53                 "biggest_available_node": arvados.InstanceType{
54                         RAM:             100 << 30, // 100 GiB
55                         VCPUs:           4,
56                         IncludedScratch: 100 << 30,
57                         Scratch:         100 << 30,
58                 }}
59         s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher)
60         s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
61                 return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
62         }
63         err = arvados.NewClientFromEnv().RequestAndDecode(&s.crTooBig, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
64                 "container_request": map[string]interface{}{
65                         "runtime_constraints": arvados.RuntimeConstraints{
66                                 RAM:   1000000000000,
67                                 VCPUs: 1,
68                         },
69                         "container_image":     arvadostest.DockerImage112PDH,
70                         "command":             []string{"sleep", "1"},
71                         "mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
72                         "output_path":         "/mnt/out",
73                         "state":               arvados.ContainerRequestStateCommitted,
74                         "priority":            1,
75                         "container_count_max": 1,
76                 },
77         })
78         c.Assert(err, check.IsNil)
79
80         err = arvados.NewClientFromEnv().RequestAndDecode(&s.crPending, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
81                 "container_request": map[string]interface{}{
82                         "runtime_constraints": arvados.RuntimeConstraints{
83                                 RAM:           100000000,
84                                 VCPUs:         2,
85                                 KeepCacheDisk: 8 << 30,
86                         },
87                         "container_image":     arvadostest.DockerImage112PDH,
88                         "command":             []string{"sleep", "1"},
89                         "mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
90                         "output_path":         "/mnt/out",
91                         "state":               arvados.ContainerRequestStateCommitted,
92                         "priority":            1,
93                         "container_count_max": 1,
94                 },
95         })
96         c.Assert(err, check.IsNil)
97
98         err = arvados.NewClientFromEnv().RequestAndDecode(&s.crCUDARequest, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
99                 "container_request": map[string]interface{}{
100                         "runtime_constraints": arvados.RuntimeConstraints{
101                                 RAM:   16000000,
102                                 VCPUs: 1,
103                                 CUDA: arvados.CUDARuntimeConstraints{
104                                         DeviceCount:        1,
105                                         DriverVersion:      "11.0",
106                                         HardwareCapability: "8.0",
107                                 },
108                         },
109                         "container_image":     arvadostest.DockerImage112PDH,
110                         "command":             []string{"sleep", "1"},
111                         "mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
112                         "output_path":         "/mnt/out",
113                         "state":               arvados.ContainerRequestStateCommitted,
114                         "priority":            1,
115                         "container_count_max": 1,
116                 },
117         })
118         c.Assert(err, check.IsNil)
119
120         err = arvados.NewClientFromEnv().RequestAndDecode(&s.crMaxRunTime, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
121                 "container_request": map[string]interface{}{
122                         "runtime_constraints": arvados.RuntimeConstraints{
123                                 RAM:   1000000,
124                                 VCPUs: 1,
125                         },
126                         "scheduling_parameters": arvados.SchedulingParameters{
127                                 MaxRunTime: 124,
128                         },
129                         "container_image":     arvadostest.DockerImage112PDH,
130                         "command":             []string{"sleep", "123"},
131                         "mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
132                         "output_path":         "/mnt/out",
133                         "state":               arvados.ContainerRequestStateCommitted,
134                         "priority":            1,
135                         "container_count_max": 1,
136                 },
137         })
138         c.Assert(err, check.IsNil)
139 }
140
// lsfstub configures the fake LSF command runner returned by
// stubCommand.
type lsfstub struct {
	// sudoUser, when non-empty, is the user named in a
	// "sudo -E -u <user> prog args..." wrapper; stubCommand strips
	// that prefix and handles "prog args..." directly.
	sudoUser string
	// errorRate is the probability that any single stubbed command
	// invocation fails with a "stub random failure" message.
	errorRate float64
}
145
146 func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...string) *exec.Cmd {
147         mtx := sync.Mutex{}
148         nextjobid := 100
149         fakejobq := map[int]string{}
150         return func(prog string, args ...string) *exec.Cmd {
151                 c.Logf("stubCommand: %q %q", prog, args)
152                 if rand.Float64() < stub.errorRate {
153                         return exec.Command("bash", "-c", "echo >&2 'stub random failure' && false")
154                 }
155                 if stub.sudoUser != "" && len(args) > 3 &&
156                         prog == "sudo" &&
157                         args[0] == "-E" &&
158                         args[1] == "-u" &&
159                         args[2] == stub.sudoUser {
160                         prog, args = args[3], args[4:]
161                 }
162                 switch prog {
163                 case "bsub":
164                         c.Assert(len(args) > 5, check.Equals, true)
165                         // %%J must have been rewritten to %J
166                         c.Check(args[1], check.Equals, "/tmp/crunch-run.%J.out")
167                         args = args[4:]
168                         switch args[1] {
169                         case arvadostest.LockedContainerUUID:
170                                 c.Check(args, check.DeepEquals, []string{
171                                         "-J", arvadostest.LockedContainerUUID,
172                                         "-n", "4",
173                                         "-D", "11701MB",
174                                         "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]",
175                                         "-R", "select[mem>=11701MB]",
176                                         "-R", "select[tmp>=0MB]",
177                                         "-R", "select[ncpus>=4]"})
178                                 mtx.Lock()
179                                 fakejobq[nextjobid] = args[1]
180                                 nextjobid++
181                                 mtx.Unlock()
182                         case arvadostest.QueuedContainerUUID:
183                                 c.Check(args, check.DeepEquals, []string{
184                                         "-J", arvadostest.QueuedContainerUUID,
185                                         "-n", "4",
186                                         "-D", "11701MB",
187                                         "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]",
188                                         "-R", "select[mem>=11701MB]",
189                                         "-R", "select[tmp>=45777MB]",
190                                         "-R", "select[ncpus>=4]"})
191                                 mtx.Lock()
192                                 fakejobq[nextjobid] = args[1]
193                                 nextjobid++
194                                 mtx.Unlock()
195                         case s.crPending.ContainerUUID:
196                                 c.Check(args, check.DeepEquals, []string{
197                                         "-J", s.crPending.ContainerUUID,
198                                         "-n", "2",
199                                         "-D", "352MB",
200                                         "-R", "rusage[mem=352MB:tmp=8448MB] span[hosts=1]",
201                                         "-R", "select[mem>=352MB]",
202                                         "-R", "select[tmp>=8448MB]",
203                                         "-R", "select[ncpus>=2]"})
204                                 mtx.Lock()
205                                 fakejobq[nextjobid] = args[1]
206                                 nextjobid++
207                                 mtx.Unlock()
208                         case s.crCUDARequest.ContainerUUID:
209                                 c.Check(args, check.DeepEquals, []string{
210                                         "-J", s.crCUDARequest.ContainerUUID,
211                                         "-n", "1",
212                                         "-D", "528MB",
213                                         "-R", "rusage[mem=528MB:tmp=256MB] span[hosts=1]",
214                                         "-R", "select[mem>=528MB]",
215                                         "-R", "select[tmp>=256MB]",
216                                         "-R", "select[ncpus>=1]",
217                                         "-gpu", "num=1"})
218                                 mtx.Lock()
219                                 fakejobq[nextjobid] = args[1]
220                                 nextjobid++
221                                 mtx.Unlock()
222                         case s.crMaxRunTime.ContainerUUID:
223                                 c.Check(args, check.DeepEquals, []string{
224                                         "-J", s.crMaxRunTime.ContainerUUID,
225                                         "-n", "1",
226                                         "-D", "257MB",
227                                         "-R", "rusage[mem=257MB:tmp=2304MB] span[hosts=1]",
228                                         "-R", "select[mem>=257MB]",
229                                         "-R", "select[tmp>=2304MB]",
230                                         "-R", "select[ncpus>=1]",
231                                         "-We", "8", // 124s + 5m overhead + roundup = 8m
232                                 })
233                                 mtx.Lock()
234                                 fakejobq[nextjobid] = args[1]
235                                 nextjobid++
236                                 mtx.Unlock()
237                         default:
238                                 c.Errorf("unexpected uuid passed to bsub: args %q", args)
239                                 return exec.Command("false")
240                         }
241                         return exec.Command("echo", "submitted job")
242                 case "bjobs":
243                         c.Check(args, check.DeepEquals, []string{"-u", "all", "-o", "jobid stat job_name pend_reason", "-json"})
244                         var records []map[string]interface{}
245                         for jobid, uuid := range fakejobq {
246                                 stat, reason := "RUN", ""
247                                 if uuid == s.crPending.ContainerUUID {
248                                         // The real bjobs output includes a trailing ';' here:
249                                         stat, reason = "PEND", "There are no suitable hosts for the job;"
250                                 }
251                                 records = append(records, map[string]interface{}{
252                                         "JOBID":       fmt.Sprintf("%d", jobid),
253                                         "STAT":        stat,
254                                         "JOB_NAME":    uuid,
255                                         "PEND_REASON": reason,
256                                 })
257                         }
258                         out, err := json.Marshal(map[string]interface{}{
259                                 "COMMAND": "bjobs",
260                                 "JOBS":    len(fakejobq),
261                                 "RECORDS": records,
262                         })
263                         if err != nil {
264                                 panic(err)
265                         }
266                         c.Logf("bjobs out: %s", out)
267                         return exec.Command("printf", string(out))
268                 case "bkill":
269                         killid, _ := strconv.Atoi(args[0])
270                         if uuid, ok := fakejobq[killid]; !ok {
271                                 return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: No matching job found\n'", killid))
272                         } else if uuid == "" {
273                                 return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: Job has already finished\n'", killid))
274                         } else {
275                                 go func() {
276                                         time.Sleep(time.Millisecond)
277                                         mtx.Lock()
278                                         delete(fakejobq, killid)
279                                         mtx.Unlock()
280                                 }()
281                                 return exec.Command("bash", "-c", fmt.Sprintf("printf 'Job <%d> is being terminated\n'", killid))
282                         }
283                 default:
284                         return exec.Command("bash", "-c", fmt.Sprintf("echo >&2 'stub: command not found: %+q'", prog))
285                 }
286         }
287 }
288
289 func (s *suite) TestSubmit(c *check.C) {
290         s.disp.lsfcli.stubCommand = lsfstub{
291                 errorRate: 0.1,
292                 sudoUser:  s.disp.Cluster.Containers.LSF.BsubSudoUser,
293         }.stubCommand(s, c)
294         s.disp.Start()
295
296         deadline := time.Now().Add(20 * time.Second)
297         for range time.NewTicker(time.Second).C {
298                 if time.Now().After(deadline) {
299                         c.Error("timed out")
300                         break
301                 }
302                 // "crTooBig" should never be submitted to lsf because
303                 // it is bigger than any configured instance type
304                 if ent, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
305                         c.Errorf("Lookup(crTooBig) == true, ent = %#v", ent)
306                         break
307                 }
308                 // "queuedcontainer" should be running
309                 if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok {
310                         c.Log("Lookup(queuedcontainer) == false")
311                         continue
312                 }
313                 // "crPending" should be pending
314                 if ent, ok := s.disp.lsfqueue.Lookup(s.crPending.ContainerUUID); !ok {
315                         c.Logf("Lookup(crPending) == false", ent)
316                         continue
317                 }
318                 // "lockedcontainer" should be cancelled because it
319                 // has priority 0 (no matching container requests)
320                 if ent, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
321                         c.Logf("Lookup(lockedcontainer) == true, ent = %#v", ent)
322                         continue
323                 }
324                 var ctr arvados.Container
325                 if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
326                         c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
327                         continue
328                 } else if ctr.State != arvados.ContainerStateQueued {
329                         c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
330                         continue
331                 }
332
333                 if err := s.disp.arvDispatcher.Arv.Get("containers", s.crTooBig.ContainerUUID, nil, &ctr); err != nil {
334                         c.Logf("error getting container state for %s: %s", s.crTooBig.ContainerUUID, err)
335                         continue
336                 } else if ctr.State != arvados.ContainerStateCancelled {
337                         c.Logf("container %s is not in the LSF queue but its arvados record has not been updated to state==Cancelled (state is %q)", s.crTooBig.ContainerUUID, ctr.State)
338                         continue
339                 } else {
340                         c.Check(ctr.RuntimeStatus["error"], check.Equals, "constraints not satisfiable by any configured instance type")
341                 }
342                 c.Log("reached desired state")
343                 break
344         }
345 }