From: Tom Clegg Date: Fri, 14 Jul 2017 14:51:54 +0000 (-0400) Subject: Merge branch '11960-trash-events' X-Git-Tag: 1.1.0~134 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/eb66b34d22cef6c98ad9ebdc228cf784b3136b72?hp=b8977c89db2707dd713cd4eeb11d595717fcff75 Merge branch '11960-trash-events' closes #11960 Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/services/ws/permission.go b/services/ws/permission.go index b40c1fa186..a39a959312 100644 --- a/services/ws/permission.go +++ b/services/ws/permission.go @@ -74,7 +74,8 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) { pc.nMisses++ err = pc.RequestAndDecode(&buf, "GET", path, nil, url.Values{ - "select": {`["uuid"]`}, + "include_trash": {"true"}, + "select": {`["uuid"]`}, }) var allowed bool diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go index bc7e6ecb9e..58c64231cb 100644 --- a/services/ws/session_v0.go +++ b/services/ws/session_v0.go @@ -20,7 +20,16 @@ var ( errQueueFull = errors.New("client queue full") errFrameTooBig = errors.New("frame too big") - sendObjectAttributes = []string{"state", "name", "owner_uuid", "portable_data_hash"} + // Send clients only these keys from the + // log.properties.old_attributes and + // log.properties.new_attributes hashes. + sendObjectAttributes = []string{ + "is_trashed", + "name", + "owner_uuid", + "portable_data_hash", + "state", + } v0subscribeOK = []byte(`{"status":200}`) v0subscribeFail = []byte(`{"status":400}`) @@ -90,7 +99,17 @@ func (sess *v0session) EventMessage(e *event) ([]byte, error) { return nil, nil } - ok, err := sess.permChecker.Check(detail.ObjectUUID) + var permTarget string + if detail.EventType == "delete" { + // It's pointless to check permission by reading + // ObjectUUID if it has just been deleted, but if the + // client has permission on the parent project then + // it's OK to send the event. + permTarget = detail.ObjectOwnerUUID + } else { + permTarget = detail.ObjectUUID + } + ok, err := sess.permChecker.Check(permTarget) if err != nil || !ok { return nil, err } @@ -143,7 +162,7 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) { if sub.LastLogID == 0 { return } - sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents") + sess.log.WithField("LastLogID", sub.LastLogID).Debug("sendOldEvents") // Here we do a "select id" query and queue an event for every // log since the given ID, then use (*event)Detail() to // retrieve the whole row and decide whether to send it. This @@ -158,17 +177,26 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) { sub.LastLogID, time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano)) if err != nil { - sess.log.WithError(err).Error("db.Query failed") + sess.log.WithError(err).Error("sendOldEvents db.Query failed") return } - defer rows.Close() + + var ids []uint64 for rows.Next() { var id uint64 err := rows.Scan(&id) if err != nil { - sess.log.WithError(err).Error("row Scan failed") + sess.log.WithError(err).Error("sendOldEvents row Scan failed") continue } + ids = append(ids, id) + } + if err := rows.Err(); err != nil { + sess.log.WithError(err).Error("sendOldEvents db.Query failed") + } + rows.Close() + + for _, id := range ids { for len(sess.sendq)*2 > cap(sess.sendq) { // Ugly... but if we fill up the whole client // queue with a backlog of old events, a @@ -193,9 +221,6 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) { } } } - if err := rows.Err(); err != nil { - sess.log.WithError(err).Error("db.Query failed") - } } type v0subscribe struct { diff --git a/services/ws/session_v0_test.go b/services/ws/session_v0_test.go index 1213be5d14..9f743e0b5e 100644 --- a/services/ws/session_v0_test.go +++ b/services/ws/session_v0_test.go @@ -11,6 +11,7 @@ import ( "io" "net/url" "os" + "sync" "time" "git.curoverse.com/arvados.git/sdk/go/arvados" @@ -32,14 +33,29 @@ type v0Suite struct { serverSuite serverSuite token string toDelete []string + wg sync.WaitGroup + ignoreLogID uint64 } func (s *v0Suite) SetUpTest(c *check.C) { s.serverSuite.SetUpTest(c) + go s.serverSuite.srv.Run() + s.serverSuite.srv.WaitReady() + s.token = arvadostest.ActiveToken + s.ignoreLogID = s.lastLogID(c) +} + +func (s *v0Suite) TearDownTest(c *check.C) { + s.wg.Wait() + s.serverSuite.srv.Close() } func (s *v0Suite) TearDownSuite(c *check.C) { + s.deleteTestObjects(c) +} + +func (s *v0Suite) deleteTestObjects(c *check.C) { ac := arvados.NewClientFromEnv() ac.AuthToken = arvadostest.AdminToken for _, path := range s.toDelete { @@ -48,11 +64,11 @@ func (s *v0Suite) TearDownSuite(c *check.C) { panic(err) } } + s.toDelete = nil } func (s *v0Suite) TestFilters(c *check.C) { - srv, conn, r, w := s.testClient() - defer srv.Close() + conn, r, w := s.testClient() defer conn.Close() c.Check(w.Encode(map[string]interface{}{ @@ -67,15 +83,40 @@ func (s *v0Suite) TestFilters(c *check.C) { } func (s *v0Suite) TestLastLogID(c *check.C) { - var lastID uint64 - c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil) + lastID := s.lastLogID(c) - srv, conn, r, w := s.testClient() - defer srv.Close() - defer conn.Close() + checkLogs := func(r *json.Decoder, uuid string) { + for _, etype := range []string{"create", "blip", "update"} { + lg := s.expectLog(c, r) + for lg.ObjectUUID != uuid { + lg = s.expectLog(c, r) + } + c.Check(lg.EventType, check.Equals, etype) + } + } - uuidChan := make(chan string, 2) + // Connecting connEarly (before sending the early events) lets + // us confirm all of the "early" events have already passed + // through the server. + connEarly, rEarly, wEarly := s.testClient() + defer connEarly.Close() + c.Check(wEarly.Encode(map[string]interface{}{ + "method": "subscribe", + }), check.IsNil) + s.expectStatus(c, rEarly, 200) + + // Send the early events. + uuidChan := make(chan string, 1) s.emitEvents(uuidChan) + uuidEarly := <-uuidChan + + // Wait for the early events to pass through. + checkLogs(rEarly, uuidEarly) + + // Connect the client that wants to get old events via + // last_log_id. + conn, r, w := s.testClient() + defer conn.Close() c.Check(w.Encode(map[string]interface{}{ "method": "subscribe", @@ -83,36 +124,13 @@ func (s *v0Suite) TestLastLogID(c *check.C) { }), check.IsNil) s.expectStatus(c, r, 200) - avoidRace := make(chan struct{}, cap(uuidChan)) - go func() { - // When last_log_id is given, although v0session sends - // old events in order, and sends new events in order, - // it doesn't necessarily finish sending all old - // events before sending any new events. To avoid - // hitting this bug in the test, we wait for the old - // events to arrive before emitting any new events. - <-avoidRace - s.emitEvents(uuidChan) - close(uuidChan) - }() - - go func() { - for uuid := range uuidChan { - for _, etype := range []string{"create", "blip", "update"} { - lg := s.expectLog(c, r) - for lg.ObjectUUID != uuid { - lg = s.expectLog(c, r) - } - c.Check(lg.EventType, check.Equals, etype) - } - avoidRace <- struct{}{} - } - }() + checkLogs(r, uuidEarly) + s.emitEvents(uuidChan) + checkLogs(r, <-uuidChan) } func (s *v0Suite) TestPermission(c *check.C) { - srv, conn, r, w := s.testClient() - defer srv.Close() + conn, r, w := s.testClient() defer conn.Close() c.Check(w.Encode(map[string]interface{}{ @@ -137,9 +155,71 @@ func (s *v0Suite) TestPermission(c *check.C) { } } +// Two users create private objects; admin deletes both objects; each +// user receives a "delete" event for their own object (not for the +// other user's object). +func (s *v0Suite) TestEventTypeDelete(c *check.C) { + clients := []struct { + token string + uuid string + conn *websocket.Conn + r *json.Decoder + w *json.Encoder + }{{token: arvadostest.ActiveToken}, {token: arvadostest.SpectatorToken}} + for i := range clients { + uuidChan := make(chan string, 1) + s.token = clients[i].token + s.emitEvents(uuidChan) + clients[i].uuid = <-uuidChan + clients[i].conn, clients[i].r, clients[i].w = s.testClient() + + c.Check(clients[i].w.Encode(map[string]interface{}{ + "method": "subscribe", + }), check.IsNil) + s.expectStatus(c, clients[i].r, 200) + } + + s.ignoreLogID = s.lastLogID(c) + s.deleteTestObjects(c) + + for _, client := range clients { + lg := s.expectLog(c, client.r) + c.Check(lg.ObjectUUID, check.Equals, client.uuid) + c.Check(lg.EventType, check.Equals, "delete") + } +} + +// Trashing/deleting a collection produces an "update" event with +// properties["new_attributes"]["is_trashed"] == true. +func (s *v0Suite) TestTrashedCollection(c *check.C) { + ac := arvados.NewClientFromEnv() + ac.AuthToken = s.token + + coll := &arvados.Collection{ManifestText: ""} + err := ac.RequestAndDecode(coll, "POST", "arvados/v1/collections", s.jsonBody("collection", coll), map[string]interface{}{"ensure_unique_name": true}) + c.Assert(err, check.IsNil) + s.ignoreLogID = s.lastLogID(c) + + conn, r, w := s.testClient() + defer conn.Close() + + c.Check(w.Encode(map[string]interface{}{ + "method": "subscribe", + }), check.IsNil) + s.expectStatus(c, r, 200) + + err = ac.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil) + c.Assert(err, check.IsNil) + + lg := s.expectLog(c, r) + c.Check(lg.ObjectUUID, check.Equals, coll.UUID) + c.Check(lg.EventType, check.Equals, "update") + c.Check(lg.Properties["old_attributes"].(map[string]interface{})["is_trashed"], check.Equals, false) + c.Check(lg.Properties["new_attributes"].(map[string]interface{})["is_trashed"], check.Equals, true) +} + func (s *v0Suite) TestSendBadJSON(c *check.C) { - srv, conn, r, w := s.testClient() - defer srv.Close() + conn, r, w := s.testClient() defer conn.Close() c.Check(w.Encode(map[string]interface{}{ @@ -158,8 +238,7 @@ func (s *v0Suite) TestSendBadJSON(c *check.C) { } func (s *v0Suite) TestSubscribe(c *check.C) { - srv, conn, r, w := s.testClient() - defer srv.Close() + conn, r, w := s.testClient() defer conn.Close() s.emitEvents(nil) @@ -190,6 +269,9 @@ func (s *v0Suite) TestSubscribe(c *check.C) { // created workflow. If uuidChan is not nil, send the new workflow // UUID to uuidChan as soon as it's known. func (s *v0Suite) emitEvents(uuidChan chan<- string) { + s.wg.Add(1) + defer s.wg.Done() + ac := arvados.NewClientFromEnv() ac.AuthToken = s.token wf := &arvados.Workflow{ @@ -240,7 +322,9 @@ func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log { lg := &arvados.Log{} ok := make(chan struct{}) go func() { - c.Check(r.Decode(lg), check.IsNil) + for lg.ID <= s.ignoreLogID { + c.Check(r.Decode(lg), check.IsNil) + } close(ok) }() select { @@ -251,9 +335,7 @@ func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log { } } -func (s *v0Suite) testClient() (*server, *websocket.Conn, *json.Decoder, *json.Encoder) { - go s.serverSuite.srv.Run() - s.serverSuite.srv.WaitReady() +func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder) { srv := s.serverSuite.srv conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String()) if err != nil { @@ -261,5 +343,11 @@ func (s *v0Suite) testClient() (*server, *websocket.Conn, *json.Decoder, *json.E } w := json.NewEncoder(conn) r := json.NewDecoder(conn) - return srv, conn, r, w + return conn, r, w +} + +func (s *v0Suite) lastLogID(c *check.C) uint64 { + var lastID uint64 + c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil) + return lastID }