18308: Merge branch 'main' into 18308-enable-collection-versioning
[arvados.git] / lib / lsf / dispatch_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lsf
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "math/rand"
12         "os/exec"
13         "strconv"
14         "sync"
15         "testing"
16         "time"
17
18         "git.arvados.org/arvados.git/lib/config"
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/arvadostest"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22         "github.com/prometheus/client_golang/prometheus"
23         "gopkg.in/check.v1"
24 )
25
26 func Test(t *testing.T) {
27         check.TestingT(t)
28 }
29
30 var _ = check.Suite(&suite{})
31
32 type suite struct {
33         disp     *dispatcher
34         crTooBig arvados.ContainerRequest
35 }
36
37 func (s *suite) TearDownTest(c *check.C) {
38         arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
39 }
40
41 func (s *suite) SetUpTest(c *check.C) {
42         cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
43         c.Assert(err, check.IsNil)
44         cluster, err := cfg.GetCluster("")
45         c.Assert(err, check.IsNil)
46         cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second)
47         s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher)
48         s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
49                 return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
50         }
51         err = arvados.NewClientFromEnv().RequestAndDecode(&s.crTooBig, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
52                 "container_request": map[string]interface{}{
53                         "runtime_constraints": arvados.RuntimeConstraints{
54                                 RAM:   1000000000000,
55                                 VCPUs: 1,
56                         },
57                         "container_image":     arvadostest.DockerImage112PDH,
58                         "command":             []string{"sleep", "1"},
59                         "mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
60                         "output_path":         "/mnt/out",
61                         "state":               arvados.ContainerRequestStateCommitted,
62                         "priority":            1,
63                         "container_count_max": 1,
64                 },
65         })
66         c.Assert(err, check.IsNil)
67 }
68
// lsfstub configures the fake LSF command-line tools produced by
// its stubCommand method.
type lsfstub struct {
	// sudoUser, if non-empty, is the user name expected in a
	// leading "sudo -E -u <user>" wrapper, which the stub strips
	// before interpreting the command.
	sudoUser string
	// errorRate is the probability (0 to 1) that any given stub
	// invocation fails with a simulated random error.
	errorRate float64
}
73
74 func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...string) *exec.Cmd {
75         mtx := sync.Mutex{}
76         nextjobid := 100
77         fakejobq := map[int]string{}
78         return func(prog string, args ...string) *exec.Cmd {
79                 c.Logf("stubCommand: %q %q", prog, args)
80                 if rand.Float64() < stub.errorRate {
81                         return exec.Command("bash", "-c", "echo >&2 'stub random failure' && false")
82                 }
83                 if stub.sudoUser != "" && len(args) > 3 &&
84                         prog == "sudo" &&
85                         args[0] == "-E" &&
86                         args[1] == "-u" &&
87                         args[2] == stub.sudoUser {
88                         prog, args = args[3], args[4:]
89                 }
90                 switch prog {
91                 case "bsub":
92                         defaultArgs := s.disp.Cluster.Containers.LSF.BsubArgumentsList
93                         c.Assert(len(args), check.Equals, len(defaultArgs))
94                         // %%J must have been rewritten to %J
95                         c.Check(args[1], check.Equals, "/tmp/crunch-run.%J.out")
96                         args = args[4:]
97                         switch args[1] {
98                         case arvadostest.LockedContainerUUID:
99                                 c.Check(args, check.DeepEquals, []string{
100                                         "-J", arvadostest.LockedContainerUUID,
101                                         "-n", "4",
102                                         "-D", "11701MB",
103                                         "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]",
104                                         "-R", "select[mem>=11701MB]",
105                                         "-R", "select[tmp>=0MB]",
106                                         "-R", "select[ncpus>=4]"})
107                                 mtx.Lock()
108                                 fakejobq[nextjobid] = args[1]
109                                 nextjobid++
110                                 mtx.Unlock()
111                         case arvadostest.QueuedContainerUUID:
112                                 c.Check(args, check.DeepEquals, []string{
113                                         "-J", arvadostest.QueuedContainerUUID,
114                                         "-n", "4",
115                                         "-D", "11701MB",
116                                         "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]",
117                                         "-R", "select[mem>=11701MB]",
118                                         "-R", "select[tmp>=45777MB]",
119                                         "-R", "select[ncpus>=4]"})
120                                 mtx.Lock()
121                                 fakejobq[nextjobid] = args[1]
122                                 nextjobid++
123                                 mtx.Unlock()
124                         case s.crTooBig.ContainerUUID:
125                                 c.Check(args, check.DeepEquals, []string{
126                                         "-J", s.crTooBig.ContainerUUID,
127                                         "-n", "1",
128                                         "-D", "954187MB",
129                                         "-R", "rusage[mem=954187MB:tmp=256MB] span[hosts=1]",
130                                         "-R", "select[mem>=954187MB]",
131                                         "-R", "select[tmp>=256MB]",
132                                         "-R", "select[ncpus>=1]"})
133                                 mtx.Lock()
134                                 fakejobq[nextjobid] = args[1]
135                                 nextjobid++
136                                 mtx.Unlock()
137                         default:
138                                 c.Errorf("unexpected uuid passed to bsub: args %q", args)
139                                 return exec.Command("false")
140                         }
141                         return exec.Command("echo", "submitted job")
142                 case "bjobs":
143                         c.Check(args, check.DeepEquals, []string{"-u", "all", "-o", "jobid stat job_name pend_reason", "-json"})
144                         var records []map[string]interface{}
145                         for jobid, uuid := range fakejobq {
146                                 stat, reason := "RUN", ""
147                                 if uuid == s.crTooBig.ContainerUUID {
148                                         // The real bjobs output includes a trailing ';' here:
149                                         stat, reason = "PEND", "There are no suitable hosts for the job;"
150                                 }
151                                 records = append(records, map[string]interface{}{
152                                         "JOBID":       fmt.Sprintf("%d", jobid),
153                                         "STAT":        stat,
154                                         "JOB_NAME":    uuid,
155                                         "PEND_REASON": reason,
156                                 })
157                         }
158                         out, err := json.Marshal(map[string]interface{}{
159                                 "COMMAND": "bjobs",
160                                 "JOBS":    len(fakejobq),
161                                 "RECORDS": records,
162                         })
163                         if err != nil {
164                                 panic(err)
165                         }
166                         c.Logf("bjobs out: %s", out)
167                         return exec.Command("printf", string(out))
168                 case "bkill":
169                         killid, _ := strconv.Atoi(args[0])
170                         if uuid, ok := fakejobq[killid]; !ok {
171                                 return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: No matching job found\n'", killid))
172                         } else if uuid == "" {
173                                 return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: Job has already finished\n'", killid))
174                         } else {
175                                 go func() {
176                                         time.Sleep(time.Millisecond)
177                                         mtx.Lock()
178                                         delete(fakejobq, killid)
179                                         mtx.Unlock()
180                                 }()
181                                 return exec.Command("bash", "-c", fmt.Sprintf("printf 'Job <%d> is being terminated\n'", killid))
182                         }
183                 default:
184                         return exec.Command("bash", "-c", fmt.Sprintf("echo >&2 'stub: command not found: %+q'", prog))
185                 }
186         }
187 }
188
189 func (s *suite) TestSubmit(c *check.C) {
190         s.disp.lsfcli.stubCommand = lsfstub{
191                 errorRate: 0.1,
192                 sudoUser:  s.disp.Cluster.Containers.LSF.BsubSudoUser,
193         }.stubCommand(s, c)
194         s.disp.Start()
195
196         deadline := time.Now().Add(20 * time.Second)
197         for range time.NewTicker(time.Second).C {
198                 if time.Now().After(deadline) {
199                         c.Error("timed out")
200                         break
201                 }
202                 // "queuedcontainer" should be running
203                 if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok {
204                         continue
205                 }
206                 // "lockedcontainer" should be cancelled because it
207                 // has priority 0 (no matching container requests)
208                 if _, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
209                         continue
210                 }
211                 // "crTooBig" should be cancelled because lsf stub
212                 // reports there is no suitable instance type
213                 if _, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
214                         continue
215                 }
216                 var ctr arvados.Container
217                 if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
218                         c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
219                         continue
220                 } else if ctr.State != arvados.ContainerStateQueued {
221                         c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
222                         continue
223                 }
224
225                 if err := s.disp.arvDispatcher.Arv.Get("containers", s.crTooBig.ContainerUUID, nil, &ctr); err != nil {
226                         c.Logf("error getting container state for %s: %s", s.crTooBig.ContainerUUID, err)
227                         continue
228                 } else if ctr.State != arvados.ContainerStateCancelled {
229                         c.Logf("container %s is not in the LSF queue but its arvados record has not been updated to state==Cancelled (state is %q)", s.crTooBig.ContainerUUID, ctr.State)
230                         continue
231                 } else {
232                         c.Check(ctr.RuntimeStatus["error"], check.Equals, "There are no suitable hosts for the job;")
233                 }
234                 c.Log("reached desired state")
235                 break
236         }
237 }