1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.curoverse.com/arvados.git/sdk/go/arvados"
17 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
18 "git.curoverse.com/arvados.git/sdk/go/ctxlog"
19 "golang.org/x/net/websocket"
20 check "gopkg.in/check.v1"
24 if os.Getenv("ARVADOS_DEBUG") != "" {
25 ctxlog.SetLevel("debug")
29 var _ = check.Suite(&v0Suite{})
32 serverSuite serverSuite
37 func (s *v0Suite) SetUpTest(c *check.C) {
38 s.serverSuite.SetUpTest(c)
39 s.token = arvadostest.ActiveToken
42 func (s *v0Suite) TearDownSuite(c *check.C) {
43 ac := arvados.NewClientFromEnv()
44 ac.AuthToken = arvadostest.AdminToken
45 for _, path := range s.toDelete {
46 err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
53 func (s *v0Suite) TestFilters(c *check.C) {
54 srv, conn, r, w := s.testClient()
58 c.Check(w.Encode(map[string]interface{}{
59 "method": "subscribe",
60 "filters": [][]interface{}{{"event_type", "in", []string{"update"}}},
62 s.expectStatus(c, r, 200)
65 lg := s.expectLog(c, r)
66 c.Check(lg.EventType, check.Equals, "update")
69 func (s *v0Suite) TestLastLogID(c *check.C) {
71 c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
73 srv, conn, r, w := s.testClient()
77 uuidChan := make(chan string, 2)
78 s.emitEvents(uuidChan)
80 c.Check(w.Encode(map[string]interface{}{
81 "method": "subscribe",
82 "last_log_id": lastID,
84 s.expectStatus(c, r, 200)
87 s.emitEvents(uuidChan)
92 for uuid := range uuidChan {
93 for _, etype := range []string{"create", "blip", "update"} {
94 lg := s.expectLog(c, r)
95 for lg.ObjectUUID != uuid {
96 lg = s.expectLog(c, r)
98 c.Check(lg.EventType, check.Equals, etype)
104 func (s *v0Suite) TestPermission(c *check.C) {
105 srv, conn, r, w := s.testClient()
109 c.Check(w.Encode(map[string]interface{}{
110 "method": "subscribe",
112 s.expectStatus(c, r, 200)
114 uuidChan := make(chan string, 2)
116 s.token = arvadostest.AdminToken
117 s.emitEvents(uuidChan)
118 s.token = arvadostest.ActiveToken
119 s.emitEvents(uuidChan)
122 wrongUUID := <-uuidChan
123 rightUUID := <-uuidChan
124 lg := s.expectLog(c, r)
125 for lg.ObjectUUID != rightUUID {
126 c.Check(lg.ObjectUUID, check.Not(check.Equals), wrongUUID)
127 lg = s.expectLog(c, r)
131 func (s *v0Suite) TestSendBadJSON(c *check.C) {
132 srv, conn, r, w := s.testClient()
136 c.Check(w.Encode(map[string]interface{}{
137 "method": "subscribe",
139 s.expectStatus(c, r, 200)
141 _, err := fmt.Fprint(conn, "^]beep\n")
142 c.Check(err, check.IsNil)
143 s.expectStatus(c, r, 400)
145 c.Check(w.Encode(map[string]interface{}{
146 "method": "subscribe",
148 s.expectStatus(c, r, 200)
151 func (s *v0Suite) TestSubscribe(c *check.C) {
152 srv, conn, r, w := s.testClient()
158 err := w.Encode(map[string]interface{}{"21": 12})
159 c.Check(err, check.IsNil)
160 s.expectStatus(c, r, 400)
162 err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
163 c.Check(err, check.IsNil)
164 s.expectStatus(c, r, 200)
166 uuidChan := make(chan string, 1)
167 go s.emitEvents(uuidChan)
170 for _, etype := range []string{"create", "blip", "update"} {
171 lg := s.expectLog(c, r)
172 for lg.ObjectUUID != uuid {
173 lg = s.expectLog(c, r)
175 c.Check(lg.EventType, check.Equals, etype)
179 // Generate some events by creating and updating a workflow object,
180 // and creating a custom log entry (event_type="blip") about the newly
181 // created workflow. If uuidChan is not nil, send the new workflow
182 // UUID to uuidChan as soon as it's known.
183 func (s *v0Suite) emitEvents(uuidChan chan<- string) {
184 ac := arvados.NewClientFromEnv()
185 ac.AuthToken = s.token
186 wf := &arvados.Workflow{
189 err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", wf), map[string]interface{}{"ensure_unique_name": true})
197 err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", &arvados.Log{
200 Properties: map[string]interface{}{
207 err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", wf), nil)
211 s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID, "arvados/v1/logs/"+lg.UUID)
214 func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
215 j, err := json.Marshal(ob)
220 v[rscName] = []string{string(j)}
221 return bytes.NewBufferString(v.Encode())
224 func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
225 msg := map[string]interface{}{}
226 c.Check(r.Decode(&msg), check.IsNil)
227 c.Check(int(msg["status"].(float64)), check.Equals, status)
230 func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
232 ok := make(chan struct{})
234 c.Check(r.Decode(lg), check.IsNil)
238 case <-time.After(10 * time.Second):
245 func (s *v0Suite) testClient() (*server, *websocket.Conn, *json.Decoder, *json.Encoder) {
246 go s.serverSuite.srv.Run()
247 s.serverSuite.srv.WaitReady()
248 srv := s.serverSuite.srv
249 conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
253 w := json.NewEncoder(conn)
254 r := json.NewDecoder(conn)
255 return srv, conn, r, w