6 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
8 "git.curoverse.com/arvados.git/sdk/go/dispatch"
22 // Gocheck boilerplate
23 func Test(t *testing.T) {
27 var _ = Suite(&TestSuite{})
28 var _ = Suite(&MockArvadosServerSuite{})
30 type TestSuite struct{}
31 type MockArvadosServerSuite struct{}
33 var initialArgs []string
35 func (s *TestSuite) SetUpSuite(c *C) {
39 func (s *TestSuite) TearDownSuite(c *C) {
42 func (s *TestSuite) SetUpTest(c *C) {
43 args := []string{"crunch-dispatch-slurm"}
46 arvadostest.StartAPI()
47 os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
50 func (s *TestSuite) TearDownTest(c *C) {
55 func (s *MockArvadosServerSuite) TearDownTest(c *C) {
56 arvadostest.ResetEnv()
59 func (s *TestSuite) TestIntegrationNormal(c *C) {
60 container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
62 func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
63 dispatcher.UpdateState(container.UUID, dispatch.Running)
64 time.Sleep(3 * time.Second)
65 dispatcher.UpdateState(container.UUID, dispatch.Complete)
67 c.Check(container.State, Equals, "Complete")
70 func (s *TestSuite) TestIntegrationCancel(c *C) {
73 var scancelCmdLine []string
74 defer func(orig func(dispatch.Container) *exec.Cmd) {
77 scancelCmd = func(container dispatch.Container) *exec.Cmd {
78 scancelCmdLine = scancelFunc(container).Args
79 return exec.Command("echo")
82 container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
84 func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
85 dispatcher.UpdateState(container.UUID, dispatch.Running)
86 time.Sleep(1 * time.Second)
87 dispatcher.Arv.Update("containers", container.UUID,
89 "container": arvadosclient.Dict{"priority": 0}},
92 c.Check(container.State, Equals, "Cancelled")
93 c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
96 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
97 container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo") }, []string{"sbatch", "--share", "--parsable",
98 fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
99 fmt.Sprintf("--mem-per-cpu=%d", 2862),
100 fmt.Sprintf("--cpus-per-task=%d", 4),
101 fmt.Sprintf("--priority=%d", 1)},
102 func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
103 dispatcher.UpdateState(container.UUID, dispatch.Running)
104 time.Sleep(3 * time.Second)
105 dispatcher.UpdateState(container.UUID, dispatch.Complete)
107 c.Check(container.State, Equals, "Cancelled")
110 func (s *TestSuite) integrationTest(c *C,
111 newSqueueCmd func() *exec.Cmd,
112 sbatchCmdComps []string,
113 runContainer func(*dispatch.Dispatcher, dispatch.Container)) dispatch.Container {
114 arvadostest.ResetEnv()
116 arv, err := arvadosclient.MakeArvadosClient()
119 var sbatchCmdLine []string
121 // Override sbatchCmd
122 defer func(orig func(dispatch.Container) *exec.Cmd) {
125 sbatchCmd = func(container dispatch.Container) *exec.Cmd {
126 sbatchCmdLine = sbatchFunc(container).Args
127 return exec.Command("sh")
130 // Override squeueCmd
131 defer func(orig func() *exec.Cmd) {
134 squeueCmd = newSqueueCmd
136 // There should be exactly one queued container before the dispatcher runs
137 params := arvadosclient.Dict{
138 "filters": [][]string{[]string{"state", "=", "Queued"}},
140 var containers dispatch.ContainerList
141 err = arv.List("containers", params, &containers)
143 c.Check(len(containers.Items), Equals, 1)
146 crunchRunCommand = &echo
148 doneProcessing := make(chan struct{})
149 dispatcher := dispatch.Dispatcher{
151 PollInterval: time.Duration(1) * time.Second,
152 RunContainer: func(dispatcher *dispatch.Dispatcher,
153 container dispatch.Container,
154 status chan dispatch.Container) {
155 go runContainer(dispatcher, container)
156 run(dispatcher, container, status)
157 doneProcessing <- struct{}{}
159 DoneProcessing: doneProcessing}
161 squeueUpdater.StartMonitor(time.Duration(500) * time.Millisecond)
163 err = dispatcher.RunDispatcher()
168 c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
170 // There should be no queued containers now
171 err = arv.List("containers", params, &containers)
173 c.Check(len(containers.Items), Equals, 0)
175 // Fetch the previously "Queued" container; each caller checks the final state it expects
176 var container dispatch.Container
177 err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
182 func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
183 apiStubResponses := make(map[string]arvadostest.StubResponse)
184 apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
185 apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
187 testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
190 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
191 apiStub := arvadostest.ServerStub{apiStubResponses}
193 api := httptest.NewServer(&apiStub)
196 arv := arvadosclient.ArvadosClient{
198 ApiServer: api.URL[7:],
200 Client: &http.Client{Transport: &http.Transport{}},
204 buf := bytes.NewBuffer(nil)
205 log.SetOutput(io.MultiWriter(buf, os.Stderr))
206 defer log.SetOutput(os.Stderr)
208 crunchRunCommand = &crunchCmd
210 doneProcessing := make(chan struct{})
211 dispatcher := dispatch.Dispatcher{
213 PollInterval: time.Duration(1) * time.Second,
214 RunContainer: func(dispatcher *dispatch.Dispatcher,
215 container dispatch.Container,
216 status chan dispatch.Container) {
218 time.Sleep(1 * time.Second)
219 dispatcher.UpdateState(container.UUID, dispatch.Running)
220 dispatcher.UpdateState(container.UUID, dispatch.Complete)
222 run(dispatcher, container, status)
223 doneProcessing <- struct{}{}
225 DoneProcessing: doneProcessing}
228 for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
229 time.Sleep(100 * time.Millisecond)
231 dispatcher.DoneProcessing <- struct{}{}
234 err := dispatcher.RunDispatcher()
237 c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)