From b3fa9983ac0b7b38a5b3787af56a7bb1502ae3be Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Wed, 1 Mar 2017 02:36:18 -0500 Subject: [PATCH] 10764: Test v0 session. --- sdk/go/arvados/client.go | 1 + sdk/go/arvados/collection.go | 4 +- sdk/go/arvados/log.go | 12 +- sdk/go/arvados/workflow.go | 22 +++ services/ws/event_source.go | 18 ++ services/ws/event_source_test.go | 29 ++-- services/ws/event_test.go | 27 +-- services/ws/session_v0_test.go | 286 +++++++++++++++++++++++++++++++ 8 files changed, 348 insertions(+), 51 deletions(-) create mode 100644 sdk/go/arvados/workflow.go create mode 100644 services/ws/session_v0_test.go diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go index 8319b5da3b..9691e7a07e 100644 --- a/sdk/go/arvados/client.go +++ b/sdk/go/arvados/client.go @@ -181,6 +181,7 @@ func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io. if err != nil { return err } + req.Header.Set("Content-type", "application/x-www-form-urlencoded") return c.DoAndDecode(dst, req) } diff --git a/sdk/go/arvados/collection.go b/sdk/go/arvados/collection.go index df7c224a21..bdd8e6e6b3 100644 --- a/sdk/go/arvados/collection.go +++ b/sdk/go/arvados/collection.go @@ -3,9 +3,10 @@ package arvados import ( "bufio" "fmt" - "git.curoverse.com/arvados.git/sdk/go/blockdigest" "strings" "time" + + "git.curoverse.com/arvados.git/sdk/go/blockdigest" ) // Collection is an arvados#collection resource. @@ -14,6 +15,7 @@ type Collection struct { TrashAt *time.Time `json:"trash_at,omitempty"` ManifestText string `json:"manifest_text,omitempty"` UnsignedManifestText string `json:"unsigned_manifest_text,omitempty"` + Name string `json:"name,omitempty"` CreatedAt *time.Time `json:"created_at,omitempty"` ModifiedAt *time.Time `json:"modified_at,omitempty"` PortableDataHash string `json:"portable_data_hash,omitempty"` diff --git a/sdk/go/arvados/log.go b/sdk/go/arvados/log.go index a48f1c6b5c..5adc52822b 100644 --- a/sdk/go/arvados/log.go +++ b/sdk/go/arvados/log.go @@ -6,13 +6,13 @@ import ( // Log is an arvados#log record type Log struct { - ID uint64 `json:"id"` - UUID string `json:"uuid"` - ObjectUUID string `json:"object_uuid"` - ObjectOwnerUUID string `json:"object_owner_uuid"` - EventType string `json:"event_type"` + ID uint64 `json:"id,omitempty"` + UUID string `json:"uuid,omitempty"` + ObjectUUID string `json:"object_uuid,omitempty"` + ObjectOwnerUUID string `json:"object_owner_uuid,omitempty"` + EventType string `json:"event_type,omitempty"` EventAt *time.Time `json:"event,omitempty"` - Properties map[string]interface{} `json:"properties"` + Properties map[string]interface{} `json:"properties,omitempty"` CreatedAt *time.Time `json:"created_at,omitempty"` } diff --git a/sdk/go/arvados/workflow.go b/sdk/go/arvados/workflow.go new file mode 100644 index 0000000000..42a851e8e9 --- /dev/null +++ b/sdk/go/arvados/workflow.go @@ -0,0 +1,22 @@ +package arvados + +import "time" + +// Workflow is an arvados#workflow resource. +type Workflow struct { + UUID string `json:"uuid,omitempty"` + OwnerUUID string `json:"owner_uuid,omitempty"` + Name string `json:"name,omitempty"` + Description string `json:"description,omitempty"` + Definition string `json:"definition,omitempty"` + CreatedAt *time.Time `json:"created_at,omitempty"` + ModifiedAt *time.Time `json:"modified_at,omitempty"` +} + +// WorkflowList is an arvados#workflowList resource. +type WorkflowList struct { + Items []Workflow `json:"items"` + ItemsAvailable int `json:"items_available"` + Offset int `json:"offset"` + Limit int `json:"limit"` +} diff --git a/services/ws/event_source.go b/services/ws/event_source.go index 622084c612..ed1ac0db97 100644 --- a/services/ws/event_source.go +++ b/services/ws/event_source.go @@ -43,6 +43,9 @@ type pgEventSource struct { eventsOut uint64 cancel func() + + setupOnce sync.Once + ready chan bool } var _ debugStatuser = (*pgEventSource)(nil) @@ -63,12 +66,25 @@ func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) { ps.cancel() } +func (ps *pgEventSource) setup() { + ps.ready = make(chan bool) +} + +// waitReady returns when private fields (cancel, db) are available +// for tests to use. +func (ps *pgEventSource) waitReady() { + ps.setupOnce.Do(ps.setup) + <-ps.ready +} + // Run listens for event notifications on the "logs" channel and sends // them to all subscribers. func (ps *pgEventSource) Run() { logger(nil).Debug("pgEventSource Run starting") defer logger(nil).Debug("pgEventSource Run finished") + ps.setupOnce.Do(ps.setup) + ctx, cancel := context.WithCancel(context.Background()) ps.cancel = cancel defer cancel() @@ -102,6 +118,8 @@ func (ps *pgEventSource) Run() { defer ps.pqListener.Close() logger(nil).Debug("pq Listen setup done") + close(ps.ready) + ps.queue = make(chan *event, ps.QueueSize) defer close(ps.queue) diff --git a/services/ws/event_source_test.go b/services/ws/event_source_test.go index ee1da08bf1..675ac90085 100644 --- a/services/ws/event_source_test.go +++ b/services/ws/event_source_test.go @@ -14,7 +14,7 @@ var _ = check.Suite(&eventSourceSuite{}) type eventSourceSuite struct{} -func testDBConfig() (pgConfig, error) { +func testDBConfig() pgConfig { var railsDB struct { Test struct { Database string @@ -25,7 +25,7 @@ func testDBConfig() (pgConfig, error) { } err := config.LoadFile(&railsDB, "../api/config/database.yml") if err != nil { - return nil, err + panic(err) } cfg := pgConfig{ "dbname": railsDB.Test.Database, @@ -33,22 +33,20 @@ func testDBConfig() (pgConfig, error) { "password": railsDB.Test.Password, "user": railsDB.Test.Username, } - return cfg, nil + return cfg } -func testDB() (*sql.DB, error) { - cfg, err := testDBConfig() +func testDB() *sql.DB { + db, err := sql.Open("postgres", testDBConfig().ConnectionString()) if err != nil { - return nil, err + panic(err) } - return sql.Open("postgres", cfg.ConnectionString()) + return db } func (*eventSourceSuite) TestEventSource(c *check.C) { - cfg, err := testDBConfig() - if err != nil { - c.Fatal(err) - } + cfg := testDBConfig() + db := testDB() pges := &pgEventSource{ DataSource: cfg.ConnectionString(), QueueSize: 4, @@ -59,16 +57,9 @@ func (*eventSourceSuite) TestEventSource(c *check.C) { sinks[i] = pges.NewSink() } - // wait for listener to start, as evidenced by queue channel - // appearing (relying on internal implementation detail here) - for deadline := time.Now().Add(10 * time.Second); pges.queue == nil; time.Sleep(10 * time.Millisecond) { - c.Assert(time.Now().After(deadline), check.Equals, false) - } + pges.waitReady() defer pges.cancel() - db, err := testDB() - c.Assert(err, check.IsNil) - done := make(chan bool, 1) go func() { diff --git a/services/ws/event_test.go b/services/ws/event_test.go index eb67a3443c..f09537262b 100644 --- a/services/ws/event_test.go +++ b/services/ws/event_test.go @@ -1,38 +1,15 @@ package main -import ( - "database/sql" - - "git.curoverse.com/arvados.git/sdk/go/config" - check "gopkg.in/check.v1" -) +import check "gopkg.in/check.v1" var _ = check.Suite(&eventSuite{}) type eventSuite struct{} func (*eventSuite) TestDetail(c *check.C) { - var railsDB struct { - Test struct { - Database string - Username string - Password string - Host string - } - } - err := config.LoadFile(&railsDB, "../api/config/database.yml") - c.Assert(err, check.IsNil) - cfg := pgConfig{ - "dbname": railsDB.Test.Database, - "host": railsDB.Test.Host, - "password": railsDB.Test.Password, - "user": railsDB.Test.Username, - } - db, err := sql.Open("postgres", cfg.ConnectionString()) - c.Assert(err, check.IsNil) e := &event{ LogID: 17, - db: db, + db: testDB(), } logRow := e.Detail() c.Assert(logRow, check.NotNil) diff --git a/services/ws/session_v0_test.go b/services/ws/session_v0_test.go new file mode 100644 index 0000000000..f4dc23fecf --- /dev/null +++ b/services/ws/session_v0_test.go @@ -0,0 +1,286 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/url" + "os" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.curoverse.com/arvados.git/sdk/go/arvadostest" + "git.curoverse.com/arvados.git/sdk/go/ctxlog" + "golang.org/x/net/websocket" + check "gopkg.in/check.v1" +) + +func init() { + if os.Getenv("ARVADOS_DEBUG") != "" { + ctxlog.SetLevel("debug") + } +} + +var _ = check.Suite(&v0Suite{}) + +type v0Suite struct { + token string + toDelete []string +} + +func (s *v0Suite) SetUpTest(c *check.C) { + s.token = arvadostest.ActiveToken +} + +func (s *v0Suite) TearDownSuite(c *check.C) { + ac := arvados.NewClientFromEnv() + ac.AuthToken = arvadostest.AdminToken + for _, path := range s.toDelete { + err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil) + if err != nil { + panic(err) + } + } +} + +func (s *v0Suite) TestFilters(c *check.C) { + srv, conn, r, w := s.testClient() + defer srv.Close() + defer conn.Close() + + c.Check(w.Encode(map[string]interface{}{ + "method": "subscribe", + "filters": [][]interface{}{{"event_type", "in", []string{"update"}}}, + }), check.IsNil) + s.expectStatus(c, r, 200) + + go s.emitEvents(nil) + lg := s.expectLog(c, r) + c.Check(lg.EventType, check.Equals, "update") +} + +func (s *v0Suite) TestLastLogID(c *check.C) { + var lastID uint64 + c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil) + + srv, conn, r, w := s.testClient() + defer srv.Close() + defer conn.Close() + + uuidChan := make(chan string, 2) + s.emitEvents(uuidChan) + + c.Check(w.Encode(map[string]interface{}{ + "method": "subscribe", + "last_log_id": lastID, + }), check.IsNil) + s.expectStatus(c, r, 200) + + go func() { + s.emitEvents(uuidChan) + close(uuidChan) + }() + + done := make(chan bool) + go func() { + for uuid := range uuidChan { + for _, etype := range []string{"create", "blip", "update"} { + lg := s.expectLog(c, r) + c.Check(lg.ObjectUUID, check.Equals, uuid) + c.Check(lg.EventType, check.Equals, etype) + } + } + close(done) + }() + + select { + case <-time.After(10 * time.Second): + c.Fatal("timeout") + case <-done: + } +} + +func (s *v0Suite) TestPermission(c *check.C) { + srv, conn, r, w := s.testClient() + defer srv.Close() + defer conn.Close() + + c.Check(w.Encode(map[string]interface{}{ + "method": "subscribe", + }), check.IsNil) + s.expectStatus(c, r, 200) + + uuidChan := make(chan string, 1) + go func() { + s.token = arvadostest.AdminToken + s.emitEvents(nil) + s.token = arvadostest.ActiveToken + s.emitEvents(uuidChan) + }() + + lg := s.expectLog(c, r) + c.Check(lg.ObjectUUID, check.Equals, <-uuidChan) +} + +func (s *v0Suite) TestSendBadJSON(c *check.C) { + srv, conn, r, w := s.testClient() + defer srv.Close() + defer conn.Close() + + c.Check(w.Encode(map[string]interface{}{ + "method": "subscribe", + }), check.IsNil) + s.expectStatus(c, r, 200) + + _, err := fmt.Fprint(conn, "^]beep\n") + c.Check(err, check.IsNil) + s.expectStatus(c, r, 400) + + c.Check(w.Encode(map[string]interface{}{ + "method": "subscribe", + }), check.IsNil) + s.expectStatus(c, r, 200) +} + +func (s *v0Suite) TestSubscribe(c *check.C) { + srv, conn, r, w := s.testClient() + defer srv.Close() + defer conn.Close() + + s.emitEvents(nil) + + err := w.Encode(map[string]interface{}{"21": 12}) + c.Check(err, check.IsNil) + s.expectStatus(c, r, 400) + + err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}}) + c.Check(err, check.IsNil) + s.expectStatus(c, r, 200) + + uuidChan := make(chan string, 1) + go s.emitEvents(uuidChan) + uuid := <-uuidChan + + for _, etype := range []string{"create", "blip", "update"} { + lg := s.expectLog(c, r) + c.Check(lg.ObjectUUID, check.Equals, uuid) + c.Check(lg.EventType, check.Equals, etype) + } +} + +// Generate some events by creating and updating a workflow object, +// and creating a custom log entry (event_type="blip") about the newly +// created workflow. If uuidChan is not nil, send the new workflow +// UUID to uuidChan as soon as it's known. +func (s *v0Suite) emitEvents(uuidChan chan<- string) { + ac := arvados.NewClientFromEnv() + ac.AuthToken = s.token + wf := &arvados.Workflow{ + Name: "ws_test", + } + err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", wf), map[string]interface{}{"ensure_unique_name": true}) + if err != nil { + panic(err) + } + if uuidChan != nil { + uuidChan <- wf.UUID + } + lg := &arvados.Log{} + err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", &arvados.Log{ + ObjectUUID: wf.UUID, + EventType: "blip", + Properties: map[string]interface{}{ + "beep": "boop", + }, + }), nil) + if err != nil { + panic(err) + } + err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", wf), nil) + if err != nil { + panic(err) + } + s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID, "arvados/v1/logs/"+lg.UUID) +} + +func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader { + j, err := json.Marshal(ob) + if err != nil { + panic(err) + } + v := url.Values{} + v[rscName] = []string{string(j)} + return bytes.NewBufferString(v.Encode()) +} + +func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) { + msg := map[string]interface{}{} + c.Check(r.Decode(&msg), check.IsNil) + c.Check(int(msg["status"].(float64)), check.Equals, status) +} + +func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log { + lg := &arvados.Log{} + c.Check(r.Decode(lg), check.IsNil) + return lg +} + +func (s *v0Suite) testClient() (*testServer, *websocket.Conn, *json.Decoder, *json.Encoder) { + srv := newTestServer() + conn, err := websocket.Dial("ws://"+srv.addr+"/websocket?api_token="+s.token, "", "http://"+srv.addr) + if err != nil { + panic(err) + } + w := json.NewEncoder(conn) + r := json.NewDecoder(conn) + return srv, conn, r, w +} + +type testServer struct { + http.Server + addr string + stop chan bool +} + +func (srv *testServer) Close() { + close(srv.stop) +} + +func newTestServer() *testServer { + ln, err := net.Listen("tcp", ":") + if err != nil { + panic(err) + } + cfg := defaultConfig() + cfg.Client = *(arvados.NewClientFromEnv()) + es := &pgEventSource{ + DataSource: testDBConfig().ConnectionString(), + QueueSize: 4, + } + srv := &testServer{ + Server: http.Server{ + Addr: ":", + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + Handler: &router{ + Config: &cfg, + eventSource: es, + newPermChecker: func() permChecker { return newPermChecker(cfg.Client) }, + }, + }, + addr: ln.Addr().String(), + stop: make(chan bool), + } + go es.Run() + go srv.Serve(ln) + go func() { + <-srv.stop + ln.Close() + es.cancel() + }() + es.waitReady() + return srv +} -- 2.30.2