X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5d081c423f314060cefafc7149850ea1dcbe098a..0af053088c83d1107866cb06fd6c5736d9065eee:/services/ws/session_v0_test.go diff --git a/services/ws/session_v0_test.go b/services/ws/session_v0_test.go index 1e6abbe283..7585bc5e17 100644 --- a/services/ws/session_v0_test.go +++ b/services/ws/session_v0_test.go @@ -34,6 +34,7 @@ type v0Suite struct { token string toDelete []string wg sync.WaitGroup + ignoreLogID uint64 } func (s *v0Suite) SetUpTest(c *check.C) { @@ -42,6 +43,7 @@ func (s *v0Suite) SetUpTest(c *check.C) { s.serverSuite.srv.WaitReady() s.token = arvadostest.ActiveToken + s.ignoreLogID = s.lastLogID(c) } func (s *v0Suite) TearDownTest(c *check.C) { @@ -69,26 +71,65 @@ func (s *v0Suite) TestFilters(c *check.C) { conn, r, w := s.testClient() defer conn.Close() - c.Check(w.Encode(map[string]interface{}{ - "method": "subscribe", - "filters": [][]interface{}{{"event_type", "in", []string{"update"}}}, - }), check.IsNil) - s.expectStatus(c, r, 200) + cmd := func(method, eventType string, status int) { + c.Check(w.Encode(map[string]interface{}{ + "method": method, + "filters": [][]interface{}{{"event_type", "in", []string{eventType}}}, + }), check.IsNil) + s.expectStatus(c, r, status) + } + cmd("subscribe", "update", 200) + cmd("subscribe", "update", 200) + cmd("subscribe", "create", 200) + cmd("subscribe", "update", 200) + cmd("unsubscribe", "blip", 400) + cmd("unsubscribe", "create", 200) + cmd("unsubscribe", "update", 200) go s.emitEvents(nil) lg := s.expectLog(c, r) c.Check(lg.EventType, check.Equals, "update") + + cmd("unsubscribe", "update", 200) + cmd("unsubscribe", "update", 200) + cmd("unsubscribe", "update", 400) } func (s *v0Suite) TestLastLogID(c *check.C) { - var lastID uint64 - c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil) + lastID := s.lastLogID(c) - conn, r, w := s.testClient() - defer conn.Close() + checkLogs := func(r *json.Decoder, uuid string) { + for _, etype := range []string{"create", "blip", "update"} { + lg := s.expectLog(c, r) + for lg.ObjectUUID != uuid { + lg = s.expectLog(c, r) + } + c.Check(lg.EventType, check.Equals, etype) + } + } - uuidChan := make(chan string, 2) + // Connecting connEarly (before sending the early events) lets + // us confirm all of the "early" events have already passed + // through the server. + connEarly, rEarly, wEarly := s.testClient() + defer connEarly.Close() + c.Check(wEarly.Encode(map[string]interface{}{ + "method": "subscribe", + }), check.IsNil) + s.expectStatus(c, rEarly, 200) + + // Send the early events. + uuidChan := make(chan string, 1) s.emitEvents(uuidChan) + uuidEarly := <-uuidChan + + // Wait for the early events to pass through. + checkLogs(rEarly, uuidEarly) + + // Connect the client that wants to get old events via + // last_log_id. + conn, r, w := s.testClient() + defer conn.Close() c.Check(w.Encode(map[string]interface{}{ "method": "subscribe", @@ -96,31 +137,9 @@ func (s *v0Suite) TestLastLogID(c *check.C) { }), check.IsNil) s.expectStatus(c, r, 200) - avoidRace := make(chan struct{}, cap(uuidChan)) - go func() { - // When last_log_id is given, although v0session sends - // old events in order, and sends new events in order, - // it doesn't necessarily finish sending all old - // events before sending any new events. To avoid - // hitting this bug in the test, we wait for the old - // events to arrive before emitting any new events. - <-avoidRace - s.emitEvents(uuidChan) - close(uuidChan) - }() - - go func() { - for uuid := range uuidChan { - for _, etype := range []string{"create", "blip", "update"} { - lg := s.expectLog(c, r) - for lg.ObjectUUID != uuid { - lg = s.expectLog(c, r) - } - c.Check(lg.EventType, check.Equals, etype) - } - avoidRace <- struct{}{} - } - }() + checkLogs(r, uuidEarly) + s.emitEvents(uuidChan) + checkLogs(r, <-uuidChan) } func (s *v0Suite) TestPermission(c *check.C) { @@ -173,6 +192,7 @@ func (s *v0Suite) TestEventTypeDelete(c *check.C) { s.expectStatus(c, clients[i].r, 200) } + s.ignoreLogID = s.lastLogID(c) s.deleteTestObjects(c) for _, client := range clients { @@ -191,6 +211,7 @@ func (s *v0Suite) TestTrashedCollection(c *check.C) { coll := &arvados.Collection{ManifestText: ""} err := ac.RequestAndDecode(coll, "POST", "arvados/v1/collections", s.jsonBody("collection", coll), map[string]interface{}{"ensure_unique_name": true}) c.Assert(err, check.IsNil) + s.ignoreLogID = s.lastLogID(c) conn, r, w := s.testClient() defer conn.Close() @@ -314,7 +335,9 @@ func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log { lg := &arvados.Log{} ok := make(chan struct{}) go func() { - c.Check(r.Decode(lg), check.IsNil) + for lg.ID <= s.ignoreLogID { + c.Check(r.Decode(lg), check.IsNil) + } close(ok) }() select { @@ -335,3 +358,9 @@ func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder) { r := json.NewDecoder(conn) return conn, r, w } + +func (s *v0Suite) lastLogID(c *check.C) uint64 { + var lastID uint64 + c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil) + return lastID +}