X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f36162457a771824059fefa098a3ffb89c59263f..f86892e89212ec7ada8c666a983d8f56fb30b15d:/services/ws/session_v0_test.go diff --git a/services/ws/session_v0_test.go b/services/ws/session_v0_test.go index d8a2b698f4..7d15543c05 100644 --- a/services/ws/session_v0_test.go +++ b/services/ws/session_v0_test.go @@ -1,19 +1,27 @@ -package main +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package ws import ( "bytes" + "context" "encoding/json" + "errors" "fmt" "io" - "net" - "net/http" + "math/rand" "net/url" "os" + "strings" + "sync" + "sync/atomic" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/arvadostest" - "git.curoverse.com/arvados.git/sdk/go/ctxlog" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadostest" + "git.arvados.org/arvados.git/sdk/go/ctxlog" "golang.org/x/net/websocket" check "gopkg.in/check.v1" ) @@ -27,51 +35,106 @@ func init() { var _ = check.Suite(&v0Suite{}) type v0Suite struct { - token string - toDelete []string + serviceSuite serviceSuite + token string + toDelete []string + wg sync.WaitGroup + ignoreLogID int64 } func (s *v0Suite) SetUpTest(c *check.C) { + s.serviceSuite.SetUpTest(c) + s.serviceSuite.start(c) + s.token = arvadostest.ActiveToken + s.ignoreLogID = s.lastLogID(c) +} + +func (s *v0Suite) TearDownTest(c *check.C) { + s.wg.Wait() + s.serviceSuite.TearDownTest(c) } func (s *v0Suite) TearDownSuite(c *check.C) { + s.deleteTestObjects(c) +} + +func (s *v0Suite) deleteTestObjects(c *check.C) { ac := arvados.NewClientFromEnv() ac.AuthToken = arvadostest.AdminToken for _, path := range s.toDelete { err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil) - if err != nil { - panic(err) - } + c.Check(err, check.IsNil) } + s.toDelete = nil } func (s *v0Suite) TestFilters(c *check.C) { - srv, conn, r, w := s.testClient() - defer srv.Close() + conn, r, w, err := s.testClient() + c.Assert(err, check.IsNil) defer conn.Close() - c.Check(w.Encode(map[string]interface{}{ - "method": "subscribe", - "filters": [][]interface{}{{"event_type", "in", []string{"update"}}}, - }), check.IsNil) - s.expectStatus(c, r, 200) - - go s.emitEvents(nil) + cmd := func(method, eventType string, status int) { + c.Check(w.Encode(map[string]interface{}{ + "method": method, + "filters": [][]interface{}{{"event_type", "in", []string{eventType}}}, + }), check.IsNil) + s.expectStatus(c, r, status) + } + cmd("subscribe", "update", 200) + cmd("subscribe", "update", 200) + cmd("subscribe", "create", 200) + cmd("subscribe", "update", 200) + cmd("unsubscribe", "blip", 400) + cmd("unsubscribe", "create", 200) + cmd("unsubscribe", "update", 200) + + go s.emitEvents(c, nil, nil) lg := s.expectLog(c, r) c.Check(lg.EventType, check.Equals, "update") + + cmd("unsubscribe", "update", 200) + cmd("unsubscribe", "update", 200) + cmd("unsubscribe", "update", 400) } func (s *v0Suite) TestLastLogID(c *check.C) { - var lastID uint64 - c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil) + lastID := s.lastLogID(c) - srv, conn, r, w := s.testClient() - defer srv.Close() - defer conn.Close() + checkLogs := func(r *json.Decoder, uuid string) { + for _, etype := range []string{"create", "blip", "update"} { + lg := s.expectLog(c, r) + for lg.ObjectUUID != uuid { + lg = s.expectLog(c, r) + } + c.Check(lg.EventType, check.Equals, etype) + } + } - uuidChan := make(chan string, 2) - s.emitEvents(uuidChan) + // Connecting connEarly (before sending the early events) lets + // us confirm all of the "early" events have already passed + // through the server. + connEarly, rEarly, wEarly, err := s.testClient() + c.Assert(err, check.IsNil) + defer connEarly.Close() + c.Check(wEarly.Encode(map[string]interface{}{ + "method": "subscribe", + }), check.IsNil) + s.expectStatus(c, rEarly, 200) + + // Send the early events. + uuidChan := make(chan string, 1) + s.emitEvents(c, uuidChan, nil) + uuidEarly := <-uuidChan + + // Wait for the early events to pass through. + checkLogs(rEarly, uuidEarly) + + // Connect the client that wants to get old events via + // last_log_id. + conn, r, w, err := s.testClient() + c.Assert(err, check.IsNil) + defer conn.Close() c.Check(w.Encode(map[string]interface{}{ "method": "subscribe", @@ -79,33 +142,14 @@ func (s *v0Suite) TestLastLogID(c *check.C) { }), check.IsNil) s.expectStatus(c, r, 200) - go func() { - s.emitEvents(uuidChan) - close(uuidChan) - }() - - done := make(chan bool) - go func() { - for uuid := range uuidChan { - for _, etype := range []string{"create", "blip", "update"} { - lg := s.expectLog(c, r) - c.Check(lg.ObjectUUID, check.Equals, uuid) - c.Check(lg.EventType, check.Equals, etype) - } - } - close(done) - }() - - select { - case <-time.After(10 * time.Second): - c.Fatal("timeout") - case <-done: - } + checkLogs(r, uuidEarly) + s.emitEvents(c, uuidChan, nil) + checkLogs(r, <-uuidChan) } func (s *v0Suite) TestPermission(c *check.C) { - srv, conn, r, w := s.testClient() - defer srv.Close() + conn, r, w, err := s.testClient() + c.Assert(err, check.IsNil) defer conn.Close() c.Check(w.Encode(map[string]interface{}{ @@ -113,21 +157,128 @@ func (s *v0Suite) TestPermission(c *check.C) { }), check.IsNil) s.expectStatus(c, r, 200) - uuidChan := make(chan string, 1) + uuidChan := make(chan string, 2) go func() { s.token = arvadostest.AdminToken - s.emitEvents(nil) + s.emitEvents(c, uuidChan, nil) s.token = arvadostest.ActiveToken - s.emitEvents(uuidChan) + s.emitEvents(c, uuidChan, nil) }() + wrongUUID := <-uuidChan + rightUUID := <-uuidChan + lg := s.expectLog(c, r) + for lg.ObjectUUID != rightUUID { + c.Check(lg.ObjectUUID, check.Not(check.Equals), wrongUUID) + lg = s.expectLog(c, r) + } +} + +// Two users create private objects; admin deletes both objects; each +// user receives a "delete" event for their own object (not for the +// other user's object). +func (s *v0Suite) TestEventTypeDelete(c *check.C) { + clients := []struct { + token string + uuid string + conn *websocket.Conn + r *json.Decoder + w *json.Encoder + }{{token: arvadostest.ActiveToken}, {token: arvadostest.SpectatorToken}} + for i := range clients { + uuidChan := make(chan string, 1) + s.token = clients[i].token + s.emitEvents(c, uuidChan, nil) + clients[i].uuid = <-uuidChan + + var err error + clients[i].conn, clients[i].r, clients[i].w, err = s.testClient() + c.Assert(err, check.IsNil) + + c.Check(clients[i].w.Encode(map[string]interface{}{ + "method": "subscribe", + }), check.IsNil) + s.expectStatus(c, clients[i].r, 200) + } + + s.ignoreLogID = s.lastLogID(c) + s.deleteTestObjects(c) + + for _, client := range clients { + lg := s.expectLog(c, client.r) + c.Check(lg.ObjectUUID, check.Equals, client.uuid) + c.Check(lg.EventType, check.Equals, "delete") + } +} + +func (s *v0Suite) TestEventPropertiesFields(c *check.C) { + ac := arvados.NewClientFromEnv() + ac.AuthToken = s.token + + conn, r, w, err := s.testClient() + c.Assert(err, check.IsNil) + defer conn.Close() + + c.Check(w.Encode(map[string]interface{}{ + "method": "subscribe", + "filters": [][]string{{"object_uuid", "=", arvadostest.RunningContainerUUID}}, + }), check.IsNil) + s.expectStatus(c, r, 200) + + err = ac.RequestAndDecode(nil, "POST", "arvados/v1/logs", s.jsonBody("log", map[string]interface{}{ + "object_uuid": arvadostest.RunningContainerUUID, + "event_type": "update", + "properties": map[string]interface{}{ + "new_attributes": map[string]interface{}{ + "name": "namevalue", + "requesting_container_uuid": "uuidvalue", + "state": "statevalue", + }, + }, + }), nil) + c.Assert(err, check.IsNil) + lg := s.expectLog(c, r) - c.Check(lg.ObjectUUID, check.Equals, <-uuidChan) + c.Check(lg.ObjectUUID, check.Equals, arvadostest.RunningContainerUUID) + c.Check(lg.EventType, check.Equals, "update") + c.Check(lg.Properties["new_attributes"].(map[string]interface{})["requesting_container_uuid"], check.Equals, "uuidvalue") + c.Check(lg.Properties["new_attributes"].(map[string]interface{})["name"], check.Equals, "namevalue") + c.Check(lg.Properties["new_attributes"].(map[string]interface{})["state"], check.Equals, "statevalue") +} + +// Trashing/deleting a collection produces an "update" event with +// properties["new_attributes"]["is_trashed"] == true. +func (s *v0Suite) TestTrashedCollection(c *check.C) { + ac := arvados.NewClientFromEnv() + ac.AuthToken = s.token + + var coll arvados.Collection + err := ac.RequestAndDecode(&coll, "POST", "arvados/v1/collections", s.jsonBody("collection", `{"manifest_text":""}`), map[string]interface{}{"ensure_unique_name": true}) + c.Assert(err, check.IsNil) + s.ignoreLogID = s.lastLogID(c) + + conn, r, w, err := s.testClient() + c.Assert(err, check.IsNil) + defer conn.Close() + + c.Check(w.Encode(map[string]interface{}{ + "method": "subscribe", + }), check.IsNil) + s.expectStatus(c, r, 200) + + err = ac.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil) + c.Assert(err, check.IsNil) + + lg := s.expectLog(c, r) + c.Check(lg.ObjectUUID, check.Equals, coll.UUID) + c.Check(lg.EventType, check.Equals, "update") + c.Check(lg.Properties["old_attributes"].(map[string]interface{})["is_trashed"], check.Equals, false) + c.Check(lg.Properties["new_attributes"].(map[string]interface{})["is_trashed"], check.Equals, true) } func (s *v0Suite) TestSendBadJSON(c *check.C) { - srv, conn, r, w := s.testClient() - defer srv.Close() + conn, r, w, err := s.testClient() + c.Assert(err, check.IsNil) defer conn.Close() c.Check(w.Encode(map[string]interface{}{ @@ -135,7 +286,7 @@ func (s *v0Suite) TestSendBadJSON(c *check.C) { }), check.IsNil) s.expectStatus(c, r, 200) - _, err := fmt.Fprint(conn, "^]beep\n") + _, err = fmt.Fprint(conn, "^]beep\n") c.Check(err, check.IsNil) s.expectStatus(c, r, 400) @@ -146,13 +297,13 @@ func (s *v0Suite) TestSendBadJSON(c *check.C) { } func (s *v0Suite) TestSubscribe(c *check.C) { - srv, conn, r, w := s.testClient() - defer srv.Close() + conn, r, w, err := s.testClient() + c.Assert(err, check.IsNil) defer conn.Close() - s.emitEvents(nil) + s.emitEvents(c, nil, nil) - err := w.Encode(map[string]interface{}{"21": 12}) + err = w.Encode(map[string]interface{}{"21": 12}) c.Check(err, check.IsNil) s.expectStatus(c, r, 400) @@ -161,58 +312,172 @@ func (s *v0Suite) TestSubscribe(c *check.C) { s.expectStatus(c, r, 200) uuidChan := make(chan string, 1) - go s.emitEvents(uuidChan) + go s.emitEvents(c, uuidChan, nil) uuid := <-uuidChan for _, etype := range []string{"create", "blip", "update"} { lg := s.expectLog(c, r) - c.Check(lg.ObjectUUID, check.Equals, uuid) + for lg.ObjectUUID != uuid { + lg = s.expectLog(c, r) + } c.Check(lg.EventType, check.Equals, etype) } } +func (s *v0Suite) TestManyEventsAndSubscribers(c *check.C) { + // Frequent slow listener pings create the conditions for a + // deadlock issue with the lib/pq example listener usage. + // + // Specifically: a lib/pq/example/listen-style event loop can + // deadlock if enough (~32) server notifications arrive after + // the event loop decides to call Ping (e.g., while + // listener.Ping() is waiting for a response from the server, + // or in the time.Sleep() invoked by testSlowPing). + // + // (*ListenerConn)listenerConnLoop() doesn't see the server's + // ping response until it finishes sending a previous + // notification through its internal queue to + // (*Listener)listenerConnLoop(), which is blocked on sending + // to our Notify channel, which is blocked on waiting for the + // Ping response. + defer func(d time.Duration) { + listenerPingInterval = d + testSlowPing = false + }(listenerPingInterval) + listenerPingInterval = time.Second / 2 + testSlowPing = true + // Restart the test server in order to get one that uses our + // test globals. + s.TearDownTest(c) + s.SetUpTest(c) + + done := make(chan struct{}) + defer close(done) + go s.emitEvents(c, nil, done) + + // We will expect to receive at least one event during each + // one-second interval while the test is running. + t0 := time.Now() + seconds := 10 + receivedPerSecond := make([]int64, seconds) + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Duration(seconds)*time.Second)) + defer cancel() + for clientID := 0; clientID < 100; clientID++ { + clientID := clientID + go func() { + for ctx.Err() == nil { + conn, r, w, err := s.testClient() + if ctx.Err() != nil { + return + } + c.Assert(err, check.IsNil) + defer conn.Close() + err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}}) + if ctx.Err() != nil { + return + } + c.Check(err, check.IsNil) + s.expectStatus(c, r, 200) + for { + if clientID%10 == 0 { + // slow client + time.Sleep(time.Second / 20) + } else if rand.Float64() < 0.01 { + // disconnect+reconnect + break + } + var lg arvados.Log + err := r.Decode(&lg) + if ctx.Err() != nil { + return + } + if errors.Is(err, io.EOF) { + break + } + c.Check(err, check.IsNil) + if i := int(time.Since(t0) / time.Second); i < seconds { + atomic.AddInt64(&receivedPerSecond[i], 1) + } + } + conn.Close() + } + }() + } + <-ctx.Done() + c.Log("done") + for i, n := range receivedPerSecond { + c.Logf("t<%d n=%d", i+1, n) + c.Check(int64(n), check.Not(check.Equals), int64(0)) + } +} + // Generate some events by creating and updating a workflow object, // and creating a custom log entry (event_type="blip") about the newly -// created workflow. If uuidChan is not nil, send the new workflow -// UUID to uuidChan as soon as it's known. -func (s *v0Suite) emitEvents(uuidChan chan<- string) { +// created workflow. +// +// If uuidChan is not nil, send the new workflow UUID to uuidChan as +// soon as it's known. +// +// If done is not nil, keep generating events until done receives or +// closes. +func (s *v0Suite) emitEvents(c *check.C, uuidChan chan<- string, done <-chan struct{}) { + s.wg.Add(1) + defer s.wg.Done() + ac := arvados.NewClientFromEnv() ac.AuthToken = s.token wf := &arvados.Workflow{ Name: "ws_test", } - err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", wf), map[string]interface{}{"ensure_unique_name": true}) - if err != nil { - panic(err) - } + err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", `{"name":"ws_test"}`), map[string]interface{}{"ensure_unique_name": true}) + c.Assert(err, check.IsNil) + s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID) if uuidChan != nil { uuidChan <- wf.UUID } - lg := &arvados.Log{} - err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", &arvados.Log{ - ObjectUUID: wf.UUID, - EventType: "blip", - Properties: map[string]interface{}{ - "beep": "boop", - }, - }), nil) + for i := 0; ; i++ { + lg := &arvados.Log{} + err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", map[string]interface{}{ + "object_uuid": wf.UUID, + "event_type": "blip", + "properties": map[string]interface{}{ + "beep": "boop", + }, + }), nil) + s.toDelete = append(s.toDelete, "arvados/v1/logs/"+lg.UUID) + if done != nil { + select { + case <-done: + default: + if i%50 == 0 { + time.Sleep(100 * time.Millisecond) + } + continue + } + } + break + } if err != nil { panic(err) } - err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", wf), nil) + err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", `{"name":"ws_test"}`), nil) if err != nil { panic(err) } - s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID, "arvados/v1/logs/"+lg.UUID) } func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader { - j, err := json.Marshal(ob) - if err != nil { - panic(err) + val, ok := ob.(string) + if !ok { + j, err := json.Marshal(ob) + if err != nil { + panic(err) + } + val = string(j) } v := url.Values{} - v[rscName] = []string{string(j)} + v[rscName] = []string{val} return bytes.NewBufferString(v.Encode()) } @@ -224,61 +489,36 @@ func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) { func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log { lg := &arvados.Log{} - c.Check(r.Decode(lg), check.IsNil) - return lg + ok := make(chan struct{}) + go func() { + defer close(ok) + for lg.ID <= s.ignoreLogID { + c.Assert(r.Decode(lg), check.IsNil) + } + }() + select { + case <-time.After(10 * time.Second): + c.Error("timed out") + c.FailNow() + return lg + case <-ok: + return lg + } } -func (s *v0Suite) testClient() (*testServer, *websocket.Conn, *json.Decoder, *json.Encoder) { - srv := newTestServer() - conn, err := websocket.Dial("ws://"+srv.addr+"/websocket?api_token="+s.token, "", "http://"+srv.addr) +func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder, error) { + srv := s.serviceSuite.srv + conn, err := websocket.Dial(strings.Replace(srv.URL, "http", "ws", 1)+"/websocket?api_token="+s.token, "", srv.URL) if err != nil { - panic(err) + return nil, nil, nil, err } w := json.NewEncoder(conn) r := json.NewDecoder(conn) - return srv, conn, r, w -} - -type testServer struct { - http.Server - addr string - ln net.Listener - pges *pgEventSource -} - -func (srv *testServer) Close() { - srv.ln.Close() - srv.pges.cancel() + return conn, r, w, nil } -func newTestServer() *testServer { - ln, err := net.Listen("tcp", ":") - if err != nil { - panic(err) - } - cfg := defaultConfig() - cfg.Client = *(arvados.NewClientFromEnv()) - pges := &pgEventSource{ - DataSource: testDBConfig().ConnectionString(), - QueueSize: 4, - } - srv := &testServer{ - Server: http.Server{ - Addr: ":", - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - Handler: &router{ - Config: &cfg, - eventSource: pges, - newPermChecker: func() permChecker { return newPermChecker(cfg.Client) }, - }, - }, - addr: ln.Addr().String(), - ln: ln, - pges: pges, - } - go pges.Run() - go srv.Serve(ln) - pges.waitReady() - return srv +func (s *v0Suite) lastLogID(c *check.C) int64 { + var lastID int64 + c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil) + return lastID }