// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package lsf

import (
        "context"
        "encoding/json"
        "fmt"
        "math/rand"
        "os/exec"
        "strconv"
        "sync"
        "testing"
        "time"

        "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/prometheus/client_golang/prometheus"
        "gopkg.in/check.v1"
)

func Test(t *testing.T) {
        check.TestingT(t)
}

var _ = check.Suite(&suite{})

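// suite holds the dispatcher under test plus the two container
// requests created in SetUpTest: crTooBig and crCUDARequest.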
type suite struct {
        disp          *dispatcher
        crTooBig      arvados.ContainerRequest
        crCUDARequest arvados.ContainerRequest
}

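// TearDownTest resets the Arvados test database after each test.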
func (s *suite) TearDownTest(c *check.C) {
        arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
}

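// SetUpTest loads the test cluster config, builds a dispatcher whose
// LSF CLI is stubbed out, and creates two committed container
// requests: crTooBig (asks for 1 TB of RAM, which the bjobs stub will
// report as having no suitable hosts) and crCUDARequest (asks for one
// CUDA GPU).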
func (s *suite) SetUpTest(c *check.C) {
        cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
        c.Assert(err, check.IsNil)
        cluster, err := cfg.GetCluster("")
        c.Assert(err, check.IsNil)
        cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second / 4)
        cluster.Containers.MinRetryPeriod = arvados.Duration(time.Second / 4)
        s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher)
        s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
                return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
        }
        err = arvados.NewClientFromEnv().RequestAndDecode(&s.crTooBig, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
                "container_request": map[string]interface{}{
                        "runtime_constraints": arvados.RuntimeConstraints{
                                RAM:   1000000000000,
                                VCPUs: 1,
                        },
                        "container_image":     arvadostest.DockerImage112PDH,
                        "command":             []string{"sleep", "1"},
                        "mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
                        "output_path":         "/mnt/out",
                        "state":               arvados.ContainerRequestStateCommitted,
                        "priority":            1,
                        "container_count_max": 1,
                },
        })
        c.Assert(err, check.IsNil)

        err = arvados.NewClientFromEnv().RequestAndDecode(&s.crCUDARequest, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
                "container_request": map[string]interface{}{
                        "runtime_constraints": arvados.RuntimeConstraints{
                                RAM:   16000000,
                                VCPUs: 1,
                                CUDA: arvados.CUDARuntimeConstraints{
                                        DeviceCount:        1,
                                        DriverVersion:      "11.0",
                                        HardwareCapability: "8.0",
                                },
                        },
                        "container_image":     arvadostest.DockerImage112PDH,
                        "command":             []string{"sleep", "1"},
                        "mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
                        "output_path":         "/mnt/out",
                        "state":               arvados.ContainerRequestStateCommitted,
                        "priority":            1,
                        "container_count_max": 1,
                },
        })
        c.Assert(err, check.IsNil)

}

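// lsfstub configures the fake LSF commands built by stubCommand: when
// sudoUser is set, the stub expects each command to be wrapped in
// "sudo -E -u <user>"; errorRate is the probability that any stubbed
// command fails with a random error.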
type lsfstub struct {
        sudoUser  string
        errorRate float64
}

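// stubCommand returns a command factory that emulates bsub, bjobs, and
// bkill against an in-memory fake job queue, asserting that the
// dispatcher invokes them with the expected arguments.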
func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...string) *exec.Cmd {
        mtx := sync.Mutex{}
        nextjobid := 100
        fakejobq := map[int]string{}
        return func(prog string, args ...string) *exec.Cmd {
                c.Logf("stubCommand: %q %q", prog, args)
                if rand.Float64() < stub.errorRate {
                        return exec.Command("bash", "-c", "echo >&2 'stub random failure' && false")
                }
                if stub.sudoUser != "" && len(args) > 3 &&
                        prog == "sudo" &&
                        args[0] == "-E" &&
                        args[1] == "-u" &&
                        args[2] == stub.sudoUser {
                        prog, args = args[3], args[4:]
                }
                switch prog {
                case "bsub":
                        defaultArgs := s.disp.Cluster.Containers.LSF.BsubArgumentsList
                        if args[5] == s.crCUDARequest.ContainerUUID {
                                c.Assert(len(args), check.Equals, len(defaultArgs)+len(s.disp.Cluster.Containers.LSF.BsubCUDAArguments))
                        } else {
                                c.Assert(len(args), check.Equals, len(defaultArgs))
                        }
                        // %%J must have been rewritten to %J
                        c.Check(args[1], check.Equals, "/tmp/crunch-run.%J.out")
                        args = args[4:]
                        switch args[1] {
                        case arvadostest.LockedContainerUUID:
                                c.Check(args, check.DeepEquals, []string{
                                        "-J", arvadostest.LockedContainerUUID,
                                        "-n", "4",
                                        "-D", "11701MB",
                                        "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]",
                                        "-R", "select[mem>=11701MB]",
                                        "-R", "select[tmp>=0MB]",
                                        "-R", "select[ncpus>=4]"})
                                mtx.Lock()
                                fakejobq[nextjobid] = args[1]
                                nextjobid++
                                mtx.Unlock()
                        case arvadostest.QueuedContainerUUID:
                                c.Check(args, check.DeepEquals, []string{
                                        "-J", arvadostest.QueuedContainerUUID,
                                        "-n", "4",
                                        "-D", "11701MB",
                                        "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]",
                                        "-R", "select[mem>=11701MB]",
                                        "-R", "select[tmp>=45777MB]",
                                        "-R", "select[ncpus>=4]"})
                                mtx.Lock()
                                fakejobq[nextjobid] = args[1]
                                nextjobid++
                                mtx.Unlock()
                        case s.crTooBig.ContainerUUID:
                                c.Check(args, check.DeepEquals, []string{
                                        "-J", s.crTooBig.ContainerUUID,
                                        "-n", "1",
                                        "-D", "954187MB",
                                        "-R", "rusage[mem=954187MB:tmp=256MB] span[hosts=1]",
                                        "-R", "select[mem>=954187MB]",
                                        "-R", "select[tmp>=256MB]",
                                        "-R", "select[ncpus>=1]"})
                                mtx.Lock()
                                fakejobq[nextjobid] = args[1]
                                nextjobid++
                                mtx.Unlock()
                        case s.crCUDARequest.ContainerUUID:
                                c.Check(args, check.DeepEquals, []string{
                                        "-J", s.crCUDARequest.ContainerUUID,
                                        "-n", "1",
                                        "-D", "528MB",
                                        "-R", "rusage[mem=528MB:tmp=256MB] span[hosts=1]",
                                        "-R", "select[mem>=528MB]",
                                        "-R", "select[tmp>=256MB]",
                                        "-R", "select[ncpus>=1]",
                                        "-gpu", "num=1"})
                                mtx.Lock()
                                fakejobq[nextjobid] = args[1]
                                nextjobid++
                                mtx.Unlock()
                        default:
                                c.Errorf("unexpected uuid passed to bsub: args %q", args)
                                return exec.Command("false")
                        }
                        return exec.Command("echo", "submitted job")
                case "bjobs":
                        c.Check(args, check.DeepEquals, []string{"-u", "all", "-o", "jobid stat job_name pend_reason", "-json"})
                        var records []map[string]interface{}
                        for jobid, uuid := range fakejobq {
                                stat, reason := "RUN", ""
                                if uuid == s.crTooBig.ContainerUUID {
                                        // The real bjobs output includes a trailing ';' here:
                                        stat, reason = "PEND", "There are no suitable hosts for the job;"
                                }
                                records = append(records, map[string]interface{}{
                                        "JOBID":       fmt.Sprintf("%d", jobid),
                                        "STAT":        stat,
                                        "JOB_NAME":    uuid,
                                        "PEND_REASON": reason,
                                })
                        }
                        out, err := json.Marshal(map[string]interface{}{
                                "COMMAND": "bjobs",
                                "JOBS":    len(fakejobq),
                                "RECORDS": records,
                        })
                        if err != nil {
                                panic(err)
                        }
                        c.Logf("bjobs out: %s", out)
                        return exec.Command("printf", string(out))
                case "bkill":
                        killid, _ := strconv.Atoi(args[0])
                        if uuid, ok := fakejobq[killid]; !ok {
                                return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: No matching job found\n'", killid))
                        } else if uuid == "" {
                                return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: Job has already finished\n'", killid))
                        } else {
                                go func() {
                                        time.Sleep(time.Millisecond)
                                        mtx.Lock()
                                        delete(fakejobq, killid)
                                        mtx.Unlock()
                                }()
                                return exec.Command("bash", "-c", fmt.Sprintf("printf 'Job <%d> is being terminated\n'", killid))
                        }
                default:
                        return exec.Command("bash", "-c", fmt.Sprintf("echo >&2 'stub: command not found: %+q'", prog))
                }
        }
}

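// TestSubmit starts the dispatcher against the LSF stubs (with a 10%
// injected command failure rate) and polls for up to 20 seconds until
// the LSF queue and the container records reach the expected states.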
func (s *suite) TestSubmit(c *check.C) {
        s.disp.lsfcli.stubCommand = lsfstub{
                errorRate: 0.1,
                sudoUser:  s.disp.Cluster.Containers.LSF.BsubSudoUser,
        }.stubCommand(s, c)
        s.disp.Start()

        deadline := time.Now().Add(20 * time.Second)
        for range time.NewTicker(time.Second).C {
                if time.Now().After(deadline) {
                        c.Error("timed out")
                        break
                }
                // "queuedcontainer" should be running
                if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok {
                        c.Log("Lookup(queuedcontainer) == false")
                        continue
                }
                // "lockedcontainer" should be cancelled because it
                // has priority 0 (no matching container requests)
                if ent, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
                        c.Logf("Lookup(lockedcontainer) == true, ent = %#v", ent)
                        continue
                }
                // "crTooBig" should be cancelled because the lsf stub
                // reports there are no suitable hosts for it
                if ent, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
                        c.Logf("Lookup(crTooBig) == true, ent = %#v", ent)
                        continue
                }
                var ctr arvados.Container
                if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
                        c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
                        continue
                } else if ctr.State != arvados.ContainerStateQueued {
                        c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
                        continue
                }

                if err := s.disp.arvDispatcher.Arv.Get("containers", s.crTooBig.ContainerUUID, nil, &ctr); err != nil {
                        c.Logf("error getting container state for %s: %s", s.crTooBig.ContainerUUID, err)
                        continue
                } else if ctr.State != arvados.ContainerStateCancelled {
                        c.Logf("container %s is not in the LSF queue but its arvados record has not been updated to state==Cancelled (state is %q)", s.crTooBig.ContainerUUID, ctr.State)
                        continue
                } else {
                        c.Check(ctr.RuntimeStatus["error"], check.Equals, "There are no suitable hosts for the job;")
                }
                c.Log("reached desired state")
                break
        }
}