"io"
"net/url"
"os"
+ "sync"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
serverSuite serverSuite
token string
toDelete []string
+ wg sync.WaitGroup
+ ignoreLogID uint64
}
+// SetUpTest runs the embedded serverSuite setup, starts the server in a
+// background goroutine and waits until it reports ready, then resets
+// the per-test token and the log ID threshold used by expectLog.
func (s *v0Suite) SetUpTest(c *check.C) {
s.serverSuite.SetUpTest(c)
+ go s.serverSuite.srv.Run()
+ s.serverSuite.srv.WaitReady()
+
s.token = arvadostest.ActiveToken
+ // Log entries with IDs up to this value are skipped by expectLog.
+ s.ignoreLogID = s.lastLogID(c)
+}
+
+// TearDownTest waits for any emitEvents call still in progress
+// (tracked by s.wg) and then closes the server started in SetUpTest.
+func (s *v0Suite) TearDownTest(c *check.C) {
+ s.wg.Wait()
+ s.serverSuite.srv.Close()
}
+// TearDownSuite removes any objects still listed in s.toDelete once
+// the suite has finished.
func (s *v0Suite) TearDownSuite(c *check.C) {
+ s.deleteTestObjects(c)
+}
+
+func (s *v0Suite) deleteTestObjects(c *check.C) {
ac := arvados.NewClientFromEnv()
ac.AuthToken = arvadostest.AdminToken
for _, path := range s.toDelete {
panic(err)
}
}
+ s.toDelete = nil
}
func (s *v0Suite) TestFilters(c *check.C) {
- srv, conn, r, w := s.testClient()
- defer srv.Close()
+ conn, r, w := s.testClient()
defer conn.Close()
c.Check(w.Encode(map[string]interface{}{
}
func (s *v0Suite) TestLastLogID(c *check.C) {
- var lastID uint64
- c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
+ lastID := s.lastLogID(c)
- srv, conn, r, w := s.testClient()
- defer srv.Close()
- defer conn.Close()
+ checkLogs := func(r *json.Decoder, uuid string) {
+ for _, etype := range []string{"create", "blip", "update"} {
+ lg := s.expectLog(c, r)
+ for lg.ObjectUUID != uuid {
+ lg = s.expectLog(c, r)
+ }
+ c.Check(lg.EventType, check.Equals, etype)
+ }
+ }
- uuidChan := make(chan string, 2)
+ // Connecting connEarly (before sending the early events) lets
+ // us confirm all of the "early" events have already passed
+ // through the server.
+ connEarly, rEarly, wEarly := s.testClient()
+ defer connEarly.Close()
+ c.Check(wEarly.Encode(map[string]interface{}{
+ "method": "subscribe",
+ }), check.IsNil)
+ s.expectStatus(c, rEarly, 200)
+
+ // Send the early events.
+ uuidChan := make(chan string, 1)
s.emitEvents(uuidChan)
+ uuidEarly := <-uuidChan
+
+ // Wait for the early events to pass through.
+ checkLogs(rEarly, uuidEarly)
+
+ // Connect the client that wants to get old events via
+ // last_log_id.
+ conn, r, w := s.testClient()
+ defer conn.Close()
c.Check(w.Encode(map[string]interface{}{
"method": "subscribe",
}), check.IsNil)
s.expectStatus(c, r, 200)
- go func() {
- s.emitEvents(uuidChan)
- close(uuidChan)
- }()
-
- done := make(chan bool)
- go func() {
- for uuid := range uuidChan {
- for _, etype := range []string{"create", "blip", "update"} {
- lg := s.expectLog(c, r)
- c.Check(lg.ObjectUUID, check.Equals, uuid)
- c.Check(lg.EventType, check.Equals, etype)
- }
- }
- close(done)
- }()
-
- select {
- case <-time.After(10 * time.Second):
- c.Fatal("timeout")
- case <-done:
- }
+ checkLogs(r, uuidEarly)
+ s.emitEvents(uuidChan)
+ checkLogs(r, <-uuidChan)
}
func (s *v0Suite) TestPermission(c *check.C) {
- srv, conn, r, w := s.testClient()
- defer srv.Close()
+ conn, r, w := s.testClient()
defer conn.Close()
c.Check(w.Encode(map[string]interface{}{
}), check.IsNil)
s.expectStatus(c, r, 200)
- uuidChan := make(chan string, 1)
+ uuidChan := make(chan string, 2)
go func() {
s.token = arvadostest.AdminToken
- s.emitEvents(nil)
+ s.emitEvents(uuidChan)
s.token = arvadostest.ActiveToken
s.emitEvents(uuidChan)
}()
+ wrongUUID := <-uuidChan
+ rightUUID := <-uuidChan
+ lg := s.expectLog(c, r)
+ for lg.ObjectUUID != rightUUID {
+ c.Check(lg.ObjectUUID, check.Not(check.Equals), wrongUUID)
+ lg = s.expectLog(c, r)
+ }
+}
+
+// Two users create private objects; admin deletes both objects; each
+// user receives a "delete" event for their own object (not for the
+// other user's object).
+//
+// Each client subscribes only after its own object has been created,
+// and s.ignoreLogID is advanced just before the deletion, so the first
+// log expectLog returns to each client must be that client's own
+// "delete" event.
+func (s *v0Suite) TestEventTypeDelete(c *check.C) {
+	clients := []struct {
+		token string
+		uuid  string
+		conn  *websocket.Conn
+		r     *json.Decoder
+		w     *json.Encoder
+	}{{token: arvadostest.ActiveToken}, {token: arvadostest.SpectatorToken}}
+	for i := range clients {
+		uuidChan := make(chan string, 1)
+		s.token = clients[i].token
+		s.emitEvents(uuidChan)
+		clients[i].uuid = <-uuidChan
+		clients[i].conn, clients[i].r, clients[i].w = s.testClient()
+		// Deferred inside the loop on purpose: every connection must
+		// stay open until the end of the test and then be released,
+		// as the other tests do with "defer conn.Close()".
+		defer clients[i].conn.Close()
+
+		c.Check(clients[i].w.Encode(map[string]interface{}{
+			"method": "subscribe",
+		}), check.IsNil)
+		s.expectStatus(c, clients[i].r, 200)
+	}
+
+	// Skip everything logged so far (including the events emitted
+	// above) so that only the deletions below are observed.
+	s.ignoreLogID = s.lastLogID(c)
+	s.deleteTestObjects(c)
+
+	for _, client := range clients {
+		lg := s.expectLog(c, client.r)
+		c.Check(lg.ObjectUUID, check.Equals, client.uuid)
+		c.Check(lg.EventType, check.Equals, "delete")
+	}
+}
+
+// Trashing/deleting a collection produces an "update" event with
+// properties["new_attributes"]["is_trashed"] == true.
+// The test also checks that old_attributes reports is_trashed == false.
+func (s *v0Suite) TestTrashedCollection(c *check.C) {
+ ac := arvados.NewClientFromEnv()
+ ac.AuthToken = s.token
+
+ coll := &arvados.Collection{ManifestText: ""}
+ err := ac.RequestAndDecode(coll, "POST", "arvados/v1/collections", s.jsonBody("collection", coll), map[string]interface{}{"ensure_unique_name": true})
+ c.Assert(err, check.IsNil)
+ // Advance the threshold past the logs produced by the POST above,
+ // so expectLog skips them and returns the event for the DELETE.
+ s.ignoreLogID = s.lastLogID(c)
+
+ conn, r, w := s.testClient()
+ defer conn.Close()
+
+ c.Check(w.Encode(map[string]interface{}{
+ "method": "subscribe",
+ }), check.IsNil)
+ s.expectStatus(c, r, 200)
+
+ err = ac.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
+ c.Assert(err, check.IsNil)
+
lg := s.expectLog(c, r)
- c.Check(lg.ObjectUUID, check.Equals, <-uuidChan)
+ c.Check(lg.ObjectUUID, check.Equals, coll.UUID)
+ c.Check(lg.EventType, check.Equals, "update")
+ // NOTE(review): these unchecked type assertions panic if either
+ // attribute map is missing from Properties.
+ c.Check(lg.Properties["old_attributes"].(map[string]interface{})["is_trashed"], check.Equals, false)
+ c.Check(lg.Properties["new_attributes"].(map[string]interface{})["is_trashed"], check.Equals, true)
}
func (s *v0Suite) TestSendBadJSON(c *check.C) {
- srv, conn, r, w := s.testClient()
- defer srv.Close()
+ conn, r, w := s.testClient()
defer conn.Close()
c.Check(w.Encode(map[string]interface{}{
}
func (s *v0Suite) TestSubscribe(c *check.C) {
- srv, conn, r, w := s.testClient()
- defer srv.Close()
+ conn, r, w := s.testClient()
defer conn.Close()
s.emitEvents(nil)
for _, etype := range []string{"create", "blip", "update"} {
lg := s.expectLog(c, r)
- c.Check(lg.ObjectUUID, check.Equals, uuid)
+ for lg.ObjectUUID != uuid {
+ lg = s.expectLog(c, r)
+ }
c.Check(lg.EventType, check.Equals, etype)
}
}
// created workflow. If uuidChan is not nil, send the new workflow
// UUID to uuidChan as soon as it's known.
func (s *v0Suite) emitEvents(uuidChan chan<- string) {
+ s.wg.Add(1)
+ defer s.wg.Done()
+
ac := arvados.NewClientFromEnv()
ac.AuthToken = s.token
wf := &arvados.Workflow{
func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
lg := &arvados.Log{}
- c.Check(r.Decode(lg), check.IsNil)
- return lg
+ ok := make(chan struct{})
+ go func() {
+ for lg.ID <= s.ignoreLogID {
+ c.Check(r.Decode(lg), check.IsNil)
+ }
+ close(ok)
+ }()
+ select {
+ case <-time.After(10 * time.Second):
+ panic("timed out")
+ case <-ok:
+ return lg
+ }
}
-func (s *v0Suite) testClient() (*server, *websocket.Conn, *json.Decoder, *json.Encoder) {
- go s.serverSuite.srv.Run()
- s.serverSuite.srv.WaitReady()
+func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder) {
srv := s.serverSuite.srv
conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
if err != nil {
}
w := json.NewEncoder(conn)
r := json.NewDecoder(conn)
- return srv, conn, r, w
+ return conn, r, w
+}
+
+// lastLogID returns the highest id currently in the logs table, or 0
+// if the table is empty. The test is aborted if the query fails.
+func (s *v0Suite) lastLogID(c *check.C) uint64 {
+	var lastID uint64
+	// MAX(id) is NULL for an empty table, and NULL cannot be scanned
+	// into a uint64; COALESCE turns that case into 0.
+	c.Assert(testDB().QueryRow(`SELECT COALESCE(MAX(id), 0) FROM logs`).Scan(&lastID), check.IsNil)
+	return lastID
}