1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.curoverse.com/arvados.git/sdk/go/arvados"
17 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
18 "git.curoverse.com/arvados.git/sdk/go/ctxlog"
19 "golang.org/x/net/websocket"
20 check "gopkg.in/check.v1"
24 if os.Getenv("ARVADOS_DEBUG") != "" {
25 ctxlog.SetLevel("debug")
29 var _ = check.Suite(&v0Suite{})
32 serverSuite serverSuite
37 func (s *v0Suite) SetUpTest(c *check.C) {
38 s.serverSuite.SetUpTest(c)
39 s.token = arvadostest.ActiveToken
42 func (s *v0Suite) TearDownSuite(c *check.C) {
43 ac := arvados.NewClientFromEnv()
44 ac.AuthToken = arvadostest.AdminToken
45 for _, path := range s.toDelete {
46 err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
53 func (s *v0Suite) TestFilters(c *check.C) {
54 srv, conn, r, w := s.testClient()
58 c.Check(w.Encode(map[string]interface{}{
59 "method": "subscribe",
60 "filters": [][]interface{}{{"event_type", "in", []string{"update"}}},
62 s.expectStatus(c, r, 200)
65 lg := s.expectLog(c, r)
66 c.Check(lg.EventType, check.Equals, "update")
69 func (s *v0Suite) TestLastLogID(c *check.C) {
71 c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
73 srv, conn, r, w := s.testClient()
77 uuidChan := make(chan string, 2)
78 s.emitEvents(uuidChan)
80 c.Check(w.Encode(map[string]interface{}{
81 "method": "subscribe",
82 "last_log_id": lastID,
84 s.expectStatus(c, r, 200)
86 avoidRace := make(chan struct{}, cap(uuidChan))
88 // When last_log_id is given, although v0session sends
89 // old events in order, and sends new events in order,
90 // it doesn't necessarily finish sending all old
91 // events before sending any new events. To avoid
92 // hitting this bug in the test, we wait for the old
93 // events to arrive before emitting any new events.
95 s.emitEvents(uuidChan)
100 for uuid := range uuidChan {
101 for _, etype := range []string{"create", "blip", "update"} {
102 lg := s.expectLog(c, r)
103 for lg.ObjectUUID != uuid {
104 lg = s.expectLog(c, r)
106 c.Check(lg.EventType, check.Equals, etype)
108 avoidRace <- struct{}{}
113 func (s *v0Suite) TestPermission(c *check.C) {
114 srv, conn, r, w := s.testClient()
118 c.Check(w.Encode(map[string]interface{}{
119 "method": "subscribe",
121 s.expectStatus(c, r, 200)
123 uuidChan := make(chan string, 2)
125 s.token = arvadostest.AdminToken
126 s.emitEvents(uuidChan)
127 s.token = arvadostest.ActiveToken
128 s.emitEvents(uuidChan)
131 wrongUUID := <-uuidChan
132 rightUUID := <-uuidChan
133 lg := s.expectLog(c, r)
134 for lg.ObjectUUID != rightUUID {
135 c.Check(lg.ObjectUUID, check.Not(check.Equals), wrongUUID)
136 lg = s.expectLog(c, r)
140 func (s *v0Suite) TestSendBadJSON(c *check.C) {
141 srv, conn, r, w := s.testClient()
145 c.Check(w.Encode(map[string]interface{}{
146 "method": "subscribe",
148 s.expectStatus(c, r, 200)
150 _, err := fmt.Fprint(conn, "^]beep\n")
151 c.Check(err, check.IsNil)
152 s.expectStatus(c, r, 400)
154 c.Check(w.Encode(map[string]interface{}{
155 "method": "subscribe",
157 s.expectStatus(c, r, 200)
160 func (s *v0Suite) TestSubscribe(c *check.C) {
161 srv, conn, r, w := s.testClient()
167 err := w.Encode(map[string]interface{}{"21": 12})
168 c.Check(err, check.IsNil)
169 s.expectStatus(c, r, 400)
171 err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
172 c.Check(err, check.IsNil)
173 s.expectStatus(c, r, 200)
175 uuidChan := make(chan string, 1)
176 go s.emitEvents(uuidChan)
179 for _, etype := range []string{"create", "blip", "update"} {
180 lg := s.expectLog(c, r)
181 for lg.ObjectUUID != uuid {
182 lg = s.expectLog(c, r)
184 c.Check(lg.EventType, check.Equals, etype)
188 // Generate some events by creating and updating a workflow object,
189 // and creating a custom log entry (event_type="blip") about the newly
190 // created workflow. If uuidChan is not nil, send the new workflow
191 // UUID to uuidChan as soon as it's known.
192 func (s *v0Suite) emitEvents(uuidChan chan<- string) {
193 ac := arvados.NewClientFromEnv()
194 ac.AuthToken = s.token
195 wf := &arvados.Workflow{
198 err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", wf), map[string]interface{}{"ensure_unique_name": true})
206 err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", &arvados.Log{
209 Properties: map[string]interface{}{
216 err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", wf), nil)
220 s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID, "arvados/v1/logs/"+lg.UUID)
223 func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
224 j, err := json.Marshal(ob)
229 v[rscName] = []string{string(j)}
230 return bytes.NewBufferString(v.Encode())
233 func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
234 msg := map[string]interface{}{}
235 c.Check(r.Decode(&msg), check.IsNil)
236 c.Check(int(msg["status"].(float64)), check.Equals, status)
239 func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
241 ok := make(chan struct{})
243 c.Check(r.Decode(lg), check.IsNil)
247 case <-time.After(10 * time.Second):
254 func (s *v0Suite) testClient() (*server, *websocket.Conn, *json.Decoder, *json.Encoder) {
255 go s.serverSuite.srv.Run()
256 s.serverSuite.srv.WaitReady()
257 srv := s.serverSuite.srv
258 conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
262 w := json.NewEncoder(conn)
263 r := json.NewDecoder(conn)
264 return srv, conn, r, w