13959: Use logrus for crunch-dispatch-local logging.
[arvados.git] / services / crunch-dispatch-local / crunch-dispatch-local_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "io"
11         "net/http"
12         "net/http/httptest"
13         "os"
14         "os/exec"
15         "strings"
16         "testing"
17         "time"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
21         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
22         "git.curoverse.com/arvados.git/sdk/go/dispatch"
23         "github.com/Sirupsen/logrus"
24         . "gopkg.in/check.v1"
25 )
26
27 // Gocheck boilerplate
28 func Test(t *testing.T) {
29         TestingT(t)
30 }
31
32 var _ = Suite(&TestSuite{})
33 var _ = Suite(&MockArvadosServerSuite{})
34
35 type TestSuite struct{}
36 type MockArvadosServerSuite struct{}
37
38 var initialArgs []string
39
40 func (s *TestSuite) SetUpSuite(c *C) {
41         initialArgs = os.Args
42         arvadostest.StartAPI()
43         runningCmds = make(map[string]*exec.Cmd)
44 }
45
46 func (s *TestSuite) TearDownSuite(c *C) {
47         arvadostest.StopAPI()
48 }
49
50 func (s *TestSuite) SetUpTest(c *C) {
51         args := []string{"crunch-dispatch-local"}
52         os.Args = args
53 }
54
55 func (s *TestSuite) TearDownTest(c *C) {
56         arvadostest.ResetEnv()
57         os.Args = initialArgs
58 }
59
60 func (s *MockArvadosServerSuite) TearDownTest(c *C) {
61         arvadostest.ResetEnv()
62 }
63
64 func (s *TestSuite) TestIntegration(c *C) {
65         arv, err := arvadosclient.MakeArvadosClient()
66         c.Assert(err, IsNil)
67
68         echo := "echo"
69         crunchRunCommand = &echo
70
71         ctx, cancel := context.WithCancel(context.Background())
72         dispatcher := dispatch.Dispatcher{
73                 Arv:        arv,
74                 PollPeriod: time.Second,
75                 RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
76                         run(d, c, s)
77                         cancel()
78                 },
79         }
80
81         startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
82                 dispatcher.UpdateState(container.UUID, "Running")
83                 dispatcher.UpdateState(container.UUID, "Complete")
84                 return cmd.Start()
85         }
86
87         err = dispatcher.Run(ctx)
88         c.Assert(err, Equals, context.Canceled)
89
90         // Wait for all running crunch jobs to complete / terminate
91         waitGroup.Wait()
92
93         // There should be no queued containers now
94         params := arvadosclient.Dict{
95                 "filters": [][]string{{"state", "=", "Queued"}},
96         }
97         var containers arvados.ContainerList
98         err = arv.List("containers", params, &containers)
99         c.Check(err, IsNil)
100         c.Assert(len(containers.Items), Equals, 0)
101
102         // Previously "Queued" container should now be in "Complete" state
103         var container arvados.Container
104         err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
105         c.Check(err, IsNil)
106         c.Check(string(container.State), Equals, "Complete")
107 }
108
109 func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
110         apiStubResponses := make(map[string]arvadostest.StubResponse)
111         apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
112
113         testWithServerStub(c, apiStubResponses, "echo", "error getting list of containers")
114 }
115
116 func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
117         apiStubResponses := make(map[string]arvadostest.StubResponse)
118         apiStubResponses["/arvados/v1/containers"] =
119                 arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1","State":"Queued","Priority":1}]}`)}
120         apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
121                 arvadostest.StubResponse{500, string(`{}`)}
122
123         testWithServerStub(c, apiStubResponses, "echo", "error locking container zzzzz-dz642-xxxxxxxxxxxxxx1")
124 }
125
126 func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
127         apiStubResponses := make(map[string]arvadostest.StubResponse)
128         apiStubResponses["/arvados/v1/containers"] =
129                 arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2","State":"Queued","Priority":1}]}`)}
130         apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2/lock"] =
131                 arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Locked", "priority":1, "locked_by_uuid": "` + arvadostest.Dispatch1AuthUUID + `"}`)}
132         apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2"] =
133                 arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1, "locked_by_uuid": "` + arvadostest.Dispatch1AuthUUID + `"}`)}
134
135         testWithServerStub(c, apiStubResponses, "echo",
136                 `after "echo" process termination, container state for zzzzz-dz642-xxxxxxxxxxxxxx2 is "Running"; updating it to "Cancelled"`)
137 }
138
139 func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
140         apiStubResponses := make(map[string]arvadostest.StubResponse)
141         apiStubResponses["/arvados/v1/containers"] =
142                 arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3","State":"Queued","Priority":1}]}`)}
143
144         apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3/lock"] =
145                 arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3", "state":"Locked", "priority":1}`)}
146
147         testWithServerStub(c, apiStubResponses, "nosuchcommand", `error starting "nosuchcommand" for zzzzz-dz642-xxxxxxxxxxxxxx3`)
148 }
149
150 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
151         apiStubResponses["/arvados/v1/api_client_authorizations/current"] =
152                 arvadostest.StubResponse{200, string(`{"uuid": "` + arvadostest.Dispatch1AuthUUID + `", "api_token": "xyz"}`)}
153
154         apiStub := arvadostest.ServerStub{apiStubResponses}
155
156         api := httptest.NewServer(&apiStub)
157         defer api.Close()
158
159         arv := &arvadosclient.ArvadosClient{
160                 Scheme:    "http",
161                 ApiServer: api.URL[7:],
162                 ApiToken:  "abc123",
163                 Client:    &http.Client{Transport: &http.Transport{}},
164                 Retries:   0,
165         }
166
167         buf := bytes.NewBuffer(nil)
168         logrus.SetOutput(io.MultiWriter(buf, os.Stderr))
169         defer logrus.SetOutput(os.Stderr)
170
171         *crunchRunCommand = crunchCmd
172
173         ctx, cancel := context.WithCancel(context.Background())
174         dispatcher := dispatch.Dispatcher{
175                 Arv:        arv,
176                 PollPeriod: time.Second / 20,
177                 RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
178                         run(d, c, s)
179                         cancel()
180                 },
181         }
182
183         startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
184                 dispatcher.UpdateState(container.UUID, "Running")
185                 dispatcher.UpdateState(container.UUID, "Complete")
186                 return cmd.Start()
187         }
188
189         go func() {
190                 for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
191                         time.Sleep(100 * time.Millisecond)
192                 }
193                 cancel()
194         }()
195
196         err := dispatcher.Run(ctx)
197         c.Assert(err, Equals, context.Canceled)
198
199         // Wait for all running crunch jobs to complete / terminate
200         waitGroup.Wait()
201
202         c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
203 }