]> git.arvados.org - arvados.git/blob - lib/lsf/dispatch_test.go
23044: De-dup ContainerWebServices routing logic.
[arvados.git] / lib / lsf / dispatch_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lsf
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "math/rand"
12         "os/exec"
13         "strconv"
14         "sync"
15         "testing"
16         "time"
17
18         "git.arvados.org/arvados.git/lib/config"
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/arvadostest"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22         "github.com/prometheus/client_golang/prometheus"
23         "gopkg.in/check.v1"
24 )
25
26 func Test(t *testing.T) {
27         check.TestingT(t)
28 }
29
30 var _ = check.Suite(&suite{})
31
32 type suite struct {
33         disp          *dispatcher
34         crTooBig      arvados.ContainerRequest
35         crPending     arvados.ContainerRequest
36         crCUDARequest arvados.ContainerRequest
37         crMaxRunTime  arvados.ContainerRequest
38 }
39
40 func (s *suite) TearDownTest(c *check.C) {
41         arvadostest.ResetDB(c)
42 }
43
44 func (s *suite) SetUpTest(c *check.C) {
45         arvadostest.ResetDB(c)
46
47         cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
48         c.Assert(err, check.IsNil)
49         cluster, err := cfg.GetCluster("")
50         c.Assert(err, check.IsNil)
51         cluster.Containers.ReserveExtraRAM = 256 << 20
52         cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second / 4)
53         cluster.Containers.MinRetryPeriod = arvados.Duration(time.Second / 4)
54         cluster.InstanceTypes = arvados.InstanceTypeMap{
55                 "biggest_available_node": arvados.InstanceType{
56                         RAM:             100 << 30, // 100 GiB
57                         VCPUs:           4,
58                         IncludedScratch: 100 << 30,
59                         Scratch:         100 << 30,
60                 },
61                 "biggest_available_node_with_gpu": arvados.InstanceType{
62                         RAM:             100 << 30, // 100 GiB
63                         VCPUs:           4,
64                         IncludedScratch: 100 << 30,
65                         Scratch:         100 << 30,
66                         GPU: arvados.GPUFeatures{
67                                 Stack:          "cuda",
68                                 DriverVersion:  "11.0",
69                                 HardwareTarget: "8.0",
70                                 DeviceCount:    2,
71                                 VRAM:           8000000000,
72                         },
73                 }}
74         s.disp = newHandler(context.Background(), cluster, arvadostest.SystemRootToken, prometheus.NewRegistry()).(*dispatcher)
75         s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
76                 return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
77         }
78         err = arvados.NewClientFromEnv().RequestAndDecode(&s.crTooBig, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
79                 "container_request": map[string]interface{}{
80                         "runtime_constraints": arvados.RuntimeConstraints{
81                                 RAM:   1000000000000,
82                                 VCPUs: 1,
83                         },
84                         "container_image":     arvadostest.DockerImage112PDH,
85                         "command":             []string{"sleep", "1"},
86                         "mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
87                         "output_path":         "/mnt/out",
88                         "state":               arvados.ContainerRequestStateCommitted,
89                         "priority":            1,
90                         "container_count_max": 1,
91                 },
92         })
93         c.Assert(err, check.IsNil)
94
95         err = arvados.NewClientFromEnv().RequestAndDecode(&s.crPending, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
96                 "container_request": map[string]interface{}{
97                         "runtime_constraints": arvados.RuntimeConstraints{
98                                 RAM:           100000000,
99                                 VCPUs:         2,
100                                 KeepCacheDisk: 8 << 30,
101                         },
102                         "container_image":     arvadostest.DockerImage112PDH,
103                         "command":             []string{"sleep", "1"},
104                         "mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
105                         "output_path":         "/mnt/out",
106                         "state":               arvados.ContainerRequestStateCommitted,
107                         "priority":            1,
108                         "container_count_max": 1,
109                 },
110         })
111         c.Assert(err, check.IsNil)
112
113         err = arvados.NewClientFromEnv().RequestAndDecode(&s.crCUDARequest, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
114                 "container_request": map[string]interface{}{
115                         "runtime_constraints": arvados.RuntimeConstraints{
116                                 RAM:   16000000000,
117                                 VCPUs: 4,
118                                 GPU: arvados.GPURuntimeConstraints{
119                                         Stack:          "cuda",
120                                         DeviceCount:    1,
121                                         DriverVersion:  "11.0",
122                                         HardwareTarget: []string{"8.0"},
123                                         VRAM:           8000000000,
124                                 },
125                         },
126                         "container_image":     arvadostest.DockerImage112PDH,
127                         "command":             []string{"sleep", "1"},
128                         "mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
129                         "output_path":         "/mnt/out",
130                         "state":               arvados.ContainerRequestStateCommitted,
131                         "priority":            1,
132                         "container_count_max": 1,
133                 },
134         })
135         c.Assert(err, check.IsNil)
136
137         err = arvados.NewClientFromEnv().RequestAndDecode(&s.crMaxRunTime, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
138                 "container_request": map[string]interface{}{
139                         "runtime_constraints": arvados.RuntimeConstraints{
140                                 RAM:   1000000,
141                                 VCPUs: 1,
142                         },
143                         "scheduling_parameters": arvados.SchedulingParameters{
144                                 MaxRunTime: 124,
145                         },
146                         "container_image":     arvadostest.DockerImage112PDH,
147                         "command":             []string{"sleep", "123"},
148                         "mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
149                         "output_path":         "/mnt/out",
150                         "state":               arvados.ContainerRequestStateCommitted,
151                         "priority":            1,
152                         "container_count_max": 1,
153                 },
154         })
155         c.Assert(err, check.IsNil)
156 }
157
158 type lsfstub struct {
159         sudoUser  string
160         errorRate float64
161 }
162
163 func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...string) *exec.Cmd {
164         mtx := sync.Mutex{}
165         nextjobid := 100
166         fakejobq := map[int]string{}
167         return func(prog string, args ...string) *exec.Cmd {
168                 c.Logf("stubCommand: %q %q", prog, args)
169                 if rand.Float64() < stub.errorRate {
170                         return exec.Command("bash", "-c", "echo >&2 'stub random failure' && false")
171                 }
172                 if stub.sudoUser != "" && len(args) > 3 &&
173                         prog == "sudo" &&
174                         args[0] == "-E" &&
175                         args[1] == "-u" &&
176                         args[2] == stub.sudoUser {
177                         prog, args = args[3], args[4:]
178                 }
179                 switch prog {
180                 case "bsub":
181                         c.Assert(len(args) > 5, check.Equals, true)
182                         // %%J must have been rewritten to %J
183                         c.Check(args[1], check.Equals, "/tmp/crunch-run.%J.out")
184                         args = args[4:]
185                         switch args[1] {
186                         case arvadostest.LockedContainerUUID:
187                                 c.Check(args, check.DeepEquals, []string{
188                                         "-J", arvadostest.LockedContainerUUID,
189                                         "-n", "4",
190                                         "-D", "11701MB",
191                                         "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]",
192                                         "-R", "select[mem>=11701MB]",
193                                         "-R", "select[tmp>=0MB]",
194                                         "-R", "select[ncpus>=4]"})
195                                 mtx.Lock()
196                                 fakejobq[nextjobid] = args[1]
197                                 nextjobid++
198                                 mtx.Unlock()
199                         case arvadostest.QueuedContainerUUID:
200                                 c.Check(args, check.DeepEquals, []string{
201                                         "-J", arvadostest.QueuedContainerUUID,
202                                         "-n", "4",
203                                         "-D", "11701MB",
204                                         "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]",
205                                         "-R", "select[mem>=11701MB]",
206                                         "-R", "select[tmp>=45777MB]",
207                                         "-R", "select[ncpus>=4]"})
208                                 mtx.Lock()
209                                 fakejobq[nextjobid] = args[1]
210                                 nextjobid++
211                                 mtx.Unlock()
212                         case s.crPending.ContainerUUID:
213                                 c.Check(args, check.DeepEquals, []string{
214                                         "-J", s.crPending.ContainerUUID,
215                                         "-n", "2",
216                                         "-D", "352MB",
217                                         "-R", "rusage[mem=352MB:tmp=8448MB] span[hosts=1]",
218                                         "-R", "select[mem>=352MB]",
219                                         "-R", "select[tmp>=8448MB]",
220                                         "-R", "select[ncpus>=2]"})
221                                 mtx.Lock()
222                                 fakejobq[nextjobid] = args[1]
223                                 nextjobid++
224                                 mtx.Unlock()
225                         case s.crCUDARequest.ContainerUUID:
226                                 c.Check(args, check.DeepEquals, []string{
227                                         "-J", s.crCUDARequest.ContainerUUID,
228                                         "-n", "4",
229                                         "-D", "15515MB",
230                                         "-R", "rusage[mem=15515MB:tmp=15515MB] span[hosts=1]",
231                                         "-R", "select[mem>=15515MB]",
232                                         "-R", "select[tmp>=15515MB]",
233                                         "-R", "select[ncpus>=4]",
234                                         "-gpu", "num=1"})
235                                 mtx.Lock()
236                                 fakejobq[nextjobid] = args[1]
237                                 nextjobid++
238                                 mtx.Unlock()
239                         case s.crMaxRunTime.ContainerUUID:
240                                 c.Check(args, check.DeepEquals, []string{
241                                         "-J", s.crMaxRunTime.ContainerUUID,
242                                         "-n", "1",
243                                         "-D", "257MB",
244                                         "-R", "rusage[mem=257MB:tmp=2304MB] span[hosts=1]",
245                                         "-R", "select[mem>=257MB]",
246                                         "-R", "select[tmp>=2304MB]",
247                                         "-R", "select[ncpus>=1]",
248                                         "-We", "8", // 124s + 5m overhead + roundup = 8m
249                                 })
250                                 mtx.Lock()
251                                 fakejobq[nextjobid] = args[1]
252                                 nextjobid++
253                                 mtx.Unlock()
254                         default:
255                                 c.Errorf("unexpected uuid passed to bsub: args %q", args)
256                                 return exec.Command("false")
257                         }
258                         return exec.Command("echo", "submitted job")
259                 case "bjobs":
260                         c.Check(args, check.DeepEquals, []string{"-u", "all", "-o", "jobid stat job_name pend_reason", "-json"})
261                         var records []map[string]interface{}
262                         for jobid, uuid := range fakejobq {
263                                 stat, reason := "RUN", ""
264                                 if uuid == s.crPending.ContainerUUID {
265                                         // The real bjobs output includes a trailing ';' here:
266                                         stat, reason = "PEND", "There are no suitable hosts for the job;"
267                                 }
268                                 records = append(records, map[string]interface{}{
269                                         "JOBID":       fmt.Sprintf("%d", jobid),
270                                         "STAT":        stat,
271                                         "JOB_NAME":    uuid,
272                                         "PEND_REASON": reason,
273                                 })
274                         }
275                         out, err := json.Marshal(map[string]interface{}{
276                                 "COMMAND": "bjobs",
277                                 "JOBS":    len(fakejobq),
278                                 "RECORDS": records,
279                         })
280                         if err != nil {
281                                 panic(err)
282                         }
283                         c.Logf("bjobs out: %s", out)
284                         return exec.Command("printf", string(out))
285                 case "bkill":
286                         killid, _ := strconv.Atoi(args[0])
287                         if uuid, ok := fakejobq[killid]; !ok {
288                                 return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: No matching job found\n'", killid))
289                         } else if uuid == "" {
290                                 return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: Job has already finished\n'", killid))
291                         } else {
292                                 go func() {
293                                         time.Sleep(time.Millisecond)
294                                         mtx.Lock()
295                                         delete(fakejobq, killid)
296                                         mtx.Unlock()
297                                 }()
298                                 return exec.Command("bash", "-c", fmt.Sprintf("printf 'Job <%d> is being terminated\n'", killid))
299                         }
300                 default:
301                         return exec.Command("bash", "-c", fmt.Sprintf("echo >&2 'stub: command not found: %+q'", prog))
302                 }
303         }
304 }
305
306 func (s *suite) TestSubmit(c *check.C) {
307         s.disp.lsfcli.stubCommand = lsfstub{
308                 errorRate: 0.1,
309                 sudoUser:  s.disp.Cluster.Containers.LSF.BsubSudoUser,
310         }.stubCommand(s, c)
311         s.disp.Start()
312
313         deadline := time.Now().Add(20 * time.Second)
314         for range time.NewTicker(time.Second).C {
315                 if time.Now().After(deadline) {
316                         c.Error("timed out")
317                         break
318                 }
319                 // "crTooBig" should never be submitted to lsf because
320                 // it is bigger than any configured instance type
321                 if ent, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
322                         c.Errorf("Lookup(crTooBig) == true, ent = %#v", ent)
323                         break
324                 }
325                 // "queuedcontainer" should be running
326                 if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok {
327                         c.Log("Lookup(queuedcontainer) == false")
328                         continue
329                 }
330                 // "crPending" should be pending
331                 if ent, ok := s.disp.lsfqueue.Lookup(s.crPending.ContainerUUID); !ok {
332                         c.Logf("Lookup(crPending) == false", ent)
333                         continue
334                 }
335                 // "lockedcontainer" should be cancelled because it
336                 // has priority 0 (no matching container requests)
337                 if ent, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
338                         c.Logf("Lookup(lockedcontainer) == true, ent = %#v", ent)
339                         continue
340                 }
341                 var ctr arvados.Container
342                 if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
343                         c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
344                         continue
345                 } else if ctr.State != arvados.ContainerStateQueued {
346                         c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
347                         continue
348                 }
349
350                 if err := s.disp.arvDispatcher.Arv.Get("containers", s.crTooBig.ContainerUUID, nil, &ctr); err != nil {
351                         c.Logf("error getting container state for %s: %s", s.crTooBig.ContainerUUID, err)
352                         continue
353                 } else if ctr.State != arvados.ContainerStateCancelled {
354                         c.Logf("container %s is not in the LSF queue but its arvados record has not been updated to state==Cancelled (state is %q)", s.crTooBig.ContainerUUID, ctr.State)
355                         continue
356                 } else {
357                         c.Check(ctr.RuntimeStatus["error"], check.Equals, "constraints not satisfiable by any configured instance type")
358                 }
359                 c.Log("reached desired state")
360                 break
361         }
362 }