Merge branch '14539-pysdk-empty-dir'
[arvados.git] / services / ws / session_v0_test.go
index f4dc23fecfa3fd94f3de59c7629135b9461a83b9..7585bc5e17e017dc095a141d550e4e609c877c94 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
@@ -5,10 +9,9 @@ import (
        "encoding/json"
        "fmt"
        "io"
-       "net"
-       "net/http"
        "net/url"
        "os"
+       "sync"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -27,15 +30,32 @@ func init() {
 var _ = check.Suite(&v0Suite{})
 
 type v0Suite struct {
-       token    string
-       toDelete []string
+       serverSuite serverSuite    // server fixture; SetUpTest runs serverSuite.srv and testClient dials it
+       token       string         // API token used by testClient and emitEvents
+       toDelete    []string       // paths iterated, then cleared, by deleteTestObjects
+       wg          sync.WaitGroup // counts in-flight emitEvents calls; TearDownTest waits on it
+       ignoreLogID uint64         // expectLog discards log entries whose ID is <= this value
 }
 
 func (s *v0Suite) SetUpTest(c *check.C) {
+       s.serverSuite.SetUpTest(c)    // run the server suite's own per-test setup (presumably what creates srv)
+       go s.serverSuite.srv.Run()    // serve in the background while the test body runs
+       s.serverSuite.srv.WaitReady() // wait for the server to be ready (per the method name) before any client connects
+
        s.token = arvadostest.ActiveToken
+       s.ignoreLogID = s.lastLogID(c) // baseline: expectLog skips log entries with ID <= this
+}
+
+func (s *v0Suite) TearDownTest(c *check.C) {
+       s.wg.Wait()               // let every in-flight emitEvents call finish first
+       s.serverSuite.srv.Close() // then shut down the server that SetUpTest started
 }
 
 func (s *v0Suite) TearDownSuite(c *check.C) {
+       s.deleteTestObjects(c) // final cleanup of anything still listed in s.toDelete
+}
+
+func (s *v0Suite) deleteTestObjects(c *check.C) {
        ac := arvados.NewClientFromEnv()
        ac.AuthToken = arvadostest.AdminToken
        for _, path := range s.toDelete {
@@ -44,34 +64,72 @@ func (s *v0Suite) TearDownSuite(c *check.C) {
                        panic(err)
                }
        }
+       s.toDelete = nil
 }
 
 func (s *v0Suite) TestFilters(c *check.C) {
-       srv, conn, r, w := s.testClient()
-       defer srv.Close()
+       conn, r, w := s.testClient()
        defer conn.Close()
 
-       c.Check(w.Encode(map[string]interface{}{
-               "method":  "subscribe",
-               "filters": [][]interface{}{{"event_type", "in", []string{"update"}}},
-       }), check.IsNil)
-       s.expectStatus(c, r, 200)
+       cmd := func(method, eventType string, status int) { // sends method with a single event_type filter, then expects a reply with the given status
+               c.Check(w.Encode(map[string]interface{}{
+                       "method":  method,
+                       "filters": [][]interface{}{{"event_type", "in", []string{eventType}}},
+               }), check.IsNil)
+               s.expectStatus(c, r, status)
+       }
+       cmd("subscribe", "update", 200) // "update" is subscribed three times in all, "create" once
+       cmd("subscribe", "update", 200)
+       cmd("subscribe", "create", 200)
+       cmd("subscribe", "update", 200)
+       cmd("unsubscribe", "blip", 400)   // "blip" was never subscribed: a 400 reply is expected
+       cmd("unsubscribe", "create", 200) // the single "create" subscription is expected to be removable
+       cmd("unsubscribe", "update", 200) // presumably removes just one of the three identical "update" subscriptions
 
        go s.emitEvents(nil)
        lg := s.expectLog(c, r)
        c.Check(lg.EventType, check.Equals, "update")
+
+       cmd("unsubscribe", "update", 200) // second "update" subscription
+       cmd("unsubscribe", "update", 200) // third "update" subscription
+       cmd("unsubscribe", "update", 400) // nothing left to unsubscribe: a 400 reply is expected
 }
 
 func (s *v0Suite) TestLastLogID(c *check.C) {
-       var lastID uint64
-       c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
+       lastID := s.lastLogID(c)
 
-       srv, conn, r, w := s.testClient()
-       defer srv.Close()
-       defer conn.Close()
+       checkLogs := func(r *json.Decoder, uuid string) {
+               for _, etype := range []string{"create", "blip", "update"} {
+                       lg := s.expectLog(c, r)
+                       for lg.ObjectUUID != uuid {
+                               lg = s.expectLog(c, r)
+                       }
+                       c.Check(lg.EventType, check.Equals, etype)
+               }
+       }
 
-       uuidChan := make(chan string, 2)
+       // Connecting connEarly (before sending the early events) lets
+       // us confirm all of the "early" events have already passed
+       // through the server.
+       connEarly, rEarly, wEarly := s.testClient()
+       defer connEarly.Close()
+       c.Check(wEarly.Encode(map[string]interface{}{
+               "method": "subscribe",
+       }), check.IsNil)
+       s.expectStatus(c, rEarly, 200)
+
+       // Send the early events.
+       uuidChan := make(chan string, 1)
        s.emitEvents(uuidChan)
+       uuidEarly := <-uuidChan
+
+       // Wait for the early events to pass through.
+       checkLogs(rEarly, uuidEarly)
+
+       // Connect the client that wants to get old events via
+       // last_log_id.
+       conn, r, w := s.testClient()
+       defer conn.Close()
 
        c.Check(w.Encode(map[string]interface{}{
                "method":      "subscribe",
@@ -79,33 +137,13 @@ func (s *v0Suite) TestLastLogID(c *check.C) {
        }), check.IsNil)
        s.expectStatus(c, r, 200)
 
-       go func() {
-               s.emitEvents(uuidChan)
-               close(uuidChan)
-       }()
-
-       done := make(chan bool)
-       go func() {
-               for uuid := range uuidChan {
-                       for _, etype := range []string{"create", "blip", "update"} {
-                               lg := s.expectLog(c, r)
-                               c.Check(lg.ObjectUUID, check.Equals, uuid)
-                               c.Check(lg.EventType, check.Equals, etype)
-                       }
-               }
-               close(done)
-       }()
-
-       select {
-       case <-time.After(10 * time.Second):
-               c.Fatal("timeout")
-       case <-done:
-       }
+       checkLogs(r, uuidEarly)
+       s.emitEvents(uuidChan)
+       checkLogs(r, <-uuidChan)
 }
 
 func (s *v0Suite) TestPermission(c *check.C) {
-       srv, conn, r, w := s.testClient()
-       defer srv.Close()
+       conn, r, w := s.testClient()
        defer conn.Close()
 
        c.Check(w.Encode(map[string]interface{}{
@@ -113,21 +151,88 @@ func (s *v0Suite) TestPermission(c *check.C) {
        }), check.IsNil)
        s.expectStatus(c, r, 200)
 
-       uuidChan := make(chan string, 1)
+       uuidChan := make(chan string, 2)
        go func() {
                s.token = arvadostest.AdminToken
-               s.emitEvents(nil)
+               s.emitEvents(uuidChan)
                s.token = arvadostest.ActiveToken
                s.emitEvents(uuidChan)
        }()
 
+       wrongUUID := <-uuidChan
+       rightUUID := <-uuidChan
+       lg := s.expectLog(c, r)
+       for lg.ObjectUUID != rightUUID {
+               c.Check(lg.ObjectUUID, check.Not(check.Equals), wrongUUID)
+               lg = s.expectLog(c, r)
+       }
+}
+
+// Two users create private objects; admin deletes both objects; each
+// user receives a "delete" event for their own object (not for the
+// other user's object).
+func (s *v0Suite) TestEventTypeDelete(c *check.C) {
+       clients := []struct {
+               token string
+               uuid  string
+               conn  *websocket.Conn
+               r     *json.Decoder
+               w     *json.Encoder
+       }{{token: arvadostest.ActiveToken}, {token: arvadostest.SpectatorToken}}
+       for i := range clients {
+               uuidChan := make(chan string, 1)
+               s.token = clients[i].token // emitEvents and testClient both act as this user
+               s.emitEvents(uuidChan)
+               clients[i].uuid = <-uuidChan // UUID of the workflow emitEvents just created
+               clients[i].conn, clients[i].r, clients[i].w = s.testClient()
+               defer clients[i].conn.Close() // deliberately deferred in the loop: both connections stay open until the test returns
+               c.Check(clients[i].w.Encode(map[string]interface{}{
+                       "method": "subscribe",
+               }), check.IsNil)
+               s.expectStatus(c, clients[i].r, 200)
+       }
+
+       s.ignoreLogID = s.lastLogID(c) // skip everything logged so far; only the deletions below matter
+       s.deleteTestObjects(c)         // deleteTestObjects uses the admin token
+
+       for _, client := range clients {
+               lg := s.expectLog(c, client.r)
+               c.Check(lg.ObjectUUID, check.Equals, client.uuid)
+               c.Check(lg.EventType, check.Equals, "delete")
+       }
+}
+
+// Deleting a collection through the API produces an "update" event
+// (not "delete") whose is_trashed attribute goes from false to true.
+func (s *v0Suite) TestTrashedCollection(c *check.C) {
+       ac := arvados.NewClientFromEnv()
+       ac.AuthToken = s.token
+
+       coll := &arvados.Collection{ManifestText: ""}
+       err := ac.RequestAndDecode(coll, "POST", "arvados/v1/collections", s.jsonBody("collection", coll), map[string]interface{}{"ensure_unique_name": true})
+       c.Assert(err, check.IsNil)
+       s.ignoreLogID = s.lastLogID(c) // skip everything logged so far (e.g. by the create request above)
+
+       conn, r, w := s.testClient()
+       defer conn.Close()
+
+       c.Check(w.Encode(map[string]interface{}{
+               "method": "subscribe",
+       }), check.IsNil)
+       s.expectStatus(c, r, 200)
+
+       err = ac.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil) // response body is not needed, hence the nil destination
+       c.Assert(err, check.IsNil)
+
        lg := s.expectLog(c, r)
-       c.Check(lg.ObjectUUID, check.Equals, <-uuidChan)
+       c.Check(lg.ObjectUUID, check.Equals, coll.UUID)
+       c.Check(lg.EventType, check.Equals, "update")
+       c.Check(lg.Properties["old_attributes"].(map[string]interface{})["is_trashed"], check.Equals, false) // unchecked type assertion: panics if old_attributes is absent or not an object
+       c.Check(lg.Properties["new_attributes"].(map[string]interface{})["is_trashed"], check.Equals, true)  // same caveat for new_attributes
 }
 
 func (s *v0Suite) TestSendBadJSON(c *check.C) {
-       srv, conn, r, w := s.testClient()
-       defer srv.Close()
+       conn, r, w := s.testClient()
        defer conn.Close()
 
        c.Check(w.Encode(map[string]interface{}{
@@ -146,8 +251,7 @@ func (s *v0Suite) TestSendBadJSON(c *check.C) {
 }
 
 func (s *v0Suite) TestSubscribe(c *check.C) {
-       srv, conn, r, w := s.testClient()
-       defer srv.Close()
+       conn, r, w := s.testClient()
        defer conn.Close()
 
        s.emitEvents(nil)
@@ -166,7 +270,9 @@ func (s *v0Suite) TestSubscribe(c *check.C) {
 
        for _, etype := range []string{"create", "blip", "update"} {
                lg := s.expectLog(c, r)
-               c.Check(lg.ObjectUUID, check.Equals, uuid)
+               for lg.ObjectUUID != uuid {
+                       lg = s.expectLog(c, r)
+               }
                c.Check(lg.EventType, check.Equals, etype)
        }
 }
@@ -176,6 +282,9 @@ func (s *v0Suite) TestSubscribe(c *check.C) {
 // created workflow. If uuidChan is not nil, send the new workflow
 // UUID to uuidChan as soon as it's known.
 func (s *v0Suite) emitEvents(uuidChan chan<- string) {
+       s.wg.Add(1)
+       defer s.wg.Done()
+
        ac := arvados.NewClientFromEnv()
        ac.AuthToken = s.token
        wf := &arvados.Workflow{
@@ -224,63 +333,45 @@ func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
 
+// expectLog returns the next log entry received on r whose ID is
+// greater than s.ignoreLogID. A decode error aborts the calling test;
+// waiting longer than 10 seconds panics.
 func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
        lg := &arvados.Log{}
-       c.Check(r.Decode(lg), check.IsNil)
-       return lg
+       // Buffered so the reader goroutine can always deliver its
+       // result and exit.
+       done := make(chan error, 1)
+       go func() {
+               var err error
+               // Stop at the first decode error: once the connection
+               // fails, retrying only spins until the timeout.
+               for err == nil && lg.ID <= s.ignoreLogID {
+                       err = r.Decode(lg)
+               }
+               done <- err
+       }()
+       select {
+       case <-time.After(10 * time.Second):
+               panic("timed out")
+       case err := <-done:
+               // Assert from the test's own goroutine so a failure
+               // stops the test right away.
+               c.Assert(err, check.IsNil)
+               return lg
+       }
 }
 
-func (s *v0Suite) testClient() (*testServer, *websocket.Conn, *json.Decoder, *json.Encoder) {
-       srv := newTestServer()
-       conn, err := websocket.Dial("ws://"+srv.addr+"/websocket?api_token="+s.token, "", "http://"+srv.addr)
+func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder) {
+       srv := s.serverSuite.srv // connect to the shared server that SetUpTest runs, rather than starting one per client
+       conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String()) // authenticates with s.token via the query string
        if err != nil {
                panic(err)
        }
        w := json.NewEncoder(conn)
        r := json.NewDecoder(conn)
-       return srv, conn, r, w
-}
-
-type testServer struct {
-       http.Server
-       addr string
-       stop chan bool
-}
-
-func (srv *testServer) Close() {
-       close(srv.stop)
+       return conn, r, w // decoder and encoder both wrap conn; the caller is responsible for closing conn
 }
 
-func newTestServer() *testServer {
-       ln, err := net.Listen("tcp", ":")
-       if err != nil {
-               panic(err)
-       }
-       cfg := defaultConfig()
-       cfg.Client = *(arvados.NewClientFromEnv())
-       es := &pgEventSource{
-               DataSource: testDBConfig().ConnectionString(),
-               QueueSize:  4,
-       }
-       srv := &testServer{
-               Server: http.Server{
-                       Addr:         ":",
-                       ReadTimeout:  10 * time.Second,
-                       WriteTimeout: 10 * time.Second,
-                       Handler: &router{
-                               Config:         &cfg,
-                               eventSource:    es,
-                               newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
-                       },
-               },
-               addr: ln.Addr().String(),
-               stop: make(chan bool),
-       }
-       go es.Run()
-       go srv.Serve(ln)
-       go func() {
-               <-srv.stop
-               ln.Close()
-               es.cancel()
-       }()
-       es.waitReady()
-       return srv
+func (s *v0Suite) lastLogID(c *check.C) uint64 { // returns the highest id in the logs table, or 0 if it has no rows
+       var lastID uint64 // a bare MAX(id) is NULL on an empty table, which Scan cannot store in a uint64; hence COALESCE below
+       c.Assert(testDB().QueryRow(`SELECT COALESCE(MAX(id), 0) FROM logs`).Scan(&lastID), check.IsNil)
+       return lastID
 }