20930: Test many clients connecting and disconnecting.
[arvados.git] / services / ws / session_v0_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package ws
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "io"
14         "math/rand"
15         "net/url"
16         "os"
17         "strings"
18         "sync"
19         "sync/atomic"
20         "time"
21
22         "git.arvados.org/arvados.git/sdk/go/arvados"
23         "git.arvados.org/arvados.git/sdk/go/arvadostest"
24         "git.arvados.org/arvados.git/sdk/go/ctxlog"
25         "golang.org/x/net/websocket"
26         check "gopkg.in/check.v1"
27 )
28
29 func init() {
30         if os.Getenv("ARVADOS_DEBUG") != "" {
31                 ctxlog.SetLevel("debug")
32         }
33 }
34
35 var _ = check.Suite(&v0Suite{})
36
37 type v0Suite struct {
38         serviceSuite serviceSuite
39         token        string
40         toDelete     []string
41         wg           sync.WaitGroup
42         ignoreLogID  int64
43 }
44
45 func (s *v0Suite) SetUpTest(c *check.C) {
46         s.serviceSuite.SetUpTest(c)
47         s.serviceSuite.start(c)
48
49         s.token = arvadostest.ActiveToken
50         s.ignoreLogID = s.lastLogID(c)
51 }
52
53 func (s *v0Suite) TearDownTest(c *check.C) {
54         s.wg.Wait()
55         s.serviceSuite.TearDownTest(c)
56 }
57
58 func (s *v0Suite) TearDownSuite(c *check.C) {
59         s.deleteTestObjects(c)
60 }
61
62 func (s *v0Suite) deleteTestObjects(c *check.C) {
63         ac := arvados.NewClientFromEnv()
64         ac.AuthToken = arvadostest.AdminToken
65         for _, path := range s.toDelete {
66                 err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
67                 c.Check(err, check.IsNil)
68         }
69         s.toDelete = nil
70 }
71
72 func (s *v0Suite) TestFilters(c *check.C) {
73         conn, r, w, err := s.testClient()
74         c.Assert(err, check.IsNil)
75         defer conn.Close()
76
77         cmd := func(method, eventType string, status int) {
78                 c.Check(w.Encode(map[string]interface{}{
79                         "method":  method,
80                         "filters": [][]interface{}{{"event_type", "in", []string{eventType}}},
81                 }), check.IsNil)
82                 s.expectStatus(c, r, status)
83         }
84         cmd("subscribe", "update", 200)
85         cmd("subscribe", "update", 200)
86         cmd("subscribe", "create", 200)
87         cmd("subscribe", "update", 200)
88         cmd("unsubscribe", "blip", 400)
89         cmd("unsubscribe", "create", 200)
90         cmd("unsubscribe", "update", 200)
91
92         go s.emitEvents(c, nil, nil)
93         lg := s.expectLog(c, r)
94         c.Check(lg.EventType, check.Equals, "update")
95
96         cmd("unsubscribe", "update", 200)
97         cmd("unsubscribe", "update", 200)
98         cmd("unsubscribe", "update", 400)
99 }
100
101 func (s *v0Suite) TestLastLogID(c *check.C) {
102         lastID := s.lastLogID(c)
103
104         checkLogs := func(r *json.Decoder, uuid string) {
105                 for _, etype := range []string{"create", "blip", "update"} {
106                         lg := s.expectLog(c, r)
107                         for lg.ObjectUUID != uuid {
108                                 lg = s.expectLog(c, r)
109                         }
110                         c.Check(lg.EventType, check.Equals, etype)
111                 }
112         }
113
114         // Connecting connEarly (before sending the early events) lets
115         // us confirm all of the "early" events have already passed
116         // through the server.
117         connEarly, rEarly, wEarly, err := s.testClient()
118         c.Assert(err, check.IsNil)
119         defer connEarly.Close()
120         c.Check(wEarly.Encode(map[string]interface{}{
121                 "method": "subscribe",
122         }), check.IsNil)
123         s.expectStatus(c, rEarly, 200)
124
125         // Send the early events.
126         uuidChan := make(chan string, 1)
127         s.emitEvents(c, uuidChan, nil)
128         uuidEarly := <-uuidChan
129
130         // Wait for the early events to pass through.
131         checkLogs(rEarly, uuidEarly)
132
133         // Connect the client that wants to get old events via
134         // last_log_id.
135         conn, r, w, err := s.testClient()
136         c.Assert(err, check.IsNil)
137         defer conn.Close()
138
139         c.Check(w.Encode(map[string]interface{}{
140                 "method":      "subscribe",
141                 "last_log_id": lastID,
142         }), check.IsNil)
143         s.expectStatus(c, r, 200)
144
145         checkLogs(r, uuidEarly)
146         s.emitEvents(c, uuidChan, nil)
147         checkLogs(r, <-uuidChan)
148 }
149
150 func (s *v0Suite) TestPermission(c *check.C) {
151         conn, r, w, err := s.testClient()
152         c.Assert(err, check.IsNil)
153         defer conn.Close()
154
155         c.Check(w.Encode(map[string]interface{}{
156                 "method": "subscribe",
157         }), check.IsNil)
158         s.expectStatus(c, r, 200)
159
160         uuidChan := make(chan string, 2)
161         go func() {
162                 s.token = arvadostest.AdminToken
163                 s.emitEvents(c, uuidChan, nil)
164                 s.token = arvadostest.ActiveToken
165                 s.emitEvents(c, uuidChan, nil)
166         }()
167
168         wrongUUID := <-uuidChan
169         rightUUID := <-uuidChan
170         lg := s.expectLog(c, r)
171         for lg.ObjectUUID != rightUUID {
172                 c.Check(lg.ObjectUUID, check.Not(check.Equals), wrongUUID)
173                 lg = s.expectLog(c, r)
174         }
175 }
176
177 // Two users create private objects; admin deletes both objects; each
178 // user receives a "delete" event for their own object (not for the
179 // other user's object).
180 func (s *v0Suite) TestEventTypeDelete(c *check.C) {
181         clients := []struct {
182                 token string
183                 uuid  string
184                 conn  *websocket.Conn
185                 r     *json.Decoder
186                 w     *json.Encoder
187         }{{token: arvadostest.ActiveToken}, {token: arvadostest.SpectatorToken}}
188         for i := range clients {
189                 uuidChan := make(chan string, 1)
190                 s.token = clients[i].token
191                 s.emitEvents(c, uuidChan, nil)
192                 clients[i].uuid = <-uuidChan
193
194                 var err error
195                 clients[i].conn, clients[i].r, clients[i].w, err = s.testClient()
196                 c.Assert(err, check.IsNil)
197
198                 c.Check(clients[i].w.Encode(map[string]interface{}{
199                         "method": "subscribe",
200                 }), check.IsNil)
201                 s.expectStatus(c, clients[i].r, 200)
202         }
203
204         s.ignoreLogID = s.lastLogID(c)
205         s.deleteTestObjects(c)
206
207         for _, client := range clients {
208                 lg := s.expectLog(c, client.r)
209                 c.Check(lg.ObjectUUID, check.Equals, client.uuid)
210                 c.Check(lg.EventType, check.Equals, "delete")
211         }
212 }
213
214 // Trashing/deleting a collection produces an "update" event with
215 // properties["new_attributes"]["is_trashed"] == true.
216 func (s *v0Suite) TestTrashedCollection(c *check.C) {
217         ac := arvados.NewClientFromEnv()
218         ac.AuthToken = s.token
219
220         var coll arvados.Collection
221         err := ac.RequestAndDecode(&coll, "POST", "arvados/v1/collections", s.jsonBody("collection", `{"manifest_text":""}`), map[string]interface{}{"ensure_unique_name": true})
222         c.Assert(err, check.IsNil)
223         s.ignoreLogID = s.lastLogID(c)
224
225         conn, r, w, err := s.testClient()
226         c.Assert(err, check.IsNil)
227         defer conn.Close()
228
229         c.Check(w.Encode(map[string]interface{}{
230                 "method": "subscribe",
231         }), check.IsNil)
232         s.expectStatus(c, r, 200)
233
234         err = ac.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
235         c.Assert(err, check.IsNil)
236
237         lg := s.expectLog(c, r)
238         c.Check(lg.ObjectUUID, check.Equals, coll.UUID)
239         c.Check(lg.EventType, check.Equals, "update")
240         c.Check(lg.Properties["old_attributes"].(map[string]interface{})["is_trashed"], check.Equals, false)
241         c.Check(lg.Properties["new_attributes"].(map[string]interface{})["is_trashed"], check.Equals, true)
242 }
243
244 func (s *v0Suite) TestSendBadJSON(c *check.C) {
245         conn, r, w, err := s.testClient()
246         c.Assert(err, check.IsNil)
247         defer conn.Close()
248
249         c.Check(w.Encode(map[string]interface{}{
250                 "method": "subscribe",
251         }), check.IsNil)
252         s.expectStatus(c, r, 200)
253
254         _, err = fmt.Fprint(conn, "^]beep\n")
255         c.Check(err, check.IsNil)
256         s.expectStatus(c, r, 400)
257
258         c.Check(w.Encode(map[string]interface{}{
259                 "method": "subscribe",
260         }), check.IsNil)
261         s.expectStatus(c, r, 200)
262 }
263
264 func (s *v0Suite) TestSubscribe(c *check.C) {
265         conn, r, w, err := s.testClient()
266         c.Assert(err, check.IsNil)
267         defer conn.Close()
268
269         s.emitEvents(c, nil, nil)
270
271         err = w.Encode(map[string]interface{}{"21": 12})
272         c.Check(err, check.IsNil)
273         s.expectStatus(c, r, 400)
274
275         err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
276         c.Check(err, check.IsNil)
277         s.expectStatus(c, r, 200)
278
279         uuidChan := make(chan string, 1)
280         go s.emitEvents(c, uuidChan, nil)
281         uuid := <-uuidChan
282
283         for _, etype := range []string{"create", "blip", "update"} {
284                 lg := s.expectLog(c, r)
285                 for lg.ObjectUUID != uuid {
286                         lg = s.expectLog(c, r)
287                 }
288                 c.Check(lg.EventType, check.Equals, etype)
289         }
290 }
291
292 func (s *v0Suite) TestManyEventsAndSubscribers(c *check.C) {
293         // Frequent slow listener pings create the conditions for a
294         // deadlock issue with the lib/pq example listener usage.
295         //
296         // Specifically: a lib/pq/example/listen-style event loop can
297         // deadlock if enough (~32) server notifications arrive after
298         // the event loop decides to call Ping (e.g., while
299         // listener.Ping() is waiting for a response from the server,
300         // or in the time.Sleep() invoked by testSlowPing).
301         //
302         // (*ListenerConn)listenerConnLoop() doesn't see the server's
303         // ping response until it finishes sending a previous
304         // notification through its internal queue to
305         // (*Listener)listenerConnLoop(), which is blocked on sending
306         // to our Notify channel, which is blocked on waiting for the
307         // Ping response.
308         defer func(d time.Duration) {
309                 listenerPingInterval = d
310                 testSlowPing = false
311         }(listenerPingInterval)
312         listenerPingInterval = time.Second / 2
313         testSlowPing = true
314         // Restart the test server in order to get one that uses our
315         // test globals.
316         s.TearDownTest(c)
317         s.SetUpTest(c)
318
319         done := make(chan struct{})
320         defer close(done)
321         go s.emitEvents(c, nil, done)
322
323         // We will expect to receive at least one event during each
324         // one-second interval while the test is running.
325         t0 := time.Now()
326         seconds := 10
327         receivedPerSecond := make([]int64, seconds)
328
329         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Duration(seconds)*time.Second))
330         defer cancel()
331         for clientID := 0; clientID < 100; clientID++ {
332                 clientID := clientID
333                 go func() {
334                         for ctx.Err() == nil {
335                                 conn, r, w, err := s.testClient()
336                                 if ctx.Err() != nil {
337                                         return
338                                 }
339                                 c.Assert(err, check.IsNil)
340                                 defer conn.Close()
341                                 err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
342                                 if ctx.Err() != nil {
343                                         return
344                                 }
345                                 c.Check(err, check.IsNil)
346                                 s.expectStatus(c, r, 200)
347                                 for {
348                                         if clientID%10 == 0 {
349                                                 // slow client
350                                                 time.Sleep(time.Second / 20)
351                                         } else if rand.Float64() < 0.01 {
352                                                 // disconnect+reconnect
353                                                 break
354                                         }
355                                         var lg arvados.Log
356                                         err := r.Decode(&lg)
357                                         if ctx.Err() != nil {
358                                                 return
359                                         }
360                                         if errors.Is(err, io.EOF) {
361                                                 break
362                                         }
363                                         c.Check(err, check.IsNil)
364                                         if i := int(time.Since(t0) / time.Second); i < seconds {
365                                                 atomic.AddInt64(&receivedPerSecond[i], 1)
366                                         }
367                                 }
368                                 conn.Close()
369                         }
370                 }()
371         }
372         <-ctx.Done()
373         c.Log("done")
374         for i, n := range receivedPerSecond {
375                 c.Logf("t<%d n=%d", i+1, n)
376                 c.Check(int64(n), check.Not(check.Equals), int64(0))
377         }
378 }
379
380 // Generate some events by creating and updating a workflow object,
381 // and creating a custom log entry (event_type="blip") about the newly
382 // created workflow.
383 //
384 // If uuidChan is not nil, send the new workflow UUID to uuidChan as
385 // soon as it's known.
386 //
387 // If done is not nil, keep generating events until done receives or
388 // closes.
389 func (s *v0Suite) emitEvents(c *check.C, uuidChan chan<- string, done <-chan struct{}) {
390         s.wg.Add(1)
391         defer s.wg.Done()
392
393         ac := arvados.NewClientFromEnv()
394         ac.AuthToken = s.token
395         wf := &arvados.Workflow{
396                 Name: "ws_test",
397         }
398         err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", `{"name":"ws_test"}`), map[string]interface{}{"ensure_unique_name": true})
399         c.Assert(err, check.IsNil)
400         s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID)
401         if uuidChan != nil {
402                 uuidChan <- wf.UUID
403         }
404         for i := 0; ; i++ {
405                 lg := &arvados.Log{}
406                 err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", map[string]interface{}{
407                         "object_uuid": wf.UUID,
408                         "event_type":  "blip",
409                         "properties": map[string]interface{}{
410                                 "beep": "boop",
411                         },
412                 }), nil)
413                 s.toDelete = append(s.toDelete, "arvados/v1/logs/"+lg.UUID)
414                 if done != nil {
415                         select {
416                         case <-done:
417                         default:
418                                 if i%50 == 0 {
419                                         time.Sleep(100 * time.Millisecond)
420                                 }
421                                 continue
422                         }
423                 }
424                 break
425         }
426         if err != nil {
427                 panic(err)
428         }
429         err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", `{"name":"ws_test"}`), nil)
430         if err != nil {
431                 panic(err)
432         }
433 }
434
435 func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
436         val, ok := ob.(string)
437         if !ok {
438                 j, err := json.Marshal(ob)
439                 if err != nil {
440                         panic(err)
441                 }
442                 val = string(j)
443         }
444         v := url.Values{}
445         v[rscName] = []string{val}
446         return bytes.NewBufferString(v.Encode())
447 }
448
449 func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
450         msg := map[string]interface{}{}
451         c.Check(r.Decode(&msg), check.IsNil)
452         c.Check(int(msg["status"].(float64)), check.Equals, status)
453 }
454
455 func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
456         lg := &arvados.Log{}
457         ok := make(chan struct{})
458         go func() {
459                 defer close(ok)
460                 for lg.ID <= s.ignoreLogID {
461                         c.Assert(r.Decode(lg), check.IsNil)
462                 }
463         }()
464         select {
465         case <-time.After(10 * time.Second):
466                 c.Error("timed out")
467                 c.FailNow()
468                 return lg
469         case <-ok:
470                 return lg
471         }
472 }
473
474 func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder, error) {
475         srv := s.serviceSuite.srv
476         conn, err := websocket.Dial(strings.Replace(srv.URL, "http", "ws", 1)+"/websocket?api_token="+s.token, "", srv.URL)
477         if err != nil {
478                 return nil, nil, nil, err
479         }
480         w := json.NewEncoder(conn)
481         r := json.NewDecoder(conn)
482         return conn, r, w, nil
483 }
484
485 func (s *v0Suite) lastLogID(c *check.C) int64 {
486         var lastID int64
487         c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
488         return lastID
489 }