18298: Request pend_reason field when polling bjobs.
[arvados.git] / lib / lsf / dispatch_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lsf
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "math/rand"
12         "os/exec"
13         "strconv"
14         "sync"
15         "testing"
16         "time"
17
18         "git.arvados.org/arvados.git/lib/config"
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/arvadostest"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22         "github.com/prometheus/client_golang/prometheus"
23         "gopkg.in/check.v1"
24 )
25
26 func Test(t *testing.T) {
27         check.TestingT(t)
28 }
29
30 var _ = check.Suite(&suite{})
31
32 type suite struct {
33         disp *dispatcher
34 }
35
36 func (s *suite) TearDownTest(c *check.C) {
37         arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
38 }
39
40 func (s *suite) SetUpTest(c *check.C) {
41         cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
42         c.Assert(err, check.IsNil)
43         cluster, err := cfg.GetCluster("")
44         c.Assert(err, check.IsNil)
45         cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second)
46         s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher)
47         s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
48                 return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
49         }
50 }
51
// lsfstub holds the settings for the fake LSF command runner built
// by its stubCommand method.
type lsfstub struct {
	// sudoUser, when non-empty, is the user name expected in a
	// leading "sudo -E -u <user>" wrapper, which the stub strips
	// before interpreting the command.
	sudoUser string
	// errorRate is the probability (compared against
	// rand.Float64) that any single stubbed command fails.
	errorRate float64
}
56
57 func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...string) *exec.Cmd {
58         mtx := sync.Mutex{}
59         nextjobid := 100
60         fakejobq := map[int]string{}
61         return func(prog string, args ...string) *exec.Cmd {
62                 c.Logf("stubCommand: %q %q", prog, args)
63                 if rand.Float64() < stub.errorRate {
64                         return exec.Command("bash", "-c", "echo >&2 'stub random failure' && false")
65                 }
66                 if stub.sudoUser != "" && len(args) > 3 &&
67                         prog == "sudo" &&
68                         args[0] == "-E" &&
69                         args[1] == "-u" &&
70                         args[2] == stub.sudoUser {
71                         prog, args = args[3], args[4:]
72                 }
73                 switch prog {
74                 case "bsub":
75                         defaultArgs := s.disp.Cluster.Containers.LSF.BsubArgumentsList
76                         c.Assert(len(args), check.Equals, len(defaultArgs))
77                         // %%J must have been rewritten to %J
78                         c.Check(args[1], check.Equals, "/tmp/crunch-run.%J.out")
79                         args = args[4:]
80                         switch args[1] {
81                         case arvadostest.LockedContainerUUID:
82                                 c.Check(args, check.DeepEquals, []string{
83                                         "-J", arvadostest.LockedContainerUUID,
84                                         "-n", "4",
85                                         "-D", "11701MB",
86                                         "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]"})
87                                 mtx.Lock()
88                                 fakejobq[nextjobid] = args[1]
89                                 nextjobid++
90                                 mtx.Unlock()
91                         case arvadostest.QueuedContainerUUID:
92                                 c.Check(args, check.DeepEquals, []string{
93                                         "-J", arvadostest.QueuedContainerUUID,
94                                         "-n", "4",
95                                         "-D", "11701MB",
96                                         "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]"})
97                                 mtx.Lock()
98                                 fakejobq[nextjobid] = args[1]
99                                 nextjobid++
100                                 mtx.Unlock()
101                         default:
102                                 c.Errorf("unexpected uuid passed to bsub: args %q", args)
103                                 return exec.Command("false")
104                         }
105                         return exec.Command("echo", "submitted job")
106                 case "bjobs":
107                         c.Check(args, check.DeepEquals, []string{"-u", "all", "-o", "jobid stat job_name pend_reason", "-json"})
108                         var records []map[string]interface{}
109                         for jobid, uuid := range fakejobq {
110                                 records = append(records, map[string]interface{}{
111                                         "JOBID":       fmt.Sprintf("%d", jobid),
112                                         "STAT":        "RUN",
113                                         "JOB_NAME":    uuid,
114                                         "PEND_REASON": "",
115                                 })
116                         }
117                         out, err := json.Marshal(map[string]interface{}{
118                                 "COMMAND": "bjobs",
119                                 "JOBS":    len(fakejobq),
120                                 "RECORDS": records,
121                         })
122                         if err != nil {
123                                 panic(err)
124                         }
125                         c.Logf("bjobs out: %s", out)
126                         return exec.Command("printf", string(out))
127                 case "bkill":
128                         killid, _ := strconv.Atoi(args[0])
129                         if uuid, ok := fakejobq[killid]; !ok {
130                                 return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: No matching job found\n'", killid))
131                         } else if uuid == "" {
132                                 return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: Job has already finished\n'", killid))
133                         } else {
134                                 go func() {
135                                         time.Sleep(time.Millisecond)
136                                         mtx.Lock()
137                                         delete(fakejobq, killid)
138                                         mtx.Unlock()
139                                 }()
140                                 return exec.Command("bash", "-c", fmt.Sprintf("printf 'Job <%d> is being terminated\n'", killid))
141                         }
142                 default:
143                         return exec.Command("bash", "-c", fmt.Sprintf("echo >&2 'stub: command not found: %+q'", prog))
144                 }
145         }
146 }
147
148 func (s *suite) TestSubmit(c *check.C) {
149         s.disp.lsfcli.stubCommand = lsfstub{
150                 errorRate: 0.1,
151                 sudoUser:  s.disp.Cluster.Containers.LSF.BsubSudoUser,
152         }.stubCommand(s, c)
153         s.disp.Start()
154         deadline := time.Now().Add(20 * time.Second)
155         for range time.NewTicker(time.Second).C {
156                 if time.Now().After(deadline) {
157                         c.Error("timed out")
158                         break
159                 }
160                 // "queuedcontainer" should be running
161                 if _, ok := s.disp.lsfqueue.JobID(arvadostest.QueuedContainerUUID); !ok {
162                         continue
163                 }
164                 // "lockedcontainer" should be cancelled because it
165                 // has priority 0 (no matching container requests)
166                 if _, ok := s.disp.lsfqueue.JobID(arvadostest.LockedContainerUUID); ok {
167                         continue
168                 }
169                 var ctr arvados.Container
170                 if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
171                         c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
172                         continue
173                 }
174                 if ctr.State != arvados.ContainerStateQueued {
175                         c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
176                         continue
177                 }
178                 c.Log("reached desired state")
179                 break
180         }
181 }