1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.curoverse.com/arvados.git/sdk/go/arvados"
17 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
18 "git.curoverse.com/arvados.git/sdk/go/ctxlog"
19 "golang.org/x/net/websocket"
20 check "gopkg.in/check.v1"
24 if os.Getenv("ARVADOS_DEBUG") != "" {
25 ctxlog.SetLevel("debug")
29 var _ = check.Suite(&v0Suite{})
32 serverSuite serverSuite
37 func (s *v0Suite) SetUpTest(c *check.C) {
38 s.serverSuite.SetUpTest(c)
39 s.token = arvadostest.ActiveToken
42 func (s *v0Suite) TearDownSuite(c *check.C) {
43 ac := arvados.NewClientFromEnv()
44 ac.AuthToken = arvadostest.AdminToken
45 for _, path := range s.toDelete {
46 err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
53 func (s *v0Suite) TestFilters(c *check.C) {
54 srv, conn, r, w := s.testClient()
58 c.Check(w.Encode(map[string]interface{}{
59 "method": "subscribe",
60 "filters": [][]interface{}{{"event_type", "in", []string{"update"}}},
62 s.expectStatus(c, r, 200)
65 lg := s.expectLog(c, r)
66 c.Check(lg.EventType, check.Equals, "update")
69 func (s *v0Suite) TestLastLogID(c *check.C) {
71 c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
73 srv, conn, r, w := s.testClient()
77 uuidChan := make(chan string, 2)
78 s.emitEvents(uuidChan)
80 c.Check(w.Encode(map[string]interface{}{
81 "method": "subscribe",
82 "last_log_id": lastID,
84 s.expectStatus(c, r, 200)
87 s.emitEvents(uuidChan)
91 done := make(chan bool)
93 for uuid := range uuidChan {
94 for _, etype := range []string{"create", "blip", "update"} {
95 lg := s.expectLog(c, r)
96 c.Check(lg.ObjectUUID, check.Equals, uuid)
97 c.Check(lg.EventType, check.Equals, etype)
104 case <-time.After(10 * time.Second):
110 func (s *v0Suite) TestPermission(c *check.C) {
111 srv, conn, r, w := s.testClient()
115 c.Check(w.Encode(map[string]interface{}{
116 "method": "subscribe",
118 s.expectStatus(c, r, 200)
120 uuidChan := make(chan string, 1)
122 s.token = arvadostest.AdminToken
124 s.token = arvadostest.ActiveToken
125 s.emitEvents(uuidChan)
128 lg := s.expectLog(c, r)
129 c.Check(lg.ObjectUUID, check.Equals, <-uuidChan)
132 func (s *v0Suite) TestSendBadJSON(c *check.C) {
133 srv, conn, r, w := s.testClient()
137 c.Check(w.Encode(map[string]interface{}{
138 "method": "subscribe",
140 s.expectStatus(c, r, 200)
142 _, err := fmt.Fprint(conn, "^]beep\n")
143 c.Check(err, check.IsNil)
144 s.expectStatus(c, r, 400)
146 c.Check(w.Encode(map[string]interface{}{
147 "method": "subscribe",
149 s.expectStatus(c, r, 200)
152 func (s *v0Suite) TestSubscribe(c *check.C) {
153 srv, conn, r, w := s.testClient()
159 err := w.Encode(map[string]interface{}{"21": 12})
160 c.Check(err, check.IsNil)
161 s.expectStatus(c, r, 400)
163 err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
164 c.Check(err, check.IsNil)
165 s.expectStatus(c, r, 200)
167 uuidChan := make(chan string, 1)
168 go s.emitEvents(uuidChan)
171 for _, etype := range []string{"create", "blip", "update"} {
172 lg := s.expectLog(c, r)
173 c.Check(lg.ObjectUUID, check.Equals, uuid)
174 c.Check(lg.EventType, check.Equals, etype)
178 // Generate some events by creating and updating a workflow object,
179 // and creating a custom log entry (event_type="blip") about the newly
180 // created workflow. If uuidChan is not nil, send the new workflow
181 // UUID to uuidChan as soon as it's known.
182 func (s *v0Suite) emitEvents(uuidChan chan<- string) {
183 ac := arvados.NewClientFromEnv()
184 ac.AuthToken = s.token
185 wf := &arvados.Workflow{
188 err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", wf), map[string]interface{}{"ensure_unique_name": true})
196 err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", &arvados.Log{
199 Properties: map[string]interface{}{
206 err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", wf), nil)
210 s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID, "arvados/v1/logs/"+lg.UUID)
213 func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
214 j, err := json.Marshal(ob)
219 v[rscName] = []string{string(j)}
220 return bytes.NewBufferString(v.Encode())
223 func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
224 msg := map[string]interface{}{}
225 c.Check(r.Decode(&msg), check.IsNil)
226 c.Check(int(msg["status"].(float64)), check.Equals, status)
229 func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
231 c.Check(r.Decode(lg), check.IsNil)
235 func (s *v0Suite) testClient() (*server, *websocket.Conn, *json.Decoder, *json.Encoder) {
236 go s.serverSuite.srv.Run()
237 s.serverSuite.srv.WaitReady()
238 srv := s.serverSuite.srv
239 conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
243 w := json.NewEncoder(conn)
244 r := json.NewDecoder(conn)
245 return srv, conn, r, w