11910: Fix racy tests: ignore non-matching logs from previous tests.
[arvados.git] / services / ws / session_v0_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "io"
12         "net/url"
13         "os"
14         "time"
15
16         "git.curoverse.com/arvados.git/sdk/go/arvados"
17         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
18         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
19         "golang.org/x/net/websocket"
20         check "gopkg.in/check.v1"
21 )
22
23 func init() {
24         if os.Getenv("ARVADOS_DEBUG") != "" {
25                 ctxlog.SetLevel("debug")
26         }
27 }
28
29 var _ = check.Suite(&v0Suite{})
30
31 type v0Suite struct {
32         serverSuite serverSuite
33         token       string
34         toDelete    []string
35 }
36
37 func (s *v0Suite) SetUpTest(c *check.C) {
38         s.serverSuite.SetUpTest(c)
39         s.token = arvadostest.ActiveToken
40 }
41
42 func (s *v0Suite) TearDownSuite(c *check.C) {
43         ac := arvados.NewClientFromEnv()
44         ac.AuthToken = arvadostest.AdminToken
45         for _, path := range s.toDelete {
46                 err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
47                 if err != nil {
48                         panic(err)
49                 }
50         }
51 }
52
53 func (s *v0Suite) TestFilters(c *check.C) {
54         srv, conn, r, w := s.testClient()
55         defer srv.Close()
56         defer conn.Close()
57
58         c.Check(w.Encode(map[string]interface{}{
59                 "method":  "subscribe",
60                 "filters": [][]interface{}{{"event_type", "in", []string{"update"}}},
61         }), check.IsNil)
62         s.expectStatus(c, r, 200)
63
64         go s.emitEvents(nil)
65         lg := s.expectLog(c, r)
66         c.Check(lg.EventType, check.Equals, "update")
67 }
68
69 func (s *v0Suite) TestLastLogID(c *check.C) {
70         var lastID uint64
71         c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
72
73         srv, conn, r, w := s.testClient()
74         defer srv.Close()
75         defer conn.Close()
76
77         uuidChan := make(chan string, 2)
78         s.emitEvents(uuidChan)
79
80         c.Check(w.Encode(map[string]interface{}{
81                 "method":      "subscribe",
82                 "last_log_id": lastID,
83         }), check.IsNil)
84         s.expectStatus(c, r, 200)
85
86         go func() {
87                 s.emitEvents(uuidChan)
88                 close(uuidChan)
89         }()
90
91         go func() {
92                 for uuid := range uuidChan {
93                         for _, etype := range []string{"create", "blip", "update"} {
94                                 lg := s.expectLog(c, r)
95                                 for lg.ObjectUUID != uuid {
96                                         lg = s.expectLog(c, r)
97                                 }
98                                 c.Check(lg.EventType, check.Equals, etype)
99                         }
100                 }
101         }()
102 }
103
104 func (s *v0Suite) TestPermission(c *check.C) {
105         srv, conn, r, w := s.testClient()
106         defer srv.Close()
107         defer conn.Close()
108
109         c.Check(w.Encode(map[string]interface{}{
110                 "method": "subscribe",
111         }), check.IsNil)
112         s.expectStatus(c, r, 200)
113
114         uuidChan := make(chan string, 2)
115         go func() {
116                 s.token = arvadostest.AdminToken
117                 s.emitEvents(uuidChan)
118                 s.token = arvadostest.ActiveToken
119                 s.emitEvents(uuidChan)
120         }()
121
122         wrongUUID := <-uuidChan
123         rightUUID := <-uuidChan
124         lg := s.expectLog(c, r)
125         for lg.ObjectUUID != rightUUID {
126                 c.Check(lg.ObjectUUID, check.Not(check.Equals), wrongUUID)
127                 lg = s.expectLog(c, r)
128         }
129 }
130
131 func (s *v0Suite) TestSendBadJSON(c *check.C) {
132         srv, conn, r, w := s.testClient()
133         defer srv.Close()
134         defer conn.Close()
135
136         c.Check(w.Encode(map[string]interface{}{
137                 "method": "subscribe",
138         }), check.IsNil)
139         s.expectStatus(c, r, 200)
140
141         _, err := fmt.Fprint(conn, "^]beep\n")
142         c.Check(err, check.IsNil)
143         s.expectStatus(c, r, 400)
144
145         c.Check(w.Encode(map[string]interface{}{
146                 "method": "subscribe",
147         }), check.IsNil)
148         s.expectStatus(c, r, 200)
149 }
150
151 func (s *v0Suite) TestSubscribe(c *check.C) {
152         srv, conn, r, w := s.testClient()
153         defer srv.Close()
154         defer conn.Close()
155
156         s.emitEvents(nil)
157
158         err := w.Encode(map[string]interface{}{"21": 12})
159         c.Check(err, check.IsNil)
160         s.expectStatus(c, r, 400)
161
162         err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
163         c.Check(err, check.IsNil)
164         s.expectStatus(c, r, 200)
165
166         uuidChan := make(chan string, 1)
167         go s.emitEvents(uuidChan)
168         uuid := <-uuidChan
169
170         for _, etype := range []string{"create", "blip", "update"} {
171                 lg := s.expectLog(c, r)
172                 for lg.ObjectUUID != uuid {
173                         lg = s.expectLog(c, r)
174                 }
175                 c.Check(lg.EventType, check.Equals, etype)
176         }
177 }
178
179 // Generate some events by creating and updating a workflow object,
180 // and creating a custom log entry (event_type="blip") about the newly
181 // created workflow. If uuidChan is not nil, send the new workflow
182 // UUID to uuidChan as soon as it's known.
183 func (s *v0Suite) emitEvents(uuidChan chan<- string) {
184         ac := arvados.NewClientFromEnv()
185         ac.AuthToken = s.token
186         wf := &arvados.Workflow{
187                 Name: "ws_test",
188         }
189         err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", wf), map[string]interface{}{"ensure_unique_name": true})
190         if err != nil {
191                 panic(err)
192         }
193         if uuidChan != nil {
194                 uuidChan <- wf.UUID
195         }
196         lg := &arvados.Log{}
197         err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", &arvados.Log{
198                 ObjectUUID: wf.UUID,
199                 EventType:  "blip",
200                 Properties: map[string]interface{}{
201                         "beep": "boop",
202                 },
203         }), nil)
204         if err != nil {
205                 panic(err)
206         }
207         err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", wf), nil)
208         if err != nil {
209                 panic(err)
210         }
211         s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID, "arvados/v1/logs/"+lg.UUID)
212 }
213
214 func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
215         j, err := json.Marshal(ob)
216         if err != nil {
217                 panic(err)
218         }
219         v := url.Values{}
220         v[rscName] = []string{string(j)}
221         return bytes.NewBufferString(v.Encode())
222 }
223
224 func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
225         msg := map[string]interface{}{}
226         c.Check(r.Decode(&msg), check.IsNil)
227         c.Check(int(msg["status"].(float64)), check.Equals, status)
228 }
229
230 func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
231         lg := &arvados.Log{}
232         ok := make(chan struct{})
233         go func() {
234                 c.Check(r.Decode(lg), check.IsNil)
235                 close(ok)
236         }()
237         select {
238         case <-time.After(10 * time.Second):
239                 panic("timed out")
240         case <-ok:
241                 return lg
242         }
243 }
244
245 func (s *v0Suite) testClient() (*server, *websocket.Conn, *json.Decoder, *json.Encoder) {
246         go s.serverSuite.srv.Run()
247         s.serverSuite.srv.WaitReady()
248         srv := s.serverSuite.srv
249         conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
250         if err != nil {
251                 panic(err)
252         }
253         w := json.NewEncoder(conn)
254         r := json.NewDecoder(conn)
255         return srv, conn, r, w
256 }