Merge branch 'master' into 8650-container-work-unit
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm_test.go
1 package main
2
import (
	"bytes"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/http/httptest"
	"os"
	"os/exec"
	"regexp"
	"strings"
	"sync"
	"testing"
	"time"

	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
	"git.curoverse.com/arvados.git/sdk/go/dispatch"

	. "gopkg.in/check.v1"
)
21
22 // Gocheck boilerplate
23 func Test(t *testing.T) {
24         TestingT(t)
25 }
26
27 var _ = Suite(&TestSuite{})
28 var _ = Suite(&MockArvadosServerSuite{})
29
30 type TestSuite struct{}
31 type MockArvadosServerSuite struct{}
32
33 var initialArgs []string
34
35 func (s *TestSuite) SetUpSuite(c *C) {
36         initialArgs = os.Args
37 }
38
39 func (s *TestSuite) TearDownSuite(c *C) {
40 }
41
42 func (s *TestSuite) SetUpTest(c *C) {
43         args := []string{"crunch-dispatch-slurm"}
44         os.Args = args
45
46         arvadostest.StartAPI()
47         os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
48 }
49
50 func (s *TestSuite) TearDownTest(c *C) {
51         os.Args = initialArgs
52         arvadostest.StopAPI()
53 }
54
55 func (s *MockArvadosServerSuite) TearDownTest(c *C) {
56         arvadostest.ResetEnv()
57 }
58
59 func (s *TestSuite) TestIntegrationNormal(c *C) {
60         container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
61                 []string(nil),
62                 func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
63                         dispatcher.UpdateState(container.UUID, dispatch.Running)
64                         time.Sleep(3 * time.Second)
65                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
66                 })
67         c.Check(container.State, Equals, "Complete")
68 }
69
// TestIntegrationCancel sets a running container's priority to 0 and expects
// the dispatcher to end up with the container "Cancelled" and to have built
// an scancel command naming the container UUID as the job name.
func (s *TestSuite) TestIntegrationCancel(c *C) {
	// Override scancelCmd: record the argument list the real scancelFunc
	// would build, but execute a plain "echo" instead of scancel.
	var scancelCmdLine []string
	defer func(orig func(dispatch.Container) *exec.Cmd) {
		scancelCmd = orig
	}(scancelCmd)
	scancelCmd = func(container dispatch.Container) *exec.Cmd {
		scancelCmdLine = scancelFunc(container).Args
		return exec.Command("echo")
	}

	container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
		[]string(nil),
		func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
			dispatcher.UpdateState(container.UUID, dispatch.Running)
			time.Sleep(1 * time.Second)
			// The test expects priority 0 to be treated as a cancel request.
			// This runs in a separate goroutine, so report failures via log
			// rather than c.Check; the State check below will fail anyway.
			err := dispatcher.Arv.Update("containers", container.UUID,
				arvadosclient.Dict{
					"container": arvadosclient.Dict{"priority": 0}},
				nil)
			if err != nil {
				log.Printf("setting priority of %s to 0: %v", container.UUID, err)
			}
		})
	c.Check(container.State, Equals, "Cancelled")
	c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
}
95
96 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
97         container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo") }, []string{"sbatch", "--share", "--parsable",
98                 fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
99                 fmt.Sprintf("--mem-per-cpu=%d", 2862),
100                 fmt.Sprintf("--cpus-per-task=%d", 4),
101                 fmt.Sprintf("--priority=%d", 1)},
102                 func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
103                         dispatcher.UpdateState(container.UUID, dispatch.Running)
104                         time.Sleep(3 * time.Second)
105                         dispatcher.UpdateState(container.UUID, dispatch.Complete)
106                 })
107         c.Check(container.State, Equals, "Cancelled")
108 }
109
// integrationTest runs the dispatcher against the test API server with the
// package's command hooks overridden. It returns the final API record of
// container zzzzz-dz642-queuedcontainer so callers can check its state.
//
// Parameters:
//   - newSqueueCmd replaces squeueCmd for the duration of the test.
//   - sbatchCmdComps is the argument list sbatchFunc is expected to build.
//     It is compared with DeepEquals, so pass nil when sbatch should not run.
//   - runContainer is started in its own goroutine for each container handed
//     to RunContainer, alongside the real run(). It simulates crunch-run
//     updating the container.
//
// sbatchCmd, squeueCmd and crunchRunCommand are restored, and the squeue
// monitor is stopped, before returning. This also happens when an assertion
// aborts the test.
func (s *TestSuite) integrationTest(c *C,
	newSqueueCmd func() *exec.Cmd,
	sbatchCmdComps []string,
	runContainer func(*dispatch.Dispatcher, dispatch.Container)) dispatch.Container {
	arvadostest.ResetEnv()

	arv, err := arvadosclient.MakeArvadosClient()
	c.Assert(err, IsNil)

	// Override sbatchCmd: record the argument list the real sbatchFunc would
	// build, but run a plain "sh" instead of sbatch.
	var sbatchCmdLine []string
	defer func(orig func(dispatch.Container) *exec.Cmd) {
		sbatchCmd = orig
	}(sbatchCmd)
	sbatchCmd = func(container dispatch.Container) *exec.Cmd {
		sbatchCmdLine = sbatchFunc(container).Args
		return exec.Command("sh")
	}

	// Override squeueCmd.
	defer func(orig func() *exec.Cmd) {
		squeueCmd = orig
	}(squeueCmd)
	squeueCmd = newSqueueCmd

	// Override crunchRunCommand, and restore it afterwards so the change does
	// not leak into later tests.
	defer func(orig *string) {
		crunchRunCommand = orig
	}(crunchRunCommand)
	echo := "echo"
	crunchRunCommand = &echo

	// Exactly one container should be queued before the dispatcher runs.
	params := arvadosclient.Dict{
		"filters": [][]string{{"state", "=", "Queued"}},
	}
	var containers dispatch.ContainerList
	err = arv.List("containers", params, &containers)
	c.Check(err, IsNil)
	c.Check(len(containers.Items), Equals, 1)

	doneProcessing := make(chan struct{})
	dispatcher := dispatch.Dispatcher{
		Arv:          arv,
		PollInterval: time.Second,
		RunContainer: func(dispatcher *dispatch.Dispatcher,
			container dispatch.Container,
			status chan dispatch.Container) {
			go runContainer(dispatcher, container)
			run(dispatcher, container, status)
			doneProcessing <- struct{}{}
		},
		DoneProcessing: doneProcessing,
	}

	squeueUpdater.StartMonitor(500 * time.Millisecond)
	// Deferred so the monitor also stops when c.Assert below aborts the test.
	// Defers run LIFO, so this runs before squeueCmd is restored.
	defer squeueUpdater.Done()

	err = dispatcher.RunDispatcher()
	c.Assert(err, IsNil)

	c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)

	// There should be no queued containers now.
	var remaining dispatch.ContainerList
	err = arv.List("containers", params, &remaining)
	c.Check(err, IsNil)
	c.Check(len(remaining.Items), Equals, 0)

	// Fetch the previously "Queued" container. Callers check which final
	// state (e.g. Complete or Cancelled) it ended up in.
	var container dispatch.Container
	err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
	c.Check(err, IsNil)
	return container
}
181
182 func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
183         apiStubResponses := make(map[string]arvadostest.StubResponse)
184         apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
185         apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
186
187         testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
188 }
189
190 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
191         apiStub := arvadostest.ServerStub{apiStubResponses}
192
193         api := httptest.NewServer(&apiStub)
194         defer api.Close()
195
196         arv := arvadosclient.ArvadosClient{
197                 Scheme:    "http",
198                 ApiServer: api.URL[7:],
199                 ApiToken:  "abc123",
200                 Client:    &http.Client{Transport: &http.Transport{}},
201                 Retries:   0,
202         }
203
204         buf := bytes.NewBuffer(nil)
205         log.SetOutput(io.MultiWriter(buf, os.Stderr))
206         defer log.SetOutput(os.Stderr)
207
208         crunchRunCommand = &crunchCmd
209
210         doneProcessing := make(chan struct{})
211         dispatcher := dispatch.Dispatcher{
212                 Arv:          arv,
213                 PollInterval: time.Duration(1) * time.Second,
214                 RunContainer: func(dispatcher *dispatch.Dispatcher,
215                         container dispatch.Container,
216                         status chan dispatch.Container) {
217                         go func() {
218                                 time.Sleep(1 * time.Second)
219                                 dispatcher.UpdateState(container.UUID, dispatch.Running)
220                                 dispatcher.UpdateState(container.UUID, dispatch.Complete)
221                         }()
222                         run(dispatcher, container, status)
223                         doneProcessing <- struct{}{}
224                 },
225                 DoneProcessing: doneProcessing}
226
227         go func() {
228                 for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
229                         time.Sleep(100 * time.Millisecond)
230                 }
231                 dispatcher.DoneProcessing <- struct{}{}
232         }()
233
234         err := dispatcher.RunDispatcher()
235         c.Assert(err, IsNil)
236
237         c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
238 }