+// TestManyEventsAndSubscribers runs 100 concurrent subscribers (every
+// tenth one deliberately slow, the others occasionally disconnecting
+// and reconnecting) against a server restarted with a short, slow
+// listener ping, and checks that at least one event is received in
+// every one-second interval of a 10-second run.
+func (s *v0Suite) TestManyEventsAndSubscribers(c *check.C) {
+	// Frequent slow listener pings create the conditions for a
+	// deadlock issue with the lib/pq example listener usage.
+	//
+	// Specifically: a lib/pq/example/listen-style event loop can
+	// deadlock if enough (~32) server notifications arrive after
+	// the event loop decides to call Ping (e.g., while
+	// listener.Ping() is waiting for a response from the server,
+	// or in the time.Sleep() invoked by testSlowPing).
+	//
+	// (*ListenerConn)listenerConnLoop() doesn't see the server's
+	// ping response until it finishes sending a previous
+	// notification through its internal queue to
+	// (*Listener)listenerConnLoop(), which is blocked on sending
+	// to our Notify channel, which is blocked on waiting for the
+	// Ping response.
+	defer func(d time.Duration) {
+		listenerPingInterval = d
+		testSlowPing = false
+	}(listenerPingInterval)
+	listenerPingInterval = time.Second / 2
+	testSlowPing = true
+	// Restart the test server in order to get one that uses our
+	// test globals.
+	s.TearDownTest(c)
+	s.SetUpTest(c)
+
+	done := make(chan struct{})
+	defer close(done)
+	go s.emitEvents(c, nil, done)
+
+	// We will expect to receive at least one event during each
+	// one-second interval while the test is running.
+	t0 := time.Now()
+	seconds := 10
+	receivedPerSecond := make([]int64, seconds)
+
+	// Anchor the deadline at t0 so it coincides with the end of the
+	// last one-second bucket.
+	ctx, cancel := context.WithDeadline(context.Background(), t0.Add(time.Duration(seconds)*time.Second))
+	defer cancel()
+
+	clients := 100
+	// Every client goroutine sends on exited when it returns. We wait
+	// for all of them before reading receivedPerSecond (avoiding a
+	// data race) and before the deferred cleanup stops the emitter.
+	exited := make(chan struct{}, clients)
+	for clientID := 0; clientID < clients; clientID++ {
+		clientID := clientID
+		go func() {
+			defer func() { exited <- struct{}{} }()
+
+			// session subscribes on a fresh connection and reads
+			// events from it. It returns true if the client should
+			// reconnect and false if it should stop. One call per
+			// connection makes the deferred cleanup run when that
+			// connection ends, instead of piling up until the
+			// goroutine exits.
+			session := func() bool {
+				conn, r, w, err := s.testClient()
+				if ctx.Err() != nil {
+					if err == nil {
+						conn.Close()
+					}
+					return false
+				}
+				// Check rather than Assert: Assert's FailNow is
+				// only meaningful on the test's own goroutine.
+				if !c.Check(err, check.IsNil) {
+					return false
+				}
+				defer conn.Close()
+
+				err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
+				if ctx.Err() != nil {
+					return false
+				}
+				if !c.Check(err, check.IsNil) {
+					return false
+				}
+				s.expectStatus(c, r, 200)
+
+				// Once the deadline passes, close the connection so
+				// a Decode blocked waiting for an event (e.g., if the
+				// server has deadlocked) returns and this goroutine
+				// can exit.
+				sessionDone := make(chan struct{})
+				defer close(sessionDone)
+				go func() {
+					select {
+					case <-ctx.Done():
+						conn.Close()
+					case <-sessionDone:
+					}
+				}()
+
+				for {
+					if clientID%10 == 0 {
+						// slow client
+						time.Sleep(time.Second / 20)
+					} else if rand.Float64() < 0.01 {
+						// disconnect+reconnect
+						return true
+					}
+					var lg arvados.Log
+					err = r.Decode(&lg)
+					if ctx.Err() != nil {
+						return false
+					}
+					if errors.Is(err, io.EOF) {
+						return true
+					}
+					if !c.Check(err, check.IsNil) {
+						// Don't count a failed decode as a received
+						// event, and reconnect rather than retrying a
+						// decoder that may keep returning the same
+						// error.
+						return true
+					}
+					if i := int(time.Since(t0) / time.Second); i < seconds {
+						atomic.AddInt64(&receivedPerSecond[i], 1)
+					}
+				}
+			}
+
+			for ctx.Err() == nil {
+				if !session() {
+					return
+				}
+			}
+		}()
+	}
+	<-ctx.Done()
+	for i := 0; i < clients; i++ {
+		<-exited
+	}
+	c.Log("done")
+	// All writers have exited, so plain reads are safe here.
+	for i, n := range receivedPerSecond {
+		c.Logf("t<%d n=%d", i+1, n)
+		c.Check(n, check.Not(check.Equals), int64(0))
+	}
+}
+