6 "git.curoverse.com/arvados.git/sdk/go/arvados"
7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
9 "git.curoverse.com/arvados.git/sdk/go/dispatch"
23 // Gocheck boilerplate
24 func Test(t *testing.T) {
28 var _ = Suite(&TestSuite{})
29 var _ = Suite(&MockArvadosServerSuite{})
31 type TestSuite struct{}
32 type MockArvadosServerSuite struct{}
34 var initialArgs []string
36 func (s *TestSuite) SetUpSuite(c *C) {
40 func (s *TestSuite) TearDownSuite(c *C) {
43 func (s *TestSuite) SetUpTest(c *C) {
44 args := []string{"crunch-dispatch-slurm"}
47 arvadostest.StartAPI()
48 os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
51 func (s *TestSuite) TearDownTest(c *C) {
56 func (s *MockArvadosServerSuite) TearDownTest(c *C) {
57 arvadostest.ResetEnv()
60 func (s *TestSuite) TestIntegrationNormal(c *C) {
61 container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
63 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
64 dispatcher.UpdateState(container.UUID, dispatch.Running)
65 time.Sleep(3 * time.Second)
66 dispatcher.UpdateState(container.UUID, dispatch.Complete)
68 c.Check(container.State, Equals, arvados.ContainerStateComplete)
71 func (s *TestSuite) TestIntegrationCancel(c *C) {
74 var scancelCmdLine []string
75 defer func(orig func(arvados.Container) *exec.Cmd) {
78 scancelCmd = func(container arvados.Container) *exec.Cmd {
79 scancelCmdLine = scancelFunc(container).Args
80 return exec.Command("echo")
83 container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
85 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
86 dispatcher.UpdateState(container.UUID, dispatch.Running)
87 time.Sleep(1 * time.Second)
88 dispatcher.Arv.Update("containers", container.UUID,
90 "container": arvadosclient.Dict{"priority": 0}},
93 c.Check(container.State, Equals, arvados.ContainerStateCancelled)
94 c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
97 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
98 container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo") }, []string{"sbatch", "--share", "--parsable",
99 fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
100 fmt.Sprintf("--mem-per-cpu=%d", 2862),
101 fmt.Sprintf("--cpus-per-task=%d", 4),
102 fmt.Sprintf("--priority=%d", 1)},
103 func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
104 dispatcher.UpdateState(container.UUID, dispatch.Running)
105 time.Sleep(3 * time.Second)
106 dispatcher.UpdateState(container.UUID, dispatch.Complete)
108 c.Check(container.State, Equals, arvados.ContainerStateCancelled)
111 func (s *TestSuite) integrationTest(c *C,
112 newSqueueCmd func() *exec.Cmd,
113 sbatchCmdComps []string,
114 runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
115 arvadostest.ResetEnv()
117 arv, err := arvadosclient.MakeArvadosClient()
120 var sbatchCmdLine []string
122 // Override sbatchCmd
123 defer func(orig func(arvados.Container) *exec.Cmd) {
126 sbatchCmd = func(container arvados.Container) *exec.Cmd {
127 sbatchCmdLine = sbatchFunc(container).Args
128 return exec.Command("sh")
131 // Override squeueCmd
132 defer func(orig func() *exec.Cmd) {
135 squeueCmd = newSqueueCmd
137 // There should be no queued containers now
138 params := arvadosclient.Dict{
139 "filters": [][]string{[]string{"state", "=", "Queued"}},
141 var containers arvados.ContainerList
142 err = arv.List("containers", params, &containers)
144 c.Check(len(containers.Items), Equals, 1)
147 crunchRunCommand = &echo
149 doneProcessing := make(chan struct{})
150 dispatcher := dispatch.Dispatcher{
152 PollInterval: time.Duration(1) * time.Second,
153 RunContainer: func(dispatcher *dispatch.Dispatcher,
154 container arvados.Container,
155 status chan arvados.Container) {
156 go runContainer(dispatcher, container)
157 run(dispatcher, container, status)
158 doneProcessing <- struct{}{}
160 DoneProcessing: doneProcessing}
162 squeueUpdater.StartMonitor(time.Duration(500) * time.Millisecond)
164 err = dispatcher.RunDispatcher()
169 c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
171 // There should be no queued containers now
172 err = arv.List("containers", params, &containers)
174 c.Check(len(containers.Items), Equals, 0)
176 // Previously "Queued" container should now be in "Complete" state
177 var container arvados.Container
178 err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
183 func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
184 apiStubResponses := make(map[string]arvadostest.StubResponse)
185 apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
186 apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
188 testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
191 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
192 apiStub := arvadostest.ServerStub{apiStubResponses}
194 api := httptest.NewServer(&apiStub)
197 arv := arvadosclient.ArvadosClient{
199 ApiServer: api.URL[7:],
201 Client: &http.Client{Transport: &http.Transport{}},
205 buf := bytes.NewBuffer(nil)
206 log.SetOutput(io.MultiWriter(buf, os.Stderr))
207 defer log.SetOutput(os.Stderr)
209 crunchRunCommand = &crunchCmd
211 doneProcessing := make(chan struct{})
212 dispatcher := dispatch.Dispatcher{
214 PollInterval: time.Duration(1) * time.Second,
215 RunContainer: func(dispatcher *dispatch.Dispatcher,
216 container arvados.Container,
217 status chan arvados.Container) {
219 time.Sleep(1 * time.Second)
220 dispatcher.UpdateState(container.UUID, dispatch.Running)
221 dispatcher.UpdateState(container.UUID, dispatch.Complete)
223 run(dispatcher, container, status)
224 doneProcessing <- struct{}{}
226 DoneProcessing: doneProcessing}
229 for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
230 time.Sleep(100 * time.Millisecond)
232 dispatcher.DoneProcessing <- struct{}{}
235 err := dispatcher.RunDispatcher()
238 c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)