Merge branch '21535-multi-wf-delete'
[arvados.git] / services / ws / session_v0_test.go
index d7c9edb24f512186f68debcf86fe70d64dd34f60..7d15543c05277cc5f7056af7116443ee55a1cc51 100644 (file)
@@ -6,13 +6,17 @@ package ws
 
 import (
        "bytes"
+       "context"
        "encoding/json"
+       "errors"
        "fmt"
        "io"
+       "math/rand"
        "net/url"
        "os"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -60,15 +64,14 @@ func (s *v0Suite) deleteTestObjects(c *check.C) {
        ac.AuthToken = arvadostest.AdminToken
        for _, path := range s.toDelete {
                err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
-               if err != nil {
-                       panic(err)
-               }
+               c.Check(err, check.IsNil)
        }
        s.toDelete = nil
 }
 
 func (s *v0Suite) TestFilters(c *check.C) {
-       conn, r, w := s.testClient()
+       conn, r, w, err := s.testClient()
+       c.Assert(err, check.IsNil)
        defer conn.Close()
 
        cmd := func(method, eventType string, status int) {
@@ -86,7 +89,7 @@ func (s *v0Suite) TestFilters(c *check.C) {
        cmd("unsubscribe", "create", 200)
        cmd("unsubscribe", "update", 200)
 
-       go s.emitEvents(nil)
+       go s.emitEvents(c, nil, nil)
        lg := s.expectLog(c, r)
        c.Check(lg.EventType, check.Equals, "update")
 
@@ -111,7 +114,8 @@ func (s *v0Suite) TestLastLogID(c *check.C) {
        // Connecting connEarly (before sending the early events) lets
        // us confirm all of the "early" events have already passed
        // through the server.
-       connEarly, rEarly, wEarly := s.testClient()
+       connEarly, rEarly, wEarly, err := s.testClient()
+       c.Assert(err, check.IsNil)
        defer connEarly.Close()
        c.Check(wEarly.Encode(map[string]interface{}{
                "method": "subscribe",
@@ -120,7 +124,7 @@ func (s *v0Suite) TestLastLogID(c *check.C) {
 
        // Send the early events.
        uuidChan := make(chan string, 1)
-       s.emitEvents(uuidChan)
+       s.emitEvents(c, uuidChan, nil)
        uuidEarly := <-uuidChan
 
        // Wait for the early events to pass through.
@@ -128,7 +132,8 @@ func (s *v0Suite) TestLastLogID(c *check.C) {
 
        // Connect the client that wants to get old events via
        // last_log_id.
-       conn, r, w := s.testClient()
+       conn, r, w, err := s.testClient()
+       c.Assert(err, check.IsNil)
        defer conn.Close()
 
        c.Check(w.Encode(map[string]interface{}{
@@ -138,12 +143,13 @@ func (s *v0Suite) TestLastLogID(c *check.C) {
        s.expectStatus(c, r, 200)
 
        checkLogs(r, uuidEarly)
-       s.emitEvents(uuidChan)
+       s.emitEvents(c, uuidChan, nil)
        checkLogs(r, <-uuidChan)
 }
 
 func (s *v0Suite) TestPermission(c *check.C) {
-       conn, r, w := s.testClient()
+       conn, r, w, err := s.testClient()
+       c.Assert(err, check.IsNil)
        defer conn.Close()
 
        c.Check(w.Encode(map[string]interface{}{
@@ -154,9 +160,9 @@ func (s *v0Suite) TestPermission(c *check.C) {
        uuidChan := make(chan string, 2)
        go func() {
                s.token = arvadostest.AdminToken
-               s.emitEvents(uuidChan)
+               s.emitEvents(c, uuidChan, nil)
                s.token = arvadostest.ActiveToken
-               s.emitEvents(uuidChan)
+               s.emitEvents(c, uuidChan, nil)
        }()
 
        wrongUUID := <-uuidChan
@@ -182,9 +188,12 @@ func (s *v0Suite) TestEventTypeDelete(c *check.C) {
        for i := range clients {
                uuidChan := make(chan string, 1)
                s.token = clients[i].token
-               s.emitEvents(uuidChan)
+               s.emitEvents(c, uuidChan, nil)
                clients[i].uuid = <-uuidChan
-               clients[i].conn, clients[i].r, clients[i].w = s.testClient()
+
+               var err error
+               clients[i].conn, clients[i].r, clients[i].w, err = s.testClient()
+               c.Assert(err, check.IsNil)
 
                c.Check(clients[i].w.Encode(map[string]interface{}{
                        "method": "subscribe",
@@ -202,6 +211,41 @@ func (s *v0Suite) TestEventTypeDelete(c *check.C) {
        }
 }
 
+// TestEventPropertiesFields creates an "update" log entry for
+// RunningContainerUUID whose properties carry a few new_attributes
+// fields, and checks that a subscribed websocket client receives
+// those fields unchanged.
+func (s *v0Suite) TestEventPropertiesFields(c *check.C) {
+       ac := arvados.NewClientFromEnv()
+       ac.AuthToken = s.token
+
+       conn, r, w, err := s.testClient()
+       c.Assert(err, check.IsNil)
+       defer conn.Close()
+
+       c.Check(w.Encode(map[string]interface{}{
+               "method":  "subscribe",
+               "filters": [][]string{{"object_uuid", "=", arvadostest.RunningContainerUUID}},
+       }), check.IsNil)
+       s.expectStatus(c, r, 200)
+
+       // Decode the created log so it can be queued for cleanup,
+       // the same way emitEvents queues the logs it creates.
+       created := &arvados.Log{}
+       err = ac.RequestAndDecode(created, "POST", "arvados/v1/logs", s.jsonBody("log", map[string]interface{}{
+               "object_uuid": arvadostest.RunningContainerUUID,
+               "event_type":  "update",
+               "properties": map[string]interface{}{
+                       "new_attributes": map[string]interface{}{
+                               "name":                      "namevalue",
+                               "requesting_container_uuid": "uuidvalue",
+                               "state":                     "statevalue",
+                       },
+               },
+       }), nil)
+       c.Assert(err, check.IsNil)
+       s.toDelete = append(s.toDelete, "arvados/v1/logs/"+created.UUID)
+
+       lg := s.expectLog(c, r)
+       c.Check(lg.ObjectUUID, check.Equals, arvadostest.RunningContainerUUID)
+       c.Check(lg.EventType, check.Equals, "update")
+       // Use a checked type assertion so that a missing or malformed
+       // new_attributes value fails the test with a useful message
+       // instead of panicking.
+       newAttrs, ok := lg.Properties["new_attributes"].(map[string]interface{})
+       c.Assert(ok, check.Equals, true, check.Commentf("properties[new_attributes] = %#v", lg.Properties["new_attributes"]))
+       c.Check(newAttrs["requesting_container_uuid"], check.Equals, "uuidvalue")
+       c.Check(newAttrs["name"], check.Equals, "namevalue")
+       c.Check(newAttrs["state"], check.Equals, "statevalue")
+}
+
 // Trashing/deleting a collection produces an "update" event with
 // properties["new_attributes"]["is_trashed"] == true.
 func (s *v0Suite) TestTrashedCollection(c *check.C) {
@@ -213,7 +257,8 @@ func (s *v0Suite) TestTrashedCollection(c *check.C) {
        c.Assert(err, check.IsNil)
        s.ignoreLogID = s.lastLogID(c)
 
-       conn, r, w := s.testClient()
+       conn, r, w, err := s.testClient()
+       c.Assert(err, check.IsNil)
        defer conn.Close()
 
        c.Check(w.Encode(map[string]interface{}{
@@ -232,7 +277,8 @@ func (s *v0Suite) TestTrashedCollection(c *check.C) {
 }
 
 func (s *v0Suite) TestSendBadJSON(c *check.C) {
-       conn, r, w := s.testClient()
+       conn, r, w, err := s.testClient()
+       c.Assert(err, check.IsNil)
        defer conn.Close()
 
        c.Check(w.Encode(map[string]interface{}{
@@ -240,7 +286,7 @@ func (s *v0Suite) TestSendBadJSON(c *check.C) {
        }), check.IsNil)
        s.expectStatus(c, r, 200)
 
-       _, err := fmt.Fprint(conn, "^]beep\n")
+       _, err = fmt.Fprint(conn, "^]beep\n")
        c.Check(err, check.IsNil)
        s.expectStatus(c, r, 400)
 
@@ -251,12 +297,13 @@ func (s *v0Suite) TestSendBadJSON(c *check.C) {
 }
 
 func (s *v0Suite) TestSubscribe(c *check.C) {
-       conn, r, w := s.testClient()
+       conn, r, w, err := s.testClient()
+       c.Assert(err, check.IsNil)
        defer conn.Close()
 
-       s.emitEvents(nil)
+       s.emitEvents(c, nil, nil)
 
-       err := w.Encode(map[string]interface{}{"21": 12})
+       err = w.Encode(map[string]interface{}{"21": 12})
        c.Check(err, check.IsNil)
        s.expectStatus(c, r, 400)
 
@@ -265,7 +312,7 @@ func (s *v0Suite) TestSubscribe(c *check.C) {
        s.expectStatus(c, r, 200)
 
        uuidChan := make(chan string, 1)
-       go s.emitEvents(uuidChan)
+       go s.emitEvents(c, uuidChan, nil)
        uuid := <-uuidChan
 
        for _, etype := range []string{"create", "blip", "update"} {
@@ -277,11 +324,104 @@ func (s *v0Suite) TestSubscribe(c *check.C) {
        }
 }
 
+// TestManyEventsAndSubscribers runs 100 concurrently
+// (re)connecting websocket clients against a steady stream of
+// events for 10 seconds, with slow listener pings enabled, and
+// checks that clients receive at least one event in every
+// one-second interval.
+func (s *v0Suite) TestManyEventsAndSubscribers(c *check.C) {
+       // Frequent slow listener pings create the conditions for a
+       // deadlock issue with the lib/pq example listener usage.
+       //
+       // Specifically: a lib/pq/example/listen-style event loop can
+       // deadlock if enough (~32) server notifications arrive after
+       // the event loop decides to call Ping (e.g., while
+       // listener.Ping() is waiting for a response from the server,
+       // or in the time.Sleep() invoked by testSlowPing).
+       //
+       // (*ListenerConn)listenerConnLoop() doesn't see the server's
+       // ping response until it finishes sending a previous
+       // notification through its internal queue to
+       // (*Listener)listenerConnLoop(), which is blocked on sending
+       // to our Notify channel, which is blocked on waiting for the
+       // Ping response.
+       defer func(d time.Duration) {
+               listenerPingInterval = d
+               testSlowPing = false
+       }(listenerPingInterval)
+       listenerPingInterval = time.Second / 2
+       testSlowPing = true
+       // Restart the test server in order to get one that uses our
+       // test globals.
+       s.TearDownTest(c)
+       s.SetUpTest(c)
+
+       done := make(chan struct{})
+       defer close(done)
+       go s.emitEvents(c, nil, done)
+
+       // We will expect to receive at least one event during each
+       // one-second interval while the test is running.
+       t0 := time.Now()
+       seconds := 10
+       receivedPerSecond := make([]int64, seconds)
+
+       ctx, cancel := context.WithDeadline(context.Background(), t0.Add(time.Duration(seconds)*time.Second))
+       defer cancel()
+
+       // runSession connects one client, subscribes to all events,
+       // and counts received events until the connection ends. The
+       // connection is closed when runSession returns, not when the
+       // whole test returns, which is what a defer inside the
+       // reconnect loop would do. It returns true if the caller
+       // should reconnect and false if it should stop.
+       //
+       // It uses Check+return rather than Assert because Assert's
+       // FailNow is meant for the test goroutine, and runSession
+       // runs in client goroutines.
+       runSession := func(clientID int) bool {
+               conn, r, w, err := s.testClient()
+               if err == nil {
+                       defer conn.Close()
+               }
+               if ctx.Err() != nil {
+                       return false
+               }
+               if !c.Check(err, check.IsNil) {
+                       return false
+               }
+               err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": [][]string{}})
+               if ctx.Err() != nil {
+                       return false
+               }
+               if !c.Check(err, check.IsNil) {
+                       return false
+               }
+               s.expectStatus(c, r, 200)
+               for {
+                       if clientID%10 == 0 {
+                               // slow client
+                               time.Sleep(time.Second / 20)
+                       } else if rand.Float64() < 0.01 {
+                               // disconnect+reconnect
+                               return true
+                       }
+                       var lg arvados.Log
+                       err = r.Decode(&lg)
+                       if ctx.Err() != nil {
+                               return false
+                       }
+                       if errors.Is(err, io.EOF) {
+                               // Server closed the connection: reconnect.
+                               return true
+                       }
+                       // json.Decoder errors are sticky, so calling
+                       // Decode again after a failure would spin.
+                       if !c.Check(err, check.IsNil) {
+                               return false
+                       }
+                       if i := int(time.Since(t0) / time.Second); i < seconds {
+                               atomic.AddInt64(&receivedPerSecond[i], 1)
+                       }
+               }
+       }
+       for clientID := 0; clientID < 100; clientID++ {
+               go func(clientID int) {
+                       for ctx.Err() == nil {
+                               if !runSession(clientID) {
+                                       return
+                               }
+                       }
+               }(clientID)
+       }
+       <-ctx.Done()
+       c.Log("done")
+       for i := range receivedPerSecond {
+               // Client goroutines may still be running, so read
+               // the counters atomically.
+               n := atomic.LoadInt64(&receivedPerSecond[i])
+               c.Logf("t<%d n=%d", i+1, n)
+               c.Check(n, check.Not(check.Equals), int64(0))
+       }
+}
+
 // Generate some events by creating and updating a workflow object,
 // and creating a custom log entry (event_type="blip") about the newly
-// created workflow. If uuidChan is not nil, send the new workflow
-// UUID to uuidChan as soon as it's known.
-func (s *v0Suite) emitEvents(uuidChan chan<- string) {
+// created workflow.
+//
+// If uuidChan is not nil, send the new workflow UUID to uuidChan as
+// soon as it's known.
+//
+// If done is not nil, keep creating "blip" log entries (pausing
+// 100ms every 50 entries) until done is closed or yields a value.
+func (s *v0Suite) emitEvents(c *check.C, uuidChan chan<- string, done <-chan struct{}) {
        s.wg.Add(1)
        defer s.wg.Done()
 
@@ -291,20 +431,33 @@ func (s *v0Suite) emitEvents(uuidChan chan<- string) {
                Name: "ws_test",
        }
        err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", `{"name":"ws_test"}`), map[string]interface{}{"ensure_unique_name": true})
-       if err != nil {
-               panic(err)
-       }
+       c.Assert(err, check.IsNil)
+       s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID)
        if uuidChan != nil {
                uuidChan <- wf.UUID
        }
-       lg := &arvados.Log{}
-       err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", map[string]interface{}{
-               "object_uuid": wf.UUID,
-               "event_type":  "blip",
-               "properties": map[string]interface{}{
-                       "beep": "boop",
-               },
-       }), nil)
+       for i := 0; ; i++ {
+               lg := &arvados.Log{}
+               err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", map[string]interface{}{
+                       "object_uuid": wf.UUID,
+                       "event_type":  "blip",
+                       "properties": map[string]interface{}{
+                               "beep": "boop",
+                       },
+               }), nil)
+               s.toDelete = append(s.toDelete, "arvados/v1/logs/"+lg.UUID)
+               if done != nil {
+                       select {
+                       case <-done:
+                       default:
+                               if i%50 == 0 {
+                                       time.Sleep(100 * time.Millisecond)
+                               }
+                               continue
+                       }
+               }
+               break
+       }
        if err != nil {
                panic(err)
        }
@@ -312,7 +465,6 @@ func (s *v0Suite) emitEvents(uuidChan chan<- string) {
        if err != nil {
                panic(err)
        }
-       s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID, "arvados/v1/logs/"+lg.UUID)
 }
 
 func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
@@ -339,28 +491,30 @@ func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
        lg := &arvados.Log{}
        ok := make(chan struct{})
        go func() {
+               defer close(ok)
                for lg.ID <= s.ignoreLogID {
-                       c.Check(r.Decode(lg), check.IsNil)
+                       c.Assert(r.Decode(lg), check.IsNil)
                }
-               close(ok)
        }()
        select {
        case <-time.After(10 * time.Second):
-               panic("timed out")
+               c.Error("timed out")
+               c.FailNow()
+               return lg
        case <-ok:
                return lg
        }
 }
 
-func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder) {
+// testClient dials the test server's /websocket endpoint,
+// authenticating with s.token. It returns the connection plus a JSON
+// decoder (for reading server messages) and a JSON encoder (for
+// sending requests) that wrap it. The caller must close the returned
+// connection. If dialing fails, it returns nil values and the error.
+func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder, error) {
         srv := s.serviceSuite.srv
+       // Derive the ws:// (or wss://) URL from the server's http(s):// URL.
         conn, err := websocket.Dial(strings.Replace(srv.URL, "http", "ws", 1)+"/websocket?api_token="+s.token, "", srv.URL)
         if err != nil {
-               panic(err)
+               return nil, nil, nil, err
         }
         w := json.NewEncoder(conn)
         r := json.NewDecoder(conn)
-       return conn, r, w
+       return conn, r, w, nil
 }
 
 func (s *v0Suite) lastLogID(c *check.C) int64 {