19418: LSF: use InstanceTypes to detect unsatisfiable constraints.
[arvados.git] / lib / lsf / dispatch_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lsf
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "math/rand"
12         "os/exec"
13         "strconv"
14         "sync"
15         "testing"
16         "time"
17
18         "git.arvados.org/arvados.git/lib/config"
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/arvadostest"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22         "github.com/prometheus/client_golang/prometheus"
23         "gopkg.in/check.v1"
24 )
25
// Test is the "go test" entry point for this package; it hands the
// *testing.T to gocheck via check.TestingT.
func Test(t *testing.T) {
	check.TestingT(t)
}
29
// Register the suite with gocheck.
var _ = check.Suite(&suite{})

// suite holds the fixtures shared by the tests in this file. All
// fields are (re)assigned by SetUpTest:
//
//   - disp is the dispatcher under test.
//   - crTooBig is a container request asking for 1000000000000 bytes
//     of RAM, more than the single instance type SetUpTest configures
//     (100 GiB).
//   - crPending is a container request whose job the stubbed bjobs
//     command reports as PEND.
//   - crCUDARequest is a container request asking for one CUDA device.
type suite struct {
	disp          *dispatcher
	crTooBig      arvados.ContainerRequest
	crPending     arvados.ContainerRequest
	crCUDARequest arvados.ContainerRequest
}
38
39 func (s *suite) TearDownTest(c *check.C) {
40         arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
41 }
42
43 func (s *suite) SetUpTest(c *check.C) {
44         cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
45         c.Assert(err, check.IsNil)
46         cluster, err := cfg.GetCluster("")
47         c.Assert(err, check.IsNil)
48         cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second / 4)
49         cluster.Containers.MinRetryPeriod = arvados.Duration(time.Second / 4)
50         cluster.InstanceTypes = arvados.InstanceTypeMap{
51                 "biggest_available_node": arvados.InstanceType{
52                         RAM:             100 << 30, // 100 GiB
53                         VCPUs:           4,
54                         IncludedScratch: 100 << 30,
55                         Scratch:         100 << 30,
56                 }}
57         s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher)
58         s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
59                 return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
60         }
61         err = arvados.NewClientFromEnv().RequestAndDecode(&s.crTooBig, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
62                 "container_request": map[string]interface{}{
63                         "runtime_constraints": arvados.RuntimeConstraints{
64                                 RAM:   1000000000000,
65                                 VCPUs: 1,
66                         },
67                         "container_image":     arvadostest.DockerImage112PDH,
68                         "command":             []string{"sleep", "1"},
69                         "mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
70                         "output_path":         "/mnt/out",
71                         "state":               arvados.ContainerRequestStateCommitted,
72                         "priority":            1,
73                         "container_count_max": 1,
74                 },
75         })
76         c.Assert(err, check.IsNil)
77
78         err = arvados.NewClientFromEnv().RequestAndDecode(&s.crPending, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
79                 "container_request": map[string]interface{}{
80                         "runtime_constraints": arvados.RuntimeConstraints{
81                                 RAM:   100000000,
82                                 VCPUs: 2,
83                         },
84                         "container_image":     arvadostest.DockerImage112PDH,
85                         "command":             []string{"sleep", "1"},
86                         "mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
87                         "output_path":         "/mnt/out",
88                         "state":               arvados.ContainerRequestStateCommitted,
89                         "priority":            1,
90                         "container_count_max": 1,
91                 },
92         })
93         c.Assert(err, check.IsNil)
94
95         err = arvados.NewClientFromEnv().RequestAndDecode(&s.crCUDARequest, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
96                 "container_request": map[string]interface{}{
97                         "runtime_constraints": arvados.RuntimeConstraints{
98                                 RAM:   16000000,
99                                 VCPUs: 1,
100                                 CUDA: arvados.CUDARuntimeConstraints{
101                                         DeviceCount:        1,
102                                         DriverVersion:      "11.0",
103                                         HardwareCapability: "8.0",
104                                 },
105                         },
106                         "container_image":     arvadostest.DockerImage112PDH,
107                         "command":             []string{"sleep", "1"},
108                         "mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
109                         "output_path":         "/mnt/out",
110                         "state":               arvados.ContainerRequestStateCommitted,
111                         "priority":            1,
112                         "container_count_max": 1,
113                 },
114         })
115         c.Assert(err, check.IsNil)
116
117 }
118
// lsfstub configures the fake LSF command runner returned by its
// stubCommand method.
type lsfstub struct {
	// sudoUser, when non-empty, is the user name expected in a
	// leading "sudo -E -u <user>" prefix; the stub strips that
	// prefix before dispatching on the real program name.
	sudoUser  string
	// errorRate is compared against rand.Float64() on every call;
	// when the random value is lower, the stub returns a command
	// that fails instead of emulating the requested program.
	errorRate float64
}
123
124 func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...string) *exec.Cmd {
125         mtx := sync.Mutex{}
126         nextjobid := 100
127         fakejobq := map[int]string{}
128         return func(prog string, args ...string) *exec.Cmd {
129                 c.Logf("stubCommand: %q %q", prog, args)
130                 if rand.Float64() < stub.errorRate {
131                         return exec.Command("bash", "-c", "echo >&2 'stub random failure' && false")
132                 }
133                 if stub.sudoUser != "" && len(args) > 3 &&
134                         prog == "sudo" &&
135                         args[0] == "-E" &&
136                         args[1] == "-u" &&
137                         args[2] == stub.sudoUser {
138                         prog, args = args[3], args[4:]
139                 }
140                 switch prog {
141                 case "bsub":
142                         defaultArgs := s.disp.Cluster.Containers.LSF.BsubArgumentsList
143                         if args[5] == s.crCUDARequest.ContainerUUID {
144                                 c.Assert(len(args), check.Equals, len(defaultArgs)+len(s.disp.Cluster.Containers.LSF.BsubCUDAArguments))
145                         } else {
146                                 c.Assert(len(args), check.Equals, len(defaultArgs))
147                         }
148                         // %%J must have been rewritten to %J
149                         c.Check(args[1], check.Equals, "/tmp/crunch-run.%J.out")
150                         args = args[4:]
151                         switch args[1] {
152                         case arvadostest.LockedContainerUUID:
153                                 c.Check(args, check.DeepEquals, []string{
154                                         "-J", arvadostest.LockedContainerUUID,
155                                         "-n", "4",
156                                         "-D", "11701MB",
157                                         "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]",
158                                         "-R", "select[mem>=11701MB]",
159                                         "-R", "select[tmp>=0MB]",
160                                         "-R", "select[ncpus>=4]"})
161                                 mtx.Lock()
162                                 fakejobq[nextjobid] = args[1]
163                                 nextjobid++
164                                 mtx.Unlock()
165                         case arvadostest.QueuedContainerUUID:
166                                 c.Check(args, check.DeepEquals, []string{
167                                         "-J", arvadostest.QueuedContainerUUID,
168                                         "-n", "4",
169                                         "-D", "11701MB",
170                                         "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]",
171                                         "-R", "select[mem>=11701MB]",
172                                         "-R", "select[tmp>=45777MB]",
173                                         "-R", "select[ncpus>=4]"})
174                                 mtx.Lock()
175                                 fakejobq[nextjobid] = args[1]
176                                 nextjobid++
177                                 mtx.Unlock()
178                         case s.crPending.ContainerUUID:
179                                 c.Check(args, check.DeepEquals, []string{
180                                         "-J", s.crPending.ContainerUUID,
181                                         "-n", "2",
182                                         "-D", "608MB",
183                                         "-R", "rusage[mem=608MB:tmp=256MB] span[hosts=1]",
184                                         "-R", "select[mem>=608MB]",
185                                         "-R", "select[tmp>=256MB]",
186                                         "-R", "select[ncpus>=2]"})
187                                 mtx.Lock()
188                                 fakejobq[nextjobid] = args[1]
189                                 nextjobid++
190                                 mtx.Unlock()
191                         case s.crCUDARequest.ContainerUUID:
192                                 c.Check(args, check.DeepEquals, []string{
193                                         "-J", s.crCUDARequest.ContainerUUID,
194                                         "-n", "1",
195                                         "-D", "528MB",
196                                         "-R", "rusage[mem=528MB:tmp=256MB] span[hosts=1]",
197                                         "-R", "select[mem>=528MB]",
198                                         "-R", "select[tmp>=256MB]",
199                                         "-R", "select[ncpus>=1]",
200                                         "-gpu", "num=1"})
201                                 mtx.Lock()
202                                 fakejobq[nextjobid] = args[1]
203                                 nextjobid++
204                                 mtx.Unlock()
205                         default:
206                                 c.Errorf("unexpected uuid passed to bsub: args %q", args)
207                                 return exec.Command("false")
208                         }
209                         return exec.Command("echo", "submitted job")
210                 case "bjobs":
211                         c.Check(args, check.DeepEquals, []string{"-u", "all", "-o", "jobid stat job_name pend_reason", "-json"})
212                         var records []map[string]interface{}
213                         for jobid, uuid := range fakejobq {
214                                 stat, reason := "RUN", ""
215                                 if uuid == s.crPending.ContainerUUID {
216                                         // The real bjobs output includes a trailing ';' here:
217                                         stat, reason = "PEND", "There are no suitable hosts for the job;"
218                                 }
219                                 records = append(records, map[string]interface{}{
220                                         "JOBID":       fmt.Sprintf("%d", jobid),
221                                         "STAT":        stat,
222                                         "JOB_NAME":    uuid,
223                                         "PEND_REASON": reason,
224                                 })
225                         }
226                         out, err := json.Marshal(map[string]interface{}{
227                                 "COMMAND": "bjobs",
228                                 "JOBS":    len(fakejobq),
229                                 "RECORDS": records,
230                         })
231                         if err != nil {
232                                 panic(err)
233                         }
234                         c.Logf("bjobs out: %s", out)
235                         return exec.Command("printf", string(out))
236                 case "bkill":
237                         killid, _ := strconv.Atoi(args[0])
238                         if uuid, ok := fakejobq[killid]; !ok {
239                                 return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: No matching job found\n'", killid))
240                         } else if uuid == "" {
241                                 return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: Job has already finished\n'", killid))
242                         } else {
243                                 go func() {
244                                         time.Sleep(time.Millisecond)
245                                         mtx.Lock()
246                                         delete(fakejobq, killid)
247                                         mtx.Unlock()
248                                 }()
249                                 return exec.Command("bash", "-c", fmt.Sprintf("printf 'Job <%d> is being terminated\n'", killid))
250                         }
251                 default:
252                         return exec.Command("bash", "-c", fmt.Sprintf("echo >&2 'stub: command not found: %+q'", prog))
253                 }
254         }
255 }
256
257 func (s *suite) TestSubmit(c *check.C) {
258         s.disp.lsfcli.stubCommand = lsfstub{
259                 errorRate: 0.1,
260                 sudoUser:  s.disp.Cluster.Containers.LSF.BsubSudoUser,
261         }.stubCommand(s, c)
262         s.disp.Start()
263
264         deadline := time.Now().Add(20 * time.Second)
265         for range time.NewTicker(time.Second).C {
266                 if time.Now().After(deadline) {
267                         c.Error("timed out")
268                         break
269                 }
270                 // "crTooBig" should never be submitted to lsf because
271                 // it is bigger than any configured instance type
272                 if ent, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
273                         c.Errorf("Lookup(crTooBig) == true, ent = %#v", ent)
274                         break
275                 }
276                 // "queuedcontainer" should be running
277                 if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok {
278                         c.Log("Lookup(queuedcontainer) == false")
279                         continue
280                 }
281                 // "crPending" should be pending
282                 if ent, ok := s.disp.lsfqueue.Lookup(s.crPending.ContainerUUID); !ok {
283                         c.Logf("Lookup(crPending) == false", ent)
284                         continue
285                 }
286                 // "lockedcontainer" should be cancelled because it
287                 // has priority 0 (no matching container requests)
288                 if ent, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
289                         c.Logf("Lookup(lockedcontainer) == true, ent = %#v", ent)
290                         continue
291                 }
292                 var ctr arvados.Container
293                 if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
294                         c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
295                         continue
296                 } else if ctr.State != arvados.ContainerStateQueued {
297                         c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
298                         continue
299                 }
300
301                 if err := s.disp.arvDispatcher.Arv.Get("containers", s.crTooBig.ContainerUUID, nil, &ctr); err != nil {
302                         c.Logf("error getting container state for %s: %s", s.crTooBig.ContainerUUID, err)
303                         continue
304                 } else if ctr.State != arvados.ContainerStateCancelled {
305                         c.Logf("container %s is not in the LSF queue but its arvados record has not been updated to state==Cancelled (state is %q)", s.crTooBig.ContainerUUID, ctr.State)
306                         continue
307                 } else {
308                         c.Check(ctr.RuntimeStatus["error"], check.Equals, "constraints not satisfiable by any configured instance type")
309                 }
310                 c.Log("reached desired state")
311                 break
312         }
313 }