9187: Slurm dispatcher improvements around squeue
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm_test.go
1 package main
2
import (
	"bytes"
	"fmt"
	"io"
	"log"
	"math"
	"net/http"
	"net/http/httptest"
	"os"
	"os/exec"
	"regexp"
	"strings"
	"sync"
	"testing"
	"time"

	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
	"git.curoverse.com/arvados.git/sdk/go/dispatch"

	. "gopkg.in/check.v1"
)
22
23 // Gocheck boilerplate
24 func Test(t *testing.T) {
25         TestingT(t)
26 }
27
28 var _ = Suite(&TestSuite{})
29 var _ = Suite(&MockArvadosServerSuite{})
30
31 type TestSuite struct{}
32 type MockArvadosServerSuite struct{}
33
34 var initialArgs []string
35
36 func (s *TestSuite) SetUpSuite(c *C) {
37         initialArgs = os.Args
38 }
39
40 func (s *TestSuite) TearDownSuite(c *C) {
41 }
42
43 func (s *TestSuite) SetUpTest(c *C) {
44         args := []string{"crunch-dispatch-slurm"}
45         os.Args = args
46
47         arvadostest.StartAPI()
48         os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
49 }
50
51 func (s *TestSuite) TearDownTest(c *C) {
52         os.Args = initialArgs
53         arvadostest.StopAPI()
54 }
55
56 func (s *MockArvadosServerSuite) TearDownTest(c *C) {
57         arvadostest.ResetEnv()
58 }
59
60 func (s *TestSuite) TestIntegrationNormal(c *C) {
61         s.integrationTest(c, false)
62 }
63
64 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
65         s.integrationTest(c, true)
66 }
67
// integrationTest runs the dispatcher against the arvadostest API server with
// sbatch, squeue and crunch-run replaced by harmless stand-ins.
//
// If missingFromSqueue is true, the stubbed squeue prints nothing. The test
// then expects run() to have built an sbatch command line and the container
// to end up "Cancelled". If it is false, squeue prints the queued
// container's UUID. The test then expects no sbatch call and the container
// to end up "Complete".
func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
	arvadostest.ResetEnv()

	arv, err := arvadosclient.MakeArvadosClient()
	c.Assert(err, IsNil)

	// Record the sbatch command line run() asks for, but execute "sh"
	// instead of submitting anything. The original is restored on return.
	var sbatchCmdLine []string
	defer func(orig func(dispatch.Container) *exec.Cmd) {
		sbatchCmd = orig
	}(sbatchCmd)
	sbatchCmd = func(container dispatch.Container) *exec.Cmd {
		sbatchCmdLine = sbatchFunc(container).Args
		return exec.Command("sh")
	}

	// Stub squeue: print either nothing or the queued container's UUID.
	defer func(orig func() *exec.Cmd) {
		squeueCmd = orig
	}(squeueCmd)
	squeueCmd = func() *exec.Cmd {
		if missingFromSqueue {
			return exec.Command("echo")
		}
		return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
	}

	// Use "echo" as the crunch-run command. Restore the previous value
	// afterwards, as is done for sbatchCmd/squeueCmd, so later tests are
	// not affected.
	defer func(orig *string) {
		crunchRunCommand = orig
	}(crunchRunCommand)
	echo := "echo"
	crunchRunCommand = &echo

	// There should be exactly one queued container before dispatching.
	params := arvadosclient.Dict{
		"filters": [][]string{{"state", "=", "Queued"}},
	}
	var containers dispatch.ContainerList
	err = arv.List("containers", params, &containers)
	c.Check(err, IsNil)
	// Assert rather than Check: containers.Items[0] is indexed below.
	c.Assert(len(containers.Items), Equals, 1)

	doneProcessing := make(chan struct{})
	dispatcher := dispatch.Dispatcher{
		Arv:          arv,
		PollInterval: time.Second,
		RunContainer: func(dispatcher *dispatch.Dispatcher,
			container dispatch.Container,
			status chan dispatch.Container) {
			go func() {
				dispatcher.UpdateState(container.UUID, dispatch.Running)
				time.Sleep(3 * time.Second)
				dispatcher.UpdateState(container.UUID, dispatch.Complete)
			}()
			run(dispatcher, container, status)
			doneProcessing <- struct{}{}
		},
		DoneProcessing: doneProcessing}

	squeueUpdater.SqueueDone = make(chan struct{})
	go squeueUpdater.SyncSqueue(500 * time.Millisecond)

	err = dispatcher.RunDispatcher()
	c.Assert(err, IsNil)

	// Signal the SyncSqueue goroutine via SqueueDone, then close the channel.
	// NOTE(review): assumes SyncSqueue returns once it receives on
	// SqueueDone; confirm against its implementation.
	squeueUpdater.SqueueDone <- struct{}{}
	close(squeueUpdater.SqueueDone)

	item := containers.Items[0]
	sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
		fmt.Sprintf("--job-name=%s", item.UUID),
		fmt.Sprintf("--mem-per-cpu=%d", int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576)))),
		fmt.Sprintf("--cpus-per-task=%d", int(item.RuntimeConstraints["vcpus"])),
		fmt.Sprintf("--priority=%d", item.Priority)}

	if missingFromSqueue {
		// not in squeue when run() started, so it will have called sbatch
		c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
	} else {
		// already in squeue when run() started, will have just monitored it instead
		c.Check(sbatchCmdLine, DeepEquals, []string(nil))
	}

	// There should be no queued containers now. Decode into a fresh list so
	// nothing left over from the first List call can be mistaken for a result.
	var remaining dispatch.ContainerList
	err = arv.List("containers", params, &remaining)
	c.Check(err, IsNil)
	c.Check(len(remaining.Items), Equals, 0)

	// The previously "Queued" container should now be finished: "Cancelled"
	// when it was missing from squeue, "Complete" when squeue listed it.
	var container dispatch.Container
	err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
	c.Check(err, IsNil)
	if missingFromSqueue {
		c.Check(container.State, Equals, "Cancelled")
	} else {
		c.Check(container.State, Equals, "Complete")
	}
}
165
166 func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
167         apiStubResponses := make(map[string]arvadostest.StubResponse)
168         apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
169         apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
170
171         testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
172 }
173
174 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
175         apiStub := arvadostest.ServerStub{apiStubResponses}
176
177         api := httptest.NewServer(&apiStub)
178         defer api.Close()
179
180         arv := arvadosclient.ArvadosClient{
181                 Scheme:    "http",
182                 ApiServer: api.URL[7:],
183                 ApiToken:  "abc123",
184                 Client:    &http.Client{Transport: &http.Transport{}},
185                 Retries:   0,
186         }
187
188         buf := bytes.NewBuffer(nil)
189         log.SetOutput(io.MultiWriter(buf, os.Stderr))
190         defer log.SetOutput(os.Stderr)
191
192         crunchRunCommand = &crunchCmd
193
194         doneProcessing := make(chan struct{})
195         dispatcher := dispatch.Dispatcher{
196                 Arv:          arv,
197                 PollInterval: time.Duration(1) * time.Second,
198                 RunContainer: func(dispatcher *dispatch.Dispatcher,
199                         container dispatch.Container,
200                         status chan dispatch.Container) {
201                         go func() {
202                                 time.Sleep(1 * time.Second)
203                                 dispatcher.UpdateState(container.UUID, dispatch.Running)
204                                 dispatcher.UpdateState(container.UUID, dispatch.Complete)
205                         }()
206                         run(dispatcher, container, status)
207                         doneProcessing <- struct{}{}
208                 },
209                 DoneProcessing: doneProcessing}
210
211         go func() {
212                 for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
213                         time.Sleep(100 * time.Millisecond)
214                 }
215                 dispatcher.DoneProcessing <- struct{}{}
216         }()
217
218         err := dispatcher.RunDispatcher()
219         c.Assert(err, IsNil)
220
221         c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
222 }