Merge branch '9408-arvbox-dockerfile-pulls-unnecessary-packages'
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm_test.go
1 package main
2
import (
	"bytes"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/http/httptest"
	"os"
	"os/exec"
	"strings"
	"sync"
	"testing"
	"time"

	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
	"git.curoverse.com/arvados.git/sdk/go/dispatch"

	. "gopkg.in/check.v1"
)
22
23 // Gocheck boilerplate
24 func Test(t *testing.T) {
25         TestingT(t)
26 }
27
28 var _ = Suite(&TestSuite{})
29 var _ = Suite(&MockArvadosServerSuite{})
30
31 type TestSuite struct{}
32 type MockArvadosServerSuite struct{}
33
34 var initialArgs []string
35
36 func (s *TestSuite) SetUpSuite(c *C) {
37         initialArgs = os.Args
38 }
39
40 func (s *TestSuite) TearDownSuite(c *C) {
41 }
42
43 func (s *TestSuite) SetUpTest(c *C) {
44         args := []string{"crunch-dispatch-slurm"}
45         os.Args = args
46
47         arvadostest.StartAPI()
48         os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
49 }
50
51 func (s *TestSuite) TearDownTest(c *C) {
52         os.Args = initialArgs
53         arvadostest.StopAPI()
54 }
55
56 func (s *MockArvadosServerSuite) TearDownTest(c *C) {
57         arvadostest.ResetEnv()
58 }
59
60 func (s *TestSuite) TestIntegrationNormal(c *C) {
61         container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
62                 []string(nil),
63                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
64                         dispatcher.UpdateState(container.UUID, dispatch.Running)
65                         time.Sleep(3 * time.Second)
66                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
67                 })
68         c.Check(container.State, Equals, arvados.ContainerStateComplete)
69 }
70
71 func (s *TestSuite) TestIntegrationCancel(c *C) {
72
73         // Override sbatchCmd
74         var scancelCmdLine []string
75         defer func(orig func(arvados.Container) *exec.Cmd) {
76                 scancelCmd = orig
77         }(scancelCmd)
78         scancelCmd = func(container arvados.Container) *exec.Cmd {
79                 scancelCmdLine = scancelFunc(container).Args
80                 return exec.Command("echo")
81         }
82
83         container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
84                 []string(nil),
85                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
86                         dispatcher.UpdateState(container.UUID, dispatch.Running)
87                         time.Sleep(1 * time.Second)
88                         dispatcher.Arv.Update("containers", container.UUID,
89                                 arvadosclient.Dict{
90                                         "container": arvadosclient.Dict{"priority": 0}},
91                                 nil)
92                 })
93         c.Check(container.State, Equals, arvados.ContainerStateCancelled)
94         c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
95 }
96
97 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
98         container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo") }, []string{"sbatch", "--share", "--parsable",
99                 fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
100                 fmt.Sprintf("--mem-per-cpu=%d", 2862),
101                 fmt.Sprintf("--cpus-per-task=%d", 4),
102                 fmt.Sprintf("--priority=%d", 1)},
103                 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
104                         dispatcher.UpdateState(container.UUID, dispatch.Running)
105                         time.Sleep(3 * time.Second)
106                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
107                 })
108         c.Check(container.State, Equals, arvados.ContainerStateCancelled)
109 }
110
111 func (s *TestSuite) integrationTest(c *C,
112         newSqueueCmd func() *exec.Cmd,
113         sbatchCmdComps []string,
114         runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
115         arvadostest.ResetEnv()
116
117         arv, err := arvadosclient.MakeArvadosClient()
118         c.Assert(err, IsNil)
119
120         var sbatchCmdLine []string
121
122         // Override sbatchCmd
123         defer func(orig func(arvados.Container) *exec.Cmd) {
124                 sbatchCmd = orig
125         }(sbatchCmd)
126         sbatchCmd = func(container arvados.Container) *exec.Cmd {
127                 sbatchCmdLine = sbatchFunc(container).Args
128                 return exec.Command("sh")
129         }
130
131         // Override squeueCmd
132         defer func(orig func() *exec.Cmd) {
133                 squeueCmd = orig
134         }(squeueCmd)
135         squeueCmd = newSqueueCmd
136
137         // There should be no queued containers now
138         params := arvadosclient.Dict{
139                 "filters": [][]string{[]string{"state", "=", "Queued"}},
140         }
141         var containers arvados.ContainerList
142         err = arv.List("containers", params, &containers)
143         c.Check(err, IsNil)
144         c.Check(len(containers.Items), Equals, 1)
145
146         echo := "echo"
147         crunchRunCommand = &echo
148
149         doneProcessing := make(chan struct{})
150         dispatcher := dispatch.Dispatcher{
151                 Arv:          arv,
152                 PollInterval: time.Duration(1) * time.Second,
153                 RunContainer: func(dispatcher *dispatch.Dispatcher,
154                         container arvados.Container,
155                         status chan arvados.Container) {
156                         go runContainer(dispatcher, container)
157                         run(dispatcher, container, status)
158                         doneProcessing <- struct{}{}
159                 },
160                 DoneProcessing: doneProcessing}
161
162         squeueUpdater.StartMonitor(time.Duration(500) * time.Millisecond)
163
164         err = dispatcher.RunDispatcher()
165         c.Assert(err, IsNil)
166
167         squeueUpdater.Done()
168
169         c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
170
171         // There should be no queued containers now
172         err = arv.List("containers", params, &containers)
173         c.Check(err, IsNil)
174         c.Check(len(containers.Items), Equals, 0)
175
176         // Previously "Queued" container should now be in "Complete" state
177         var container arvados.Container
178         err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
179         c.Check(err, IsNil)
180         return container
181 }
182
183 func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
184         apiStubResponses := make(map[string]arvadostest.StubResponse)
185         apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
186         apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
187
188         testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
189 }
190
191 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
192         apiStub := arvadostest.ServerStub{apiStubResponses}
193
194         api := httptest.NewServer(&apiStub)
195         defer api.Close()
196
197         arv := arvadosclient.ArvadosClient{
198                 Scheme:    "http",
199                 ApiServer: api.URL[7:],
200                 ApiToken:  "abc123",
201                 Client:    &http.Client{Transport: &http.Transport{}},
202                 Retries:   0,
203         }
204
205         buf := bytes.NewBuffer(nil)
206         log.SetOutput(io.MultiWriter(buf, os.Stderr))
207         defer log.SetOutput(os.Stderr)
208
209         crunchRunCommand = &crunchCmd
210
211         doneProcessing := make(chan struct{})
212         dispatcher := dispatch.Dispatcher{
213                 Arv:          arv,
214                 PollInterval: time.Duration(1) * time.Second,
215                 RunContainer: func(dispatcher *dispatch.Dispatcher,
216                         container arvados.Container,
217                         status chan arvados.Container) {
218                         go func() {
219                                 time.Sleep(1 * time.Second)
220                                 dispatcher.UpdateState(container.UUID, dispatch.Running)
221                                 dispatcher.UpdateState(container.UUID, dispatch.Complete)
222                         }()
223                         run(dispatcher, container, status)
224                         doneProcessing <- struct{}{}
225                 },
226                 DoneProcessing: doneProcessing}
227
228         go func() {
229                 for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
230                         time.Sleep(100 * time.Millisecond)
231                 }
232                 dispatcher.DoneProcessing <- struct{}{}
233         }()
234
235         err := dispatcher.RunDispatcher()
236         c.Assert(err, IsNil)
237
238         c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
239 }