18290: LSF: make the bsub arguments completely configurable.
[arvados.git] / lib / lsf / dispatch_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lsf
6
7 import (
8         "context"
9         "fmt"
10         "math/rand"
11         "os/exec"
12         "strconv"
13         "sync"
14         "testing"
15         "time"
16
17         "git.arvados.org/arvados.git/lib/config"
18         "git.arvados.org/arvados.git/sdk/go/arvados"
19         "git.arvados.org/arvados.git/sdk/go/arvadostest"
20         "git.arvados.org/arvados.git/sdk/go/ctxlog"
21         "github.com/prometheus/client_golang/prometheus"
22         "gopkg.in/check.v1"
23 )
24
25 func Test(t *testing.T) {
26         check.TestingT(t)
27 }
28
29 var _ = check.Suite(&suite{})
30
31 type suite struct {
32         disp *dispatcher
33 }
34
35 func (s *suite) TearDownTest(c *check.C) {
36         arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
37 }
38
39 func (s *suite) SetUpTest(c *check.C) {
40         cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
41         c.Assert(err, check.IsNil)
42         cluster, err := cfg.GetCluster("")
43         c.Assert(err, check.IsNil)
44         cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second)
45         s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher)
46         s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
47                 return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
48         }
49 }
50
// lsfstub configures the fake LSF command runner built by stubCommand.
type lsfstub struct {
	// sudoUser, if non-empty, is the user name expected in a leading
	// "sudo -E -u <user>" prefix, which the stub strips off.
	sudoUser string
	// errorRate is the probability (compared against rand.Float64)
	// that any given stubbed command fails at random.
	errorRate float64
}
55
56 func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...string) *exec.Cmd {
57         mtx := sync.Mutex{}
58         nextjobid := 100
59         fakejobq := map[int]string{}
60         return func(prog string, args ...string) *exec.Cmd {
61                 c.Logf("stubCommand: %q %q", prog, args)
62                 if rand.Float64() < stub.errorRate {
63                         return exec.Command("bash", "-c", "echo >&2 'stub random failure' && false")
64                 }
65                 if stub.sudoUser != "" && len(args) > 3 &&
66                         prog == "sudo" &&
67                         args[0] == "-E" &&
68                         args[1] == "-u" &&
69                         args[2] == stub.sudoUser {
70                         prog, args = args[3], args[4:]
71                 }
72                 switch prog {
73                 case "bsub":
74                         defaultArgs := s.disp.Cluster.Containers.LSF.BsubArgumentsList
75                         c.Assert(len(args), check.Equals, len(defaultArgs))
76                         // %%J must have been rewritten to %J
77                         c.Check(args[1], check.Equals, "/tmp/crunch-run.%J.out")
78                         args = args[4:]
79                         switch args[1] {
80                         case arvadostest.LockedContainerUUID:
81                                 c.Check(args, check.DeepEquals, []string{
82                                         "-J", arvadostest.LockedContainerUUID,
83                                         "-n", "4",
84                                         "-D", "11701MB",
85                                         "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]"})
86                                 mtx.Lock()
87                                 fakejobq[nextjobid] = args[1]
88                                 nextjobid++
89                                 mtx.Unlock()
90                         case arvadostest.QueuedContainerUUID:
91                                 c.Check(args, check.DeepEquals, []string{
92                                         "-J", arvadostest.QueuedContainerUUID,
93                                         "-n", "4",
94                                         "-D", "11701MB",
95                                         "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]"})
96                                 mtx.Lock()
97                                 fakejobq[nextjobid] = args[1]
98                                 nextjobid++
99                                 mtx.Unlock()
100                         default:
101                                 c.Errorf("unexpected uuid passed to bsub: args %q", args)
102                                 return exec.Command("false")
103                         }
104                         return exec.Command("echo", "submitted job")
105                 case "bjobs":
106                         c.Check(args, check.DeepEquals, []string{"-u", "all", "-noheader", "-o", "jobid stat job_name:30"})
107                         out := ""
108                         for jobid, uuid := range fakejobq {
109                                 out += fmt.Sprintf(`%d %s %s\n`, jobid, "RUN", uuid)
110                         }
111                         c.Logf("bjobs out: %q", out)
112                         return exec.Command("printf", out)
113                 case "bkill":
114                         killid, _ := strconv.Atoi(args[0])
115                         if uuid, ok := fakejobq[killid]; !ok {
116                                 return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: No matching job found\n'", killid))
117                         } else if uuid == "" {
118                                 return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: Job has already finished\n'", killid))
119                         } else {
120                                 go func() {
121                                         time.Sleep(time.Millisecond)
122                                         mtx.Lock()
123                                         delete(fakejobq, killid)
124                                         mtx.Unlock()
125                                 }()
126                                 return exec.Command("bash", "-c", fmt.Sprintf("printf 'Job <%d> is being terminated\n'", killid))
127                         }
128                 default:
129                         return exec.Command("bash", "-c", fmt.Sprintf("echo >&2 'stub: command not found: %+q'", prog))
130                 }
131         }
132 }
133
134 func (s *suite) TestSubmit(c *check.C) {
135         s.disp.lsfcli.stubCommand = lsfstub{
136                 errorRate: 0.1,
137                 sudoUser:  s.disp.Cluster.Containers.LSF.BsubSudoUser,
138         }.stubCommand(s, c)
139         s.disp.Start()
140         deadline := time.Now().Add(20 * time.Second)
141         for range time.NewTicker(time.Second).C {
142                 if time.Now().After(deadline) {
143                         c.Error("timed out")
144                         break
145                 }
146                 // "queuedcontainer" should be running
147                 if _, ok := s.disp.lsfqueue.JobID(arvadostest.QueuedContainerUUID); !ok {
148                         continue
149                 }
150                 // "lockedcontainer" should be cancelled because it
151                 // has priority 0 (no matching container requests)
152                 if _, ok := s.disp.lsfqueue.JobID(arvadostest.LockedContainerUUID); ok {
153                         continue
154                 }
155                 var ctr arvados.Container
156                 if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
157                         c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
158                         continue
159                 }
160                 if ctr.State != arvados.ContainerStateQueued {
161                         c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
162                         continue
163                 }
164                 c.Log("reached desired state")
165                 break
166         }
167 }