//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package ws
import (
"bytes"
"io"
"net/url"
"os"
+ "strings"
"sync"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"golang.org/x/net/websocket"
check "gopkg.in/check.v1"
)
// Register v0Suite with the gocheck runner.
var _ = check.Suite(&v0Suite{})
// v0Suite is the gocheck fixture for the tests in this file.
//
// As used elsewhere in this file: the embedded suite (serverSuite in
// the removed lines, serviceSuite in the added lines) provides the
// server under test; token is the API token sent by test clients and
// API requests; wg is waited on in TearDownTest; ignoreLogID is a log
// ID threshold -- the log-reading loop keeps decoding while the
// received log's ID is <= ignoreLogID.
//
// NOTE(review): no use of toDelete is visible in this excerpt;
// presumably it lists objects to clean up -- confirm against the full
// file.
type v0Suite struct {
- serverSuite serverSuite
- token string
- toDelete []string
- wg sync.WaitGroup
+ serviceSuite serviceSuite
+ token string
+ toDelete []string
+ wg sync.WaitGroup
+ ignoreLogID uint64
}
// SetUpTest runs the embedded suite's SetUpTest, starts the server
// (Run + WaitReady in the removed lines, serviceSuite.start in the
// added lines), and resets s.token to arvadostest.ActiveToken. The
// added version also records the current maximum log ID in
// s.ignoreLogID.
func (s *v0Suite) SetUpTest(c *check.C) {
- s.serverSuite.SetUpTest(c)
- go s.serverSuite.srv.Run()
- s.serverSuite.srv.WaitReady()
+ s.serviceSuite.SetUpTest(c)
+ s.serviceSuite.start(c)
s.token = arvadostest.ActiveToken
+ s.ignoreLogID = s.lastLogID(c)
}
// TearDownTest blocks on s.wg and then shuts the server side down:
// srv.Close() in the removed line, serviceSuite.TearDownTest in the
// added line.
//
// NOTE(review): presumably s.wg tracks goroutines started by tests;
// the Add/Done calls are not visible in this excerpt -- confirm.
func (s *v0Suite) TearDownTest(c *check.C) {
s.wg.Wait()
- s.serverSuite.srv.Close()
+ s.serviceSuite.TearDownTest(c)
}
// NOTE(review): this text is a diff rendering; the body below reads
// like a subscribe/unsubscribe filter test rather than suite
// teardown, so the header and body may come from different hunks --
// confirm against the full file.
//
// The body as shown: opens a test client, sends subscribe/unsubscribe
// requests with an event_type "in" filter, and checks the status
// returned for each. The removed lines performed a single "update"
// subscription; the added lines use the cmd helper to run a longer
// sequence. After emitting events in the background, it expects to
// receive a log whose EventType is "update".
func (s *v0Suite) TearDownSuite(c *check.C) {
conn, r, w := s.testClient()
defer conn.Close()
- c.Check(w.Encode(map[string]interface{}{
- "method": "subscribe",
- "filters": [][]interface{}{{"event_type", "in", []string{"update"}}},
- }), check.IsNil)
- s.expectStatus(c, r, 200)
// cmd sends one request with the given method and an event_type
// filter for eventType, then expects the given status in reply.
+ cmd := func(method, eventType string, status int) {
+ c.Check(w.Encode(map[string]interface{}{
+ "method": method,
+ "filters": [][]interface{}{{"event_type", "in", []string{eventType}}},
+ }), check.IsNil)
+ s.expectStatus(c, r, status)
+ }
// Three "update" subscriptions and one "create" subscription.
+ cmd("subscribe", "update", 200)
+ cmd("subscribe", "update", 200)
+ cmd("subscribe", "create", 200)
+ cmd("subscribe", "update", 200)
// "blip" was never subscribed, so 400 is expected. After removing
// "create" and one "update", an "update" event is still expected.
+ cmd("unsubscribe", "blip", 400)
+ cmd("unsubscribe", "create", 200)
+ cmd("unsubscribe", "update", 200)
go s.emitEvents(nil)
lg := s.expectLog(c, r)
c.Check(lg.EventType, check.Equals, "update")
+
// Two more "update" unsubscribes are expected to succeed; a further
// one is expected to fail with 400.
+ cmd("unsubscribe", "update", 200)
+ cmd("unsubscribe", "update", 200)
+ cmd("unsubscribe", "update", 400)
}
// TestLastLogID checks that a client receives both events emitted
// before it connected and events emitted afterwards, each group in
// the order create, blip, update for the emitted object's UUID.
//
// Added version: an "early" client subscribes first and is used to
// wait until the first batch of events has passed through; a second
// client then subscribes and is expected to see the early batch
// followed by a freshly emitted batch. The removed version used two
// goroutines and an avoidRace channel to sequence the same checks.
//
// NOTE(review): lastID is captured at the top but its use (per the
// inline comment, as last_log_id in the second subscribe request) is
// not visible in this rendering -- confirm against the full file.
func (s *v0Suite) TestLastLogID(c *check.C) {
- var lastID uint64
- c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
+ lastID := s.lastLogID(c)
- conn, r, w := s.testClient()
- defer conn.Close()
// checkLogs reads logs from r, skipping any whose ObjectUUID is not
// uuid, and checks the matching ones arrive as create, blip, update.
+ checkLogs := func(r *json.Decoder, uuid string) {
+ for _, etype := range []string{"create", "blip", "update"} {
+ lg := s.expectLog(c, r)
+ for lg.ObjectUUID != uuid {
+ lg = s.expectLog(c, r)
+ }
+ c.Check(lg.EventType, check.Equals, etype)
+ }
+ }
- uuidChan := make(chan string, 2)
+ // Connecting connEarly (before sending the early events) lets
+ // us confirm all of the "early" events have already passed
+ // through the server.
+ connEarly, rEarly, wEarly := s.testClient()
+ defer connEarly.Close()
+ c.Check(wEarly.Encode(map[string]interface{}{
+ "method": "subscribe",
+ }), check.IsNil)
+ s.expectStatus(c, rEarly, 200)
+
+ // Send the early events.
+ uuidChan := make(chan string, 1)
s.emitEvents(uuidChan)
+ uuidEarly := <-uuidChan
+
+ // Wait for the early events to pass through.
+ checkLogs(rEarly, uuidEarly)
+
+ // Connect the client that wants to get old events via
+ // last_log_id.
+ conn, r, w := s.testClient()
+ defer conn.Close()
c.Check(w.Encode(map[string]interface{}{
"method": "subscribe",
}), check.IsNil)
s.expectStatus(c, r, 200)
- avoidRace := make(chan struct{}, cap(uuidChan))
- go func() {
- // When last_log_id is given, although v0session sends
- // old events in order, and sends new events in order,
- // it doesn't necessarily finish sending all old
- // events before sending any new events. To avoid
- // hitting this bug in the test, we wait for the old
- // events to arrive before emitting any new events.
- <-avoidRace
- s.emitEvents(uuidChan)
- close(uuidChan)
- }()
-
- go func() {
- for uuid := range uuidChan {
- for _, etype := range []string{"create", "blip", "update"} {
- lg := s.expectLog(c, r)
- for lg.ObjectUUID != uuid {
- lg = s.expectLog(c, r)
- }
- c.Check(lg.EventType, check.Equals, etype)
- }
- avoidRace <- struct{}{}
- }
- }()
// First the early (old) events, then a newly emitted batch.
+ checkLogs(r, uuidEarly)
+ s.emitEvents(uuidChan)
+ checkLogs(r, <-uuidChan)
}
func (s *v0Suite) TestPermission(c *check.C) {
s.expectStatus(c, clients[i].r, 200)
}
+ s.ignoreLogID = s.lastLogID(c)
s.deleteTestObjects(c)
for _, client := range clients {
ac := arvados.NewClientFromEnv()
ac.AuthToken = s.token
- coll := &arvados.Collection{ManifestText: ""}
- err := ac.RequestAndDecode(coll, "POST", "arvados/v1/collections", s.jsonBody("collection", coll), map[string]interface{}{"ensure_unique_name": true})
+ var coll arvados.Collection
+ err := ac.RequestAndDecode(&coll, "POST", "arvados/v1/collections", s.jsonBody("collection", `{"manifest_text":""}`), map[string]interface{}{"ensure_unique_name": true})
c.Assert(err, check.IsNil)
+ s.ignoreLogID = s.lastLogID(c)
conn, r, w := s.testClient()
defer conn.Close()
wf := &arvados.Workflow{
Name: "ws_test",
}
- err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", wf), map[string]interface{}{"ensure_unique_name": true})
+ err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", `{"name":"ws_test"}`), map[string]interface{}{"ensure_unique_name": true})
if err != nil {
panic(err)
}
uuidChan <- wf.UUID
}
lg := &arvados.Log{}
- err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", &arvados.Log{
- ObjectUUID: wf.UUID,
- EventType: "blip",
- Properties: map[string]interface{}{
+ err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", map[string]interface{}{
+ "object_uuid": wf.UUID,
+ "event_type": "blip",
+ "properties": map[string]interface{}{
"beep": "boop",
},
}), nil)
if err != nil {
panic(err)
}
- err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", wf), nil)
+ err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", `{"name":"ws_test"}`), nil)
if err != nil {
panic(err)
}
}
// jsonBody returns a URL-encoded form body containing a single field
// named rscName.
//
// Removed version: the field value is always the JSON encoding of ob.
// Added version: if ob is a string it is used verbatim as the field
// value (so callers can pass pre-built JSON); any other value is
// JSON-encoded first.
//
// Panics if json.Marshal fails.
func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
- j, err := json.Marshal(ob)
- if err != nil {
- panic(err)
+ val, ok := ob.(string)
+ if !ok {
+ j, err := json.Marshal(ob)
+ if err != nil {
+ panic(err)
+ }
+ val = string(j)
}
v := url.Values{}
- v[rscName] = []string{string(j)}
+ v[rscName] = []string{val}
return bytes.NewBufferString(v.Encode())
}
lg := &arvados.Log{}
ok := make(chan struct{})
go func() {
- c.Check(r.Decode(lg), check.IsNil)
+ for lg.ID <= s.ignoreLogID {
+ c.Check(r.Decode(lg), check.IsNil)
+ }
close(ok)
}()
select {
}
// testClient dials the server's /websocket endpoint, passing s.token
// as the api_token query parameter, and returns the connection, a
// JSON decoder reading from it, and a JSON encoder.
//
// Removed version: the ws:// and http:// origin URLs are built from
// srv.listener.Addr(). Added version: they are derived from srv.URL,
// replacing the first "http" with "ws" for the dial target.
//
// Panics if the dial fails.
//
// NOTE(review): the line that creates w is not visible in this
// rendering; presumably a json.Encoder writing to conn -- confirm.
func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder) {
- srv := s.serverSuite.srv
- conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
+ srv := s.serviceSuite.srv
+ conn, err := websocket.Dial(strings.Replace(srv.URL, "http", "ws", 1)+"/websocket?api_token="+s.token, "", srv.URL)
if err != nil {
panic(err)
}
r := json.NewDecoder(conn)
return conn, r, w
}
+
// lastLogID returns the result of SELECT MAX(id) FROM logs, queried
// through testDB(). A query/scan error aborts the calling test via
// c.Assert.
//
// NOTE(review): if the logs table is empty, MAX(id) is NULL and the
// Scan into a uint64 would presumably fail -- confirm the fixtures
// always contain at least one log row.
+func (s *v0Suite) lastLogID(c *check.C) uint64 {
+ var lastID uint64
+ c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
+ return lastID
+}