//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package ws
import (
"bytes"
+ "context"
"encoding/json"
+ "errors"
"fmt"
"io"
+ "math/rand"
"net/url"
"os"
+ "strings"
"sync"
+ "sync/atomic"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"golang.org/x/net/websocket"
check "gopkg.in/check.v1"
)
var _ = check.Suite(&v0Suite{})
type v0Suite struct {
- serverSuite serverSuite
- token string
- toDelete []string
- wg sync.WaitGroup
- ignoreLogID uint64
+ serviceSuite serviceSuite // supplies the service under test; testClient dials serviceSuite.srv.URL
+ token string // API token used for websocket connections and for API requests made by the tests
+ toDelete []string // API paths of objects created during tests; TearDownSuite issues a DELETE for each
+ wg sync.WaitGroup // counts running emitEvents calls; TearDownTest waits for them to finish
+ ignoreLogID int64 // log entries whose ID is <= this value are skipped when reading events
}
func (s *v0Suite) SetUpTest(c *check.C) {
- s.serverSuite.SetUpTest(c)
- go s.serverSuite.srv.Run()
- s.serverSuite.srv.WaitReady()
+ s.serviceSuite.SetUpTest(c)
+ s.serviceSuite.start(c)
s.token = arvadostest.ActiveToken
s.ignoreLogID = s.lastLogID(c)
func (s *v0Suite) TearDownTest(c *check.C) {
s.wg.Wait()
- s.serverSuite.srv.Close()
+ s.serviceSuite.TearDownTest(c)
}
func (s *v0Suite) TearDownSuite(c *check.C) {
ac.AuthToken = arvadostest.AdminToken
for _, path := range s.toDelete {
err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
- if err != nil {
- panic(err)
- }
+ c.Check(err, check.IsNil)
}
s.toDelete = nil
}
func (s *v0Suite) TestFilters(c *check.C) {
- conn, r, w := s.testClient()
+ conn, r, w, err := s.testClient()
+ c.Assert(err, check.IsNil)
defer conn.Close()
cmd := func(method, eventType string, status int) {
cmd("unsubscribe", "create", 200)
cmd("unsubscribe", "update", 200)
- go s.emitEvents(nil)
+ go s.emitEvents(c, nil, nil)
lg := s.expectLog(c, r)
c.Check(lg.EventType, check.Equals, "update")
// Connecting connEarly (before sending the early events) lets
// us confirm all of the "early" events have already passed
// through the server.
- connEarly, rEarly, wEarly := s.testClient()
+ connEarly, rEarly, wEarly, err := s.testClient()
+ c.Assert(err, check.IsNil)
defer connEarly.Close()
c.Check(wEarly.Encode(map[string]interface{}{
"method": "subscribe",
// Send the early events.
uuidChan := make(chan string, 1)
- s.emitEvents(uuidChan)
+ s.emitEvents(c, uuidChan, nil)
uuidEarly := <-uuidChan
// Wait for the early events to pass through.
// Connect the client that wants to get old events via
// last_log_id.
- conn, r, w := s.testClient()
+ conn, r, w, err := s.testClient()
+ c.Assert(err, check.IsNil)
defer conn.Close()
c.Check(w.Encode(map[string]interface{}{
s.expectStatus(c, r, 200)
checkLogs(r, uuidEarly)
- s.emitEvents(uuidChan)
+ s.emitEvents(c, uuidChan, nil)
checkLogs(r, <-uuidChan)
}
func (s *v0Suite) TestPermission(c *check.C) {
- conn, r, w := s.testClient()
+ conn, r, w, err := s.testClient()
+ c.Assert(err, check.IsNil)
defer conn.Close()
c.Check(w.Encode(map[string]interface{}{
uuidChan := make(chan string, 2)
go func() {
s.token = arvadostest.AdminToken
- s.emitEvents(uuidChan)
+ s.emitEvents(c, uuidChan, nil)
s.token = arvadostest.ActiveToken
- s.emitEvents(uuidChan)
+ s.emitEvents(c, uuidChan, nil)
}()
wrongUUID := <-uuidChan
for i := range clients {
uuidChan := make(chan string, 1)
s.token = clients[i].token
- s.emitEvents(uuidChan)
+ s.emitEvents(c, uuidChan, nil)
clients[i].uuid = <-uuidChan
- clients[i].conn, clients[i].r, clients[i].w = s.testClient()
+
+ var err error
+ clients[i].conn, clients[i].r, clients[i].w, err = s.testClient()
+ c.Assert(err, check.IsNil)
c.Check(clients[i].w.Encode(map[string]interface{}{
"method": "subscribe",
}
}
+func (s *v0Suite) TestEventPropertiesFields(c *check.C) { // checks that an "update" event delivered over the websocket carries the name, requesting_container_uuid and state entries of properties["new_attributes"]
+ ac := arvados.NewClientFromEnv()
+ ac.AuthToken = s.token
+
+ conn, r, w, err := s.testClient()
+ c.Assert(err, check.IsNil)
+ defer conn.Close()
+
+ c.Check(w.Encode(map[string]interface{}{
+ "method": "subscribe",
+ "filters": [][]string{{"object_uuid", "=", arvadostest.RunningContainerUUID}}, // only events about this one fixture object
+ }), check.IsNil)
+ s.expectStatus(c, r, 200)
+
+ err = ac.RequestAndDecode(nil, "POST", "arvados/v1/logs", s.jsonBody("log", map[string]interface{}{ // post a hand-built log entry through the API; NOTE(review): unlike emitEvents, this log is never added to s.toDelete
+ "object_uuid": arvadostest.RunningContainerUUID,
+ "event_type": "update",
+ "properties": map[string]interface{}{
+ "new_attributes": map[string]interface{}{
+ "name": "namevalue",
+ "requesting_container_uuid": "uuidvalue",
+ "state": "statevalue",
+ },
+ },
+ }), nil)
+ c.Assert(err, check.IsNil)
+
+ lg := s.expectLog(c, r)
+ c.Check(lg.ObjectUUID, check.Equals, arvadostest.RunningContainerUUID)
+ c.Check(lg.EventType, check.Equals, "update")
+ c.Check(lg.Properties["new_attributes"].(map[string]interface{})["requesting_container_uuid"], check.Equals, "uuidvalue") // single-value type assertion: panics (rather than failing the check) if new_attributes is absent or not a JSON object
+ c.Check(lg.Properties["new_attributes"].(map[string]interface{})["name"], check.Equals, "namevalue")
+ c.Check(lg.Properties["new_attributes"].(map[string]interface{})["state"], check.Equals, "statevalue")
+}
+
// Trashing/deleting a collection produces an "update" event with
// properties["new_attributes"]["is_trashed"] == true.
func (s *v0Suite) TestTrashedCollection(c *check.C) {
ac := arvados.NewClientFromEnv()
ac.AuthToken = s.token
- coll := &arvados.Collection{ManifestText: ""}
- err := ac.RequestAndDecode(coll, "POST", "arvados/v1/collections", s.jsonBody("collection", coll), map[string]interface{}{"ensure_unique_name": true})
+ var coll arvados.Collection
+ err := ac.RequestAndDecode(&coll, "POST", "arvados/v1/collections", s.jsonBody("collection", `{"manifest_text":""}`), map[string]interface{}{"ensure_unique_name": true})
c.Assert(err, check.IsNil)
s.ignoreLogID = s.lastLogID(c)
- conn, r, w := s.testClient()
+ conn, r, w, err := s.testClient()
+ c.Assert(err, check.IsNil)
defer conn.Close()
c.Check(w.Encode(map[string]interface{}{
}
func (s *v0Suite) TestSendBadJSON(c *check.C) {
- conn, r, w := s.testClient()
+ conn, r, w, err := s.testClient()
+ c.Assert(err, check.IsNil)
defer conn.Close()
c.Check(w.Encode(map[string]interface{}{
}), check.IsNil)
s.expectStatus(c, r, 200)
- _, err := fmt.Fprint(conn, "^]beep\n")
+ _, err = fmt.Fprint(conn, "^]beep\n")
c.Check(err, check.IsNil)
s.expectStatus(c, r, 400)
}
func (s *v0Suite) TestSubscribe(c *check.C) {
- conn, r, w := s.testClient()
+ conn, r, w, err := s.testClient()
+ c.Assert(err, check.IsNil)
defer conn.Close()
- s.emitEvents(nil)
+ s.emitEvents(c, nil, nil)
- err := w.Encode(map[string]interface{}{"21": 12})
+ err = w.Encode(map[string]interface{}{"21": 12})
c.Check(err, check.IsNil)
s.expectStatus(c, r, 400)
s.expectStatus(c, r, 200)
uuidChan := make(chan string, 1)
- go s.emitEvents(uuidChan)
+ go s.emitEvents(c, uuidChan, nil)
uuid := <-uuidChan
for _, etype := range []string{"create", "blip", "update"} {
}
}
+func (s *v0Suite) TestManyEventsAndSubscribers(c *check.C) {
+ // Frequent slow listener pings create the conditions for a
+ // deadlock issue with the lib/pq example listener usage.
+ //
+ // Specifically: a lib/pq/example/listen-style event loop can
+ // deadlock if enough (~32) server notifications arrive after
+ // the event loop decides to call Ping (e.g., while
+ // listener.Ping() is waiting for a response from the server,
+ // or in the time.Sleep() invoked by testSlowPing).
+ //
+ // (*ListenerConn).listenerConnLoop() doesn't see the server's
+ // ping response until it finishes sending a previous
+ // notification through its internal queue to
+ // (*Listener).listenerConnLoop(), which is blocked on sending
+ // to our Notify channel, which is blocked on waiting for the
+ // Ping response.
+ defer func(d time.Duration) { // put the package-level test knobs back when the test returns
+ listenerPingInterval = d
+ testSlowPing = false // reset to false unconditionally, not to its previous value
+ }(listenerPingInterval) // argument is evaluated here, so d holds the interval from before the override below
+ listenerPingInterval = time.Second / 2
+ testSlowPing = true
+ // Restart the test server in order to get one that uses our
+ // test globals.
+ s.TearDownTest(c)
+ s.SetUpTest(c)
+
+ done := make(chan struct{})
+ defer close(done) // closing done ends the event-generating loop in the emitEvents call started below
+ go s.emitEvents(c, nil, done)
+
+ // We will expect to receive at least one event during each
+ // one-second interval while the test is running.
+ t0 := time.Now()
+ seconds := 10 // test duration in seconds, and the number of one-second buckets
+ receivedPerSecond := make([]int64, seconds) // receivedPerSecond[i] counts events decoded during second i, summed over all clients
+
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Duration(seconds)*time.Second))
+ defer cancel()
+ for clientID := 0; clientID < 100; clientID++ { // 100 concurrent subscriber goroutines
+ clientID := clientID // per-iteration copy for the closure below
+ go func() {
+ for ctx.Err() == nil { // each pass is one connect / subscribe / read session
+ conn, r, w, err := s.testClient()
+ if ctx.Err() != nil { // once the deadline has passed, errors are no longer reported
+ return // NOTE(review): if the dial succeeded, conn is not closed on this path
+ }
+ c.Assert(err, check.IsNil) // NOTE(review): c.Assert is invoked outside the test's own goroutine — confirm gocheck tolerates this
+ defer conn.Close() // NOTE(review): defer inside a loop — these run only when the goroutine returns; the explicit conn.Close() at the end of each pass covers the normal path
+ err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
+ if ctx.Err() != nil {
+ return
+ }
+ c.Check(err, check.IsNil)
+ s.expectStatus(c, r, 200)
+ for {
+ if clientID%10 == 0 { // every tenth client
+ // slow client
+ time.Sleep(time.Second / 20) // 50ms pause before each read
+ } else if rand.Float64() < 0.01 { // the other clients drop the connection before roughly 1% of reads
+ // disconnect+reconnect
+ break
+ }
+ var lg arvados.Log
+ err := r.Decode(&lg)
+ if ctx.Err() != nil {
+ return
+ }
+ if errors.Is(err, io.EOF) { // connection closed: leave the read loop and reconnect
+ break
+ }
+ c.Check(err, check.IsNil)
+ if i := int(time.Since(t0) / time.Second); i < seconds { // bucket by whole seconds since t0; anything landing past the last bucket is not counted
+ atomic.AddInt64(&receivedPerSecond[i], 1)
+ }
+ }
+ conn.Close()
+ }
+ }()
+ }
+ <-ctx.Done() // wait for the deadline; NOTE(review): the client goroutines are not joined before the counters are read below
+ c.Log("done")
+ for i, n := range receivedPerSecond {
+ c.Logf("t<%d n=%d", i+1, n)
+ c.Check(int64(n), check.Not(check.Equals), int64(0)) // every one-second bucket must have seen at least one event (n is already int64)
+ }
+}
+
// Generate some events by creating and updating a workflow object,
// and creating a custom log entry (event_type="blip") about the newly
-// created workflow. If uuidChan is not nil, send the new workflow
-// UUID to uuidChan as soon as it's known.
-func (s *v0Suite) emitEvents(uuidChan chan<- string) {
+// created workflow.
+//
+// If uuidChan is not nil, send the new workflow UUID to uuidChan as
+// soon as it's known.
+//
+// If done is not nil, keep posting "blip" log events until a value
+// can be received from done (i.e., something is sent or it is closed).
+func (s *v0Suite) emitEvents(c *check.C, uuidChan chan<- string, done <-chan struct{}) {
s.wg.Add(1)
defer s.wg.Done()
wf := &arvados.Workflow{
Name: "ws_test",
}
- err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", wf), map[string]interface{}{"ensure_unique_name": true})
- if err != nil {
- panic(err)
- }
+ err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", `{"name":"ws_test"}`), map[string]interface{}{"ensure_unique_name": true})
+ c.Assert(err, check.IsNil)
+ s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID)
if uuidChan != nil {
uuidChan <- wf.UUID
}
- lg := &arvados.Log{}
- err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", &arvados.Log{
- ObjectUUID: wf.UUID,
- EventType: "blip",
- Properties: map[string]interface{}{
- "beep": "boop",
- },
- }), nil)
+ for i := 0; ; i++ {
+ lg := &arvados.Log{}
+ err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", map[string]interface{}{
+ "object_uuid": wf.UUID,
+ "event_type": "blip",
+ "properties": map[string]interface{}{
+ "beep": "boop",
+ },
+ }), nil)
+ s.toDelete = append(s.toDelete, "arvados/v1/logs/"+lg.UUID)
+ if done != nil {
+ select {
+ case <-done:
+ default:
+ if i%50 == 0 {
+ time.Sleep(100 * time.Millisecond)
+ }
+ continue
+ }
+ }
+ break
+ }
if err != nil {
panic(err)
}
- err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", wf), nil)
+ err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", `{"name":"ws_test"}`), nil)
if err != nil {
panic(err)
}
- s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID, "arvados/v1/logs/"+lg.UUID)
}
func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
- j, err := json.Marshal(ob)
- if err != nil {
- panic(err)
+ val, ok := ob.(string) // a string argument is used verbatim as the parameter value (callers pass ready-made JSON text)
+ if !ok {
+ j, err := json.Marshal(ob) // any other value is JSON-encoded
+ if err != nil {
+ panic(err)
+ }
+ val = string(j)
}
v := url.Values{}
- v[rscName] = []string{string(j)}
+ v[rscName] = []string{val} // the body is a URL-encoded form with a single parameter named rscName
return bytes.NewBufferString(v.Encode())
}
lg := &arvados.Log{}
ok := make(chan struct{})
go func() {
+ defer close(ok)
for lg.ID <= s.ignoreLogID {
- c.Check(r.Decode(lg), check.IsNil)
+ c.Assert(r.Decode(lg), check.IsNil)
}
- close(ok)
}()
select {
case <-time.After(10 * time.Second):
- panic("timed out")
+ c.Error("timed out")
+ c.FailNow()
+ return lg
case <-ok:
return lg
}
}
-func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder) {
- srv := s.serverSuite.srv
- conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
+func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder, error) { // opens a websocket to the test service using s.token and returns the connection plus a JSON decoder and encoder over it
+ srv := s.serviceSuite.srv
+ conn, err := websocket.Dial(strings.Replace(srv.URL, "http", "ws", 1)+"/websocket?api_token="+s.token, "", srv.URL) // swap the first "http" for "ws" (so https becomes wss); srv.URL itself is passed as the origin
if err != nil {
- panic(err)
+ return nil, nil, nil, err // hand the dial error to the caller, which decides how to fail
}
w := json.NewEncoder(conn)
r := json.NewDecoder(conn)
- return conn, r, w
+ return conn, r, w, nil
}
-func (s *v0Suite) lastLogID(c *check.C) uint64 {
- var lastID uint64
+func (s *v0Suite) lastLogID(c *check.C) int64 { // returns the largest id currently in the logs table; the test is aborted if the query or scan fails
+ var lastID int64 // NOTE(review): MAX(id) is NULL when the table is empty, and Scan cannot store NULL in an int64 — presumably the fixtures guarantee at least one row
c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
return lastID
}