6 "git.curoverse.com/arvados.git/sdk/go/arvados"
7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
9 "git.curoverse.com/arvados.git/sdk/go/dispatch"
24 // Gocheck boilerplate
25 func Test(t *testing.T) {
29 var _ = Suite(&TestSuite{})
30 var _ = Suite(&MockArvadosServerSuite{})
32 type TestSuite struct{}
33 type MockArvadosServerSuite struct{}
35 var initialArgs []string
37 func (s *TestSuite) SetUpSuite(c *C) {
41 func (s *TestSuite) TearDownSuite(c *C) {
44 func (s *TestSuite) SetUpTest(c *C) {
45 args := []string{"crunch-dispatch-slurm"}
48 arvadostest.StartAPI()
49 os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
52 func (s *TestSuite) TearDownTest(c *C) {
57 func (s *MockArvadosServerSuite) TearDownTest(c *C) {
58 arvadostest.ResetEnv()
61 func (s *TestSuite) TestIntegrationNormal(c *C) {
62 container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
64 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
65 dispatcher.UpdateState(container.UUID, dispatch.Running)
66 time.Sleep(3 * time.Second)
67 dispatcher.UpdateState(container.UUID, dispatch.Complete)
69 c.Check(container.State, Equals, arvados.ContainerStateComplete)
72 func (s *TestSuite) TestIntegrationCancel(c *C) {
75 var scancelCmdLine []string
76 defer func(orig func(arvados.Container) *exec.Cmd) {
79 scancelCmd = func(container arvados.Container) *exec.Cmd {
80 scancelCmdLine = scancelFunc(container).Args
81 return exec.Command("echo")
84 container := s.integrationTest(c,
85 func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
87 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
88 dispatcher.UpdateState(container.UUID, dispatch.Running)
89 time.Sleep(1 * time.Second)
90 dispatcher.Arv.Update("containers", container.UUID,
92 "container": arvadosclient.Dict{"priority": 0}},
95 c.Check(container.State, Equals, arvados.ContainerStateCancelled)
96 c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
99 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
100 container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo") }, []string{"sbatch", "--share",
101 fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
102 fmt.Sprintf("--mem-per-cpu=%d", 2862),
103 fmt.Sprintf("--cpus-per-task=%d", 4)},
104 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
105 dispatcher.UpdateState(container.UUID, dispatch.Running)
106 time.Sleep(3 * time.Second)
107 dispatcher.UpdateState(container.UUID, dispatch.Complete)
109 c.Check(container.State, Equals, arvados.ContainerStateCancelled)
112 func (s *TestSuite) integrationTest(c *C,
113 newSqueueCmd func() *exec.Cmd,
114 sbatchCmdComps []string,
115 runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
116 arvadostest.ResetEnv()
118 arv, err := arvadosclient.MakeArvadosClient()
121 var sbatchCmdLine []string
123 // Override sbatchCmd
124 defer func(orig func(arvados.Container) *exec.Cmd) {
127 sbatchCmd = func(container arvados.Container) *exec.Cmd {
128 sbatchCmdLine = sbatchFunc(container).Args
129 return exec.Command("sh")
132 // Override squeueCmd
133 defer func(orig func() *exec.Cmd) {
136 squeueCmd = newSqueueCmd
138 // There should be one queued container
139 params := arvadosclient.Dict{
140 "filters": [][]string{{"state", "=", "Queued"}},
142 var containers arvados.ContainerList
143 err = arv.List("containers", params, &containers)
145 c.Check(len(containers.Items), Equals, 1)
147 theConfig.CrunchRunCommand = []string{"echo"}
149 doneProcessing := make(chan struct{})
150 dispatcher := dispatch.Dispatcher{
152 PollInterval: time.Duration(1) * time.Second,
153 RunContainer: func(dispatcher *dispatch.Dispatcher,
154 container arvados.Container,
155 status chan arvados.Container) {
156 go runContainer(dispatcher, container)
157 run(dispatcher, container, status)
158 doneProcessing <- struct{}{}
160 DoneProcessing: doneProcessing}
162 squeueUpdater.StartMonitor(time.Duration(500) * time.Millisecond)
164 err = dispatcher.RunDispatcher()
169 c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
171 // There should be no queued containers now
172 err = arv.List("containers", params, &containers)
174 c.Check(len(containers.Items), Equals, 0)
176 // Previously "Queued" container should now be in "Complete" state
177 var container arvados.Container
178 err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
183 func (s *MockArvadosServerSuite) TestAPIErrorGettingContainers(c *C) {
184 apiStubResponses := make(map[string]arvadostest.StubResponse)
185 apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
186 apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
188 testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
191 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
192 apiStub := arvadostest.ServerStub{apiStubResponses}
194 api := httptest.NewServer(&apiStub)
197 arv := &arvadosclient.ArvadosClient{
199 ApiServer: api.URL[7:],
201 Client: &http.Client{Transport: &http.Transport{}},
205 buf := bytes.NewBuffer(nil)
206 log.SetOutput(io.MultiWriter(buf, os.Stderr))
207 defer log.SetOutput(os.Stderr)
209 theConfig.CrunchRunCommand = []string{crunchCmd}
211 doneProcessing := make(chan struct{})
212 dispatcher := dispatch.Dispatcher{
214 PollInterval: time.Duration(1) * time.Second,
215 RunContainer: func(dispatcher *dispatch.Dispatcher,
216 container arvados.Container,
217 status chan arvados.Container) {
219 time.Sleep(1 * time.Second)
220 dispatcher.UpdateState(container.UUID, dispatch.Running)
221 dispatcher.UpdateState(container.UUID, dispatch.Complete)
223 run(dispatcher, container, status)
224 doneProcessing <- struct{}{}
226 DoneProcessing: doneProcessing}
229 for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
230 time.Sleep(100 * time.Millisecond)
232 dispatcher.DoneProcessing <- struct{}{}
235 err := dispatcher.RunDispatcher()
238 c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
241 func (s *MockArvadosServerSuite) TestNoSuchConfigFile(c *C) {
243 err := readConfig(&config, "/nosuchdir89j7879/8hjwr7ojgyy7")
244 c.Assert(err, NotNil)
247 func (s *MockArvadosServerSuite) TestBadSbatchArgsConfig(c *C) {
250 tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
252 defer os.Remove(tmpfile.Name())
254 _, err = tmpfile.Write([]byte(`{"SbatchArguments": "oops this is not a string array"}`))
257 err = readConfig(&config, tmpfile.Name())
258 c.Assert(err, NotNil)
261 func (s *MockArvadosServerSuite) TestNoSuchArgInConfigIgnored(c *C) {
264 tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
266 defer os.Remove(tmpfile.Name())
268 _, err = tmpfile.Write([]byte(`{"NoSuchArg": "Nobody loves me, not one tiny hunk."}`))
271 err = readConfig(&config, tmpfile.Name())
273 c.Check(0, Equals, len(config.SbatchArguments))
276 func (s *MockArvadosServerSuite) TestReadConfig(c *C) {
279 tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
281 defer os.Remove(tmpfile.Name())
283 args := []string{"--arg1=v1", "--arg2", "--arg3=v3"}
284 argsS := `{"SbatchArguments": ["--arg1=v1", "--arg2", "--arg3=v3"]}`
285 _, err = tmpfile.Write([]byte(argsS))
288 err = readConfig(&config, tmpfile.Name())
290 c.Check(3, Equals, len(config.SbatchArguments))
291 c.Check(args, DeepEquals, config.SbatchArguments)
294 func (s *MockArvadosServerSuite) TestSbatchFuncWithNoConfigArgs(c *C) {
295 testSbatchFuncWithArgs(c, nil)
298 func (s *MockArvadosServerSuite) TestSbatchFuncWithEmptyConfigArgs(c *C) {
299 testSbatchFuncWithArgs(c, []string{})
302 func (s *MockArvadosServerSuite) TestSbatchFuncWithConfigArgs(c *C) {
303 testSbatchFuncWithArgs(c, []string{"--arg1=v1", "--arg2"})
306 func testSbatchFuncWithArgs(c *C, args []string) {
307 theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...)
309 container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2}}
310 sbatchCmd := sbatchFunc(container)
312 var expected []string
313 expected = append(expected, "sbatch", "--share")
314 expected = append(expected, theConfig.SbatchArguments...)
315 expected = append(expected, "--job-name=123", "--mem-per-cpu=120", "--cpus-per-task=2")
317 c.Check(sbatchCmd.Args, DeepEquals, expected)
320 func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
321 theConfig.SbatchArguments = nil
322 container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1}, SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}}}
323 sbatchCmd := sbatchFunc(container)
325 var expected []string
326 expected = append(expected, "sbatch", "--share")
327 expected = append(expected, "--job-name=123", "--mem-per-cpu=239", "--cpus-per-task=1", "--partition=blurb,b2")
329 c.Check(sbatchCmd.Args, DeepEquals, expected)