6 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
8 "git.curoverse.com/arvados.git/sdk/go/dispatch"
23 // Gocheck boilerplate
24 func Test(t *testing.T) {
// Register both suites with gocheck.
28 var _ = Suite(&TestSuite{})
29 var _ = Suite(&MockArvadosServerSuite{})
// TestSuite groups the tests whose SetUpTest starts the test API via
// arvadostest.StartAPI.
31 type TestSuite struct{}
// MockArvadosServerSuite groups the tests that talk to an
// arvadostest.ServerStub served through httptest instead.
32 type MockArvadosServerSuite struct{}
// NOTE(review): presumably holds the process's original os.Args so tests can
// restore them; the assignment is not visible in this excerpt — confirm.
34 var initialArgs []string
36 func (s *TestSuite) SetUpSuite(c *C) {
40 func (s *TestSuite) TearDownSuite(c *C) {
// SetUpTest prepares the environment for each TestSuite test: it builds a
// command-line argument list, starts the test API server and exports the
// dispatcher's API token.
43 func (s *TestSuite) SetUpTest(c *C) {
// NOTE(review): how args is consumed is not visible here; presumably it is
// assigned to os.Args — confirm.
44 args := []string{"crunch-dispatch-slurm"}
47 arvadostest.StartAPI()
48 os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
51 func (s *TestSuite) TearDownTest(c *C) {
// TearDownTest calls arvadostest.ResetEnv after each MockArvadosServerSuite
// test.
56 func (s *MockArvadosServerSuite) TearDownTest(c *C) {
57 arvadostest.ResetEnv()
// TestIntegrationNormal runs integrationTest with missingFromSqueue=false,
// i.e. the squeue stub reports the queued container's UUID.
60 func (s *TestSuite) TestIntegrationNormal(c *C) {
61 s.integrationTest(c, false)
// TestIntegrationMissingFromSqueue runs integrationTest with
// missingFromSqueue=true, i.e. the squeue stub prints no container UUID.
64 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
65 s.integrationTest(c, true)
// integrationTest runs the dispatcher once against the test API with the
// sbatch and squeue command builders replaced by stubs, then checks the
// recorded sbatch command line and the container's final state.
//
// missingFromSqueue selects what the squeue stub prints: nothing but a
// newline (plain "echo") when true, or the UUID
// "zzzzz-dz642-queuedcontainer" when false.
68 func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
69 arvadostest.ResetEnv()
71 arv, err := arvadosclient.MakeArvadosClient()
// Receives the argument list the real sbatch builder would have produced.
74 var sbatchCmdLine []string
// The deferred func takes the original sbatch builder as its argument; its
// body is not visible here (presumably it restores sbatchCmd — TODO confirm).
77 defer func(orig func(dispatch.Container) *exec.Cmd) {
// Stub: record the real command's args, but hand back a harmless "sh"
// command instead of sbatch.
80 sbatchCmd = func(container dispatch.Container) *exec.Cmd {
81 sbatchCmdLine = sbatchFunc(container).Args
82 return exec.Command("sh")
// Same pattern for squeue: the deferred func receives the original builder.
86 defer func(orig func() *exec.Cmd) {
89 squeueCmd = func() *exec.Cmd {
90 if missingFromSqueue {
// Plain "echo" prints only a newline, so no container UUID is listed.
91 return exec.Command("echo")
93 return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
97 // There should be exactly one queued container before the dispatcher runs
98 params := arvadosclient.Dict{
99 "filters": [][]string{[]string{"state", "=", "Queued"}},
101 var containers dispatch.ContainerList
102 err = arv.List("containers", params, &containers)
104 c.Check(len(containers.Items), Equals, 1)
107 crunchRunCommand = &echo
// doneProcessing is passed to the dispatcher as DoneProcessing and is
// signalled by RunContainer after run() returns.
109 doneProcessing := make(chan struct{})
110 dispatcher := dispatch.Dispatcher{
112 PollInterval: time.Duration(1) * time.Second,
113 RunContainer: func(dispatcher *dispatch.Dispatcher,
114 container dispatch.Container,
115 status chan dispatch.Container) {
// Simulated container lifecycle: Running, a 3-second pause, then Complete.
117 dispatcher.UpdateState(container.UUID, dispatch.Running)
118 time.Sleep(3 * time.Second)
119 dispatcher.UpdateState(container.UUID, dispatch.Complete)
121 run(dispatcher, container, status)
122 doneProcessing <- struct{}{}
124 DoneProcessing: doneProcessing}
// Start the squeue synchronisation in the background with a 500ms period.
126 squeueUpdater.SqueueDone = make(chan struct{})
127 go squeueUpdater.SyncSqueue(time.Duration(500) * time.Millisecond)
129 err = dispatcher.RunDispatcher()
// Signal the squeue updater through SqueueDone, then close the channel.
132 squeueUpdater.SqueueDone <- struct{}{}
133 close(squeueUpdater.SqueueDone)
// Expected sbatch command line, built from the container listed before the
// dispatcher ran. --mem-per-cpu is ceil(ram / (vcpus * 1048576)).
135 item := containers.Items[0]
136 sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
137 fmt.Sprintf("--job-name=%s", item.UUID),
138 fmt.Sprintf("--mem-per-cpu=%d", int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576)))),
139 fmt.Sprintf("--cpus-per-task=%d", int(item.RuntimeConstraints["vcpus"])),
140 fmt.Sprintf("--priority=%d", item.Priority)}
142 if missingFromSqueue {
143 // not in squeue when run() started, so it will have called sbatch
144 c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
146 // already in squeue when run() started, will have just monitored it instead
147 c.Check(sbatchCmdLine, DeepEquals, []string(nil))
150 // There should be no queued containers now
151 err = arv.List("containers", params, &containers)
153 c.Check(len(containers.Items), Equals, 0)
155 // Previously "Queued" container should now be "Cancelled" if it was missing from squeue, otherwise "Complete"
156 var container dispatch.Container
157 err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
159 if missingFromSqueue {
160 c.Check(container.State, Equals, "Cancelled")
162 c.Check(container.State, Equals, "Complete")
// Test_APIErrorGettingContainers stubs the API so that the current-token
// lookup succeeds but the containers endpoint answers with HTTP 500, and
// expects "Error getting list of containers" to appear in the log output.
166 func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
167 apiStubResponses := make(map[string]arvadostest.StubResponse)
168 apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
169 apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
171 testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
// testWithServerStub runs the dispatcher against an httptest server backed by
// an arvadostest.ServerStub and checks that the log output contains expected.
//
// apiStubResponses maps request paths to the canned responses the stub
// serves. crunchCmd is installed as crunchRunCommand. expected is the text
// that must show up in the captured log.
174 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
175 apiStub := arvadostest.ServerStub{apiStubResponses}
177 api := httptest.NewServer(&apiStub)
180 arv := arvadosclient.ArvadosClient{
// Drops the first 7 characters of the server URL (the length of "http://"),
// leaving the rest as the API server address.
182 ApiServer: api.URL[7:],
184 Client: &http.Client{Transport: &http.Transport{}},
// Tee log output into buf (and still to stderr) so it can be inspected;
// the deferred call points the logger back at stderr alone.
188 buf := bytes.NewBuffer(nil)
189 log.SetOutput(io.MultiWriter(buf, os.Stderr))
190 defer log.SetOutput(os.Stderr)
192 crunchRunCommand = &crunchCmd
194 doneProcessing := make(chan struct{})
195 dispatcher := dispatch.Dispatcher{
197 PollInterval: time.Duration(1) * time.Second,
198 RunContainer: func(dispatcher *dispatch.Dispatcher,
199 container dispatch.Container,
200 status chan dispatch.Container) {
// Simulated container lifecycle: a 1-second pause, then Running, then Complete.
202 time.Sleep(1 * time.Second)
203 dispatcher.UpdateState(container.UUID, dispatch.Running)
204 dispatcher.UpdateState(container.UUID, dispatch.Complete)
206 run(dispatcher, container, status)
207 doneProcessing <- struct{}{}
209 DoneProcessing: doneProcessing}
// Poll the captured log every 100ms, at most 80 times (about 8 seconds),
// until expected appears; then signal the dispatcher's DoneProcessing channel.
212 for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
213 time.Sleep(100 * time.Millisecond)
215 dispatcher.DoneProcessing <- struct{}{}
218 err := dispatcher.RunDispatcher()
// NOTE(review): expected is spliced into the pattern unescaped, so regexp
// metacharacters in it would be interpreted; consider regexp.QuoteMeta.
221 c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)