Merge branch '9272-use-container-auth'
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm_test.go
1 package main
2
3 import (
4         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
5         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
6
7         "bytes"
8         "fmt"
9         "log"
10         "math"
11         "net/http"
12         "net/http/httptest"
13         "os"
14         "os/exec"
15         "strconv"
16         "strings"
17         "syscall"
18         "testing"
19         "time"
20
21         . "gopkg.in/check.v1"
22 )
23
24 // Gocheck boilerplate
25 func Test(t *testing.T) {
26         TestingT(t)
27 }
28
29 var _ = Suite(&TestSuite{})
30 var _ = Suite(&MockArvadosServerSuite{})
31
32 type TestSuite struct{}
33 type MockArvadosServerSuite struct{}
34
35 var initialArgs []string
36
37 func (s *TestSuite) SetUpSuite(c *C) {
38         initialArgs = os.Args
39         arvadostest.StartAPI()
40 }
41
42 func (s *TestSuite) TearDownSuite(c *C) {
43         arvadostest.StopAPI()
44 }
45
46 func (s *TestSuite) SetUpTest(c *C) {
47         args := []string{"crunch-dispatch-slurm"}
48         os.Args = args
49
50         var err error
51         arv, err = arvadosclient.MakeArvadosClient()
52         if err != nil {
53                 c.Fatalf("Error making arvados client: %s", err)
54         }
55         os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
56 }
57
58 func (s *TestSuite) TearDownTest(c *C) {
59         arvadostest.ResetEnv()
60         os.Args = initialArgs
61 }
62
63 func (s *MockArvadosServerSuite) TearDownTest(c *C) {
64         arvadostest.ResetEnv()
65 }
66
67 func (s *TestSuite) Test_doMain(c *C) {
68         args := []string{"-poll-interval", "2", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
69         os.Args = append(os.Args, args...)
70
71         var sbatchCmdLine []string
72         var striggerCmdLine []string
73
74         // Override sbatchCmd
75         defer func(orig func(Container) *exec.Cmd) {
76                 sbatchCmd = orig
77         }(sbatchCmd)
78         sbatchCmd = func(container Container) *exec.Cmd {
79                 sbatchCmdLine = sbatchFunc(container).Args
80                 return exec.Command("sh")
81         }
82
83         // Override striggerCmd
84         defer func(orig func(jobid, containerUUID, finishCommand,
85                 apiHost, apiToken, apiInsecure string) *exec.Cmd) {
86                 striggerCmd = orig
87         }(striggerCmd)
88         striggerCmd = func(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
89                 striggerCmdLine = striggerFunc(jobid, containerUUID, finishCommand,
90                         apiHost, apiToken, apiInsecure).Args
91                 go func() {
92                         time.Sleep(5 * time.Second)
93                         for _, state := range []string{"Running", "Complete"} {
94                                 arv.Update("containers", containerUUID,
95                                         arvadosclient.Dict{
96                                                 "container": arvadosclient.Dict{"state": state}},
97                                         nil)
98                         }
99                 }()
100                 return exec.Command("echo", "strigger")
101         }
102
103         go func() {
104                 time.Sleep(8 * time.Second)
105                 sigChan <- syscall.SIGINT
106         }()
107
108         // There should be no queued containers now
109         params := arvadosclient.Dict{
110                 "filters": [][]string{[]string{"state", "=", "Queued"}},
111         }
112         var containers ContainerList
113         err := arv.List("containers", params, &containers)
114         c.Check(err, IsNil)
115         c.Check(len(containers.Items), Equals, 1)
116
117         err = doMain()
118         c.Check(err, IsNil)
119
120         item := containers.Items[0]
121         sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
122                 fmt.Sprintf("--job-name=%s", item.UUID),
123                 fmt.Sprintf("--mem-per-cpu=%s", strconv.Itoa(int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576))))),
124                 fmt.Sprintf("--cpus-per-task=%s", strconv.Itoa(int(item.RuntimeConstraints["vcpus"])))}
125         c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
126
127         c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer\n", "--fini",
128                 "--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " " + arvadostest.Dispatch1Token + " 1 zzzzz-dz642-queuedcontainer"})
129
130         // There should be no queued containers now
131         err = arv.List("containers", params, &containers)
132         c.Check(err, IsNil)
133         c.Check(len(containers.Items), Equals, 0)
134
135         // Previously "Queued" container should now be in "Complete" state
136         var container Container
137         err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
138         c.Check(err, IsNil)
139         c.Check(container.State, Equals, "Complete")
140 }
141
142 func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
143         apiStubResponses := make(map[string]arvadostest.StubResponse)
144         apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
145         apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
146
147         testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
148 }
149
150 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
151         apiStub := arvadostest.ServerStub{apiStubResponses}
152
153         api := httptest.NewServer(&apiStub)
154         defer api.Close()
155
156         arv = arvadosclient.ArvadosClient{
157                 Scheme:    "http",
158                 ApiServer: api.URL[7:],
159                 ApiToken:  "abc123",
160                 Client:    &http.Client{Transport: &http.Transport{}},
161                 Retries:   0,
162         }
163
164         buf := bytes.NewBuffer(nil)
165         log.SetOutput(buf)
166         defer log.SetOutput(os.Stderr)
167
168         go func() {
169                 for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
170                         time.Sleep(100 * time.Millisecond)
171                 }
172                 sigChan <- syscall.SIGTERM
173         }()
174
175         runQueuedContainers(2, 1, crunchCmd, crunchCmd)
176
177         c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
178 }