Merge branch '11960-trash-events'
authorTom Clegg <tom@curoverse.com>
Fri, 14 Jul 2017 14:51:54 +0000 (10:51 -0400)
committerTom Clegg <tom@curoverse.com>
Fri, 14 Jul 2017 14:51:54 +0000 (10:51 -0400)
closes #11960

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curoverse.com>

services/ws/permission.go
services/ws/session_v0.go
services/ws/session_v0_test.go

index b40c1fa1863219048a2ba70f8b2afb1ffc394391..a39a959312aed582957d8845dddfc70394b649a4 100644 (file)
@@ -74,7 +74,8 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
 
        pc.nMisses++
        err = pc.RequestAndDecode(&buf, "GET", path, nil, url.Values{
-               "select": {`["uuid"]`},
+               "include_trash": {"true"},
+               "select":        {`["uuid"]`},
        })
 
        var allowed bool
index bc7e6ecb9e391e5afef37edabfeffd791c4cd710..58c64231cb53c1204ceed70b0ea030a7050ebb95 100644 (file)
@@ -20,7 +20,16 @@ var (
        errQueueFull   = errors.New("client queue full")
        errFrameTooBig = errors.New("frame too big")
 
-       sendObjectAttributes = []string{"state", "name", "owner_uuid", "portable_data_hash"}
+       // Send clients only these keys from the
+       // log.properties.old_attributes and
+       // log.properties.new_attributes hashes.
+       sendObjectAttributes = []string{
+               "is_trashed",
+               "name",
+               "owner_uuid",
+               "portable_data_hash",
+               "state",
+       }
 
        v0subscribeOK   = []byte(`{"status":200}`)
        v0subscribeFail = []byte(`{"status":400}`)
@@ -90,7 +99,17 @@ func (sess *v0session) EventMessage(e *event) ([]byte, error) {
                return nil, nil
        }
 
-       ok, err := sess.permChecker.Check(detail.ObjectUUID)
+       var permTarget string
+       if detail.EventType == "delete" {
+               // It's pointless to check permission by reading
+               // ObjectUUID if it has just been deleted, but if the
+               // client has permission on the parent project then
+               // it's OK to send the event.
+               permTarget = detail.ObjectOwnerUUID
+       } else {
+               permTarget = detail.ObjectUUID
+       }
+       ok, err := sess.permChecker.Check(permTarget)
        if err != nil || !ok {
                return nil, err
        }
@@ -143,7 +162,7 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
        if sub.LastLogID == 0 {
                return
        }
-       sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
+       sess.log.WithField("LastLogID", sub.LastLogID).Debug("sendOldEvents")
        // Here we do a "select id" query and queue an event for every
        // log since the given ID, then use (*event)Detail() to
        // retrieve the whole row and decide whether to send it. This
@@ -158,17 +177,26 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                sub.LastLogID,
                time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
        if err != nil {
-               sess.log.WithError(err).Error("db.Query failed")
+               sess.log.WithError(err).Error("sendOldEvents db.Query failed")
                return
        }
-       defer rows.Close()
+
+       var ids []uint64
        for rows.Next() {
                var id uint64
                err := rows.Scan(&id)
                if err != nil {
-                       sess.log.WithError(err).Error("row Scan failed")
+                       sess.log.WithError(err).Error("sendOldEvents row Scan failed")
                        continue
                }
+               ids = append(ids, id)
+       }
+       if err := rows.Err(); err != nil {
+               sess.log.WithError(err).Error("sendOldEvents db.Query failed")
+       }
+       rows.Close()
+
+       for _, id := range ids {
                for len(sess.sendq)*2 > cap(sess.sendq) {
                        // Ugly... but if we fill up the whole client
                        // queue with a backlog of old events, a
@@ -193,9 +221,6 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                        }
                }
        }
-       if err := rows.Err(); err != nil {
-               sess.log.WithError(err).Error("db.Query failed")
-       }
 }
 
 type v0subscribe struct {
index 1213be5d140555f0a533cfc4aa639ef9959e7db2..9f743e0b5e3d58312d2b3a2636b148bd493b51e0 100644 (file)
@@ -11,6 +11,7 @@ import (
        "io"
        "net/url"
        "os"
+       "sync"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -32,14 +33,29 @@ type v0Suite struct {
        serverSuite serverSuite
        token       string
        toDelete    []string
+       wg          sync.WaitGroup
+       ignoreLogID uint64
 }
 
 func (s *v0Suite) SetUpTest(c *check.C) {
        s.serverSuite.SetUpTest(c)
+       go s.serverSuite.srv.Run()
+       s.serverSuite.srv.WaitReady()
+
        s.token = arvadostest.ActiveToken
+       s.ignoreLogID = s.lastLogID(c)
+}
+
+func (s *v0Suite) TearDownTest(c *check.C) {
+       s.wg.Wait()
+       s.serverSuite.srv.Close()
 }
 
 func (s *v0Suite) TearDownSuite(c *check.C) {
+       s.deleteTestObjects(c)
+}
+
+func (s *v0Suite) deleteTestObjects(c *check.C) {
        ac := arvados.NewClientFromEnv()
        ac.AuthToken = arvadostest.AdminToken
        for _, path := range s.toDelete {
@@ -48,11 +64,11 @@ func (s *v0Suite) TearDownSuite(c *check.C) {
                        panic(err)
                }
        }
+       s.toDelete = nil
 }
 
 func (s *v0Suite) TestFilters(c *check.C) {
-       srv, conn, r, w := s.testClient()
-       defer srv.Close()
+       conn, r, w := s.testClient()
        defer conn.Close()
 
        c.Check(w.Encode(map[string]interface{}{
@@ -67,15 +83,40 @@ func (s *v0Suite) TestFilters(c *check.C) {
 }
 
 func (s *v0Suite) TestLastLogID(c *check.C) {
-       var lastID uint64
-       c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
+       lastID := s.lastLogID(c)
 
-       srv, conn, r, w := s.testClient()
-       defer srv.Close()
-       defer conn.Close()
+       checkLogs := func(r *json.Decoder, uuid string) {
+               for _, etype := range []string{"create", "blip", "update"} {
+                       lg := s.expectLog(c, r)
+                       for lg.ObjectUUID != uuid {
+                               lg = s.expectLog(c, r)
+                       }
+                       c.Check(lg.EventType, check.Equals, etype)
+               }
+       }
 
-       uuidChan := make(chan string, 2)
+       // Connecting connEarly (before sending the early events) lets
+       // us confirm all of the "early" events have already passed
+       // through the server.
+       connEarly, rEarly, wEarly := s.testClient()
+       defer connEarly.Close()
+       c.Check(wEarly.Encode(map[string]interface{}{
+               "method": "subscribe",
+       }), check.IsNil)
+       s.expectStatus(c, rEarly, 200)
+
+       // Send the early events.
+       uuidChan := make(chan string, 1)
        s.emitEvents(uuidChan)
+       uuidEarly := <-uuidChan
+
+       // Wait for the early events to pass through.
+       checkLogs(rEarly, uuidEarly)
+
+       // Connect the client that wants to get old events via
+       // last_log_id.
+       conn, r, w := s.testClient()
+       defer conn.Close()
 
        c.Check(w.Encode(map[string]interface{}{
                "method":      "subscribe",
@@ -83,36 +124,13 @@ func (s *v0Suite) TestLastLogID(c *check.C) {
        }), check.IsNil)
        s.expectStatus(c, r, 200)
 
-       avoidRace := make(chan struct{}, cap(uuidChan))
-       go func() {
-               // When last_log_id is given, although v0session sends
-               // old events in order, and sends new events in order,
-               // it doesn't necessarily finish sending all old
-               // events before sending any new events. To avoid
-               // hitting this bug in the test, we wait for the old
-               // events to arrive before emitting any new events.
-               <-avoidRace
-               s.emitEvents(uuidChan)
-               close(uuidChan)
-       }()
-
-       go func() {
-               for uuid := range uuidChan {
-                       for _, etype := range []string{"create", "blip", "update"} {
-                               lg := s.expectLog(c, r)
-                               for lg.ObjectUUID != uuid {
-                                       lg = s.expectLog(c, r)
-                               }
-                               c.Check(lg.EventType, check.Equals, etype)
-                       }
-                       avoidRace <- struct{}{}
-               }
-       }()
+       checkLogs(r, uuidEarly)
+       s.emitEvents(uuidChan)
+       checkLogs(r, <-uuidChan)
 }
 
 func (s *v0Suite) TestPermission(c *check.C) {
-       srv, conn, r, w := s.testClient()
-       defer srv.Close()
+       conn, r, w := s.testClient()
        defer conn.Close()
 
        c.Check(w.Encode(map[string]interface{}{
@@ -137,9 +155,71 @@ func (s *v0Suite) TestPermission(c *check.C) {
        }
 }
 
+// Two users create private objects; admin deletes both objects; each
+// user receives a "delete" event for their own object (not for the
+// other user's object).
+func (s *v0Suite) TestEventTypeDelete(c *check.C) {
+       clients := []struct {
+               token string
+               uuid  string
+               conn  *websocket.Conn
+               r     *json.Decoder
+               w     *json.Encoder
+       }{{token: arvadostest.ActiveToken}, {token: arvadostest.SpectatorToken}}
+       for i := range clients {
+               uuidChan := make(chan string, 1)
+               s.token = clients[i].token
+               s.emitEvents(uuidChan)
+               clients[i].uuid = <-uuidChan
+               clients[i].conn, clients[i].r, clients[i].w = s.testClient()
+
+               c.Check(clients[i].w.Encode(map[string]interface{}{
+                       "method": "subscribe",
+               }), check.IsNil)
+               s.expectStatus(c, clients[i].r, 200)
+       }
+
+       s.ignoreLogID = s.lastLogID(c)
+       s.deleteTestObjects(c)
+
+       for _, client := range clients {
+               lg := s.expectLog(c, client.r)
+               c.Check(lg.ObjectUUID, check.Equals, client.uuid)
+               c.Check(lg.EventType, check.Equals, "delete")
+       }
+}
+
+// Trashing/deleting a collection produces an "update" event with
+// properties["new_attributes"]["is_trashed"] == true.
+func (s *v0Suite) TestTrashedCollection(c *check.C) {
+       ac := arvados.NewClientFromEnv()
+       ac.AuthToken = s.token
+
+       coll := &arvados.Collection{ManifestText: ""}
+       err := ac.RequestAndDecode(coll, "POST", "arvados/v1/collections", s.jsonBody("collection", coll), map[string]interface{}{"ensure_unique_name": true})
+       c.Assert(err, check.IsNil)
+       s.ignoreLogID = s.lastLogID(c)
+
+       conn, r, w := s.testClient()
+       defer conn.Close()
+
+       c.Check(w.Encode(map[string]interface{}{
+               "method": "subscribe",
+       }), check.IsNil)
+       s.expectStatus(c, r, 200)
+
+       err = ac.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
+       c.Assert(err, check.IsNil)
+
+       lg := s.expectLog(c, r)
+       c.Check(lg.ObjectUUID, check.Equals, coll.UUID)
+       c.Check(lg.EventType, check.Equals, "update")
+       c.Check(lg.Properties["old_attributes"].(map[string]interface{})["is_trashed"], check.Equals, false)
+       c.Check(lg.Properties["new_attributes"].(map[string]interface{})["is_trashed"], check.Equals, true)
+}
+
 func (s *v0Suite) TestSendBadJSON(c *check.C) {
-       srv, conn, r, w := s.testClient()
-       defer srv.Close()
+       conn, r, w := s.testClient()
        defer conn.Close()
 
        c.Check(w.Encode(map[string]interface{}{
@@ -158,8 +238,7 @@ func (s *v0Suite) TestSendBadJSON(c *check.C) {
 }
 
 func (s *v0Suite) TestSubscribe(c *check.C) {
-       srv, conn, r, w := s.testClient()
-       defer srv.Close()
+       conn, r, w := s.testClient()
        defer conn.Close()
 
        s.emitEvents(nil)
@@ -190,6 +269,9 @@ func (s *v0Suite) TestSubscribe(c *check.C) {
 // created workflow. If uuidChan is not nil, send the new workflow
 // UUID to uuidChan as soon as it's known.
 func (s *v0Suite) emitEvents(uuidChan chan<- string) {
+       s.wg.Add(1)
+       defer s.wg.Done()
+
        ac := arvados.NewClientFromEnv()
        ac.AuthToken = s.token
        wf := &arvados.Workflow{
@@ -240,7 +322,9 @@ func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
        lg := &arvados.Log{}
        ok := make(chan struct{})
        go func() {
-               c.Check(r.Decode(lg), check.IsNil)
+               for lg.ID <= s.ignoreLogID {
+                       c.Check(r.Decode(lg), check.IsNil)
+               }
                close(ok)
        }()
        select {
@@ -251,9 +335,7 @@ func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
        }
 }
 
-func (s *v0Suite) testClient() (*server, *websocket.Conn, *json.Decoder, *json.Encoder) {
-       go s.serverSuite.srv.Run()
-       s.serverSuite.srv.WaitReady()
+func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder) {
        srv := s.serverSuite.srv
        conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
        if err != nil {
@@ -261,5 +343,11 @@ func (s *v0Suite) testClient() (*server, *websocket.Conn, *json.Decoder, *json.E
        }
        w := json.NewEncoder(conn)
        r := json.NewDecoder(conn)
-       return srv, conn, r, w
+       return conn, r, w
+}
+
+func (s *v0Suite) lastLogID(c *check.C) uint64 {
+       var lastID uint64
+       c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
+       return lastID
 }