+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
"encoding/json"
"fmt"
"io"
- "net"
- "net/http"
"net/url"
"os"
"time"
var _ = check.Suite(&v0Suite{})
type v0Suite struct {
- token string
- toDelete []string
+ serverSuite serverSuite
+ token string
+ toDelete []string
}
func (s *v0Suite) SetUpTest(c *check.C) {
+ s.serverSuite.SetUpTest(c)
s.token = arvadostest.ActiveToken
}
}), check.IsNil)
s.expectStatus(c, r, 200)
+ avoidRace := make(chan struct{}, cap(uuidChan))
go func() {
+ // When last_log_id is given, although v0session sends
+ // old events in order, and sends new events in order,
+ // it doesn't necessarily finish sending all old
+ // events before sending any new events. To avoid
+ // hitting this bug in the test, we wait for the old
+ // events to arrive before emitting any new events.
+ <-avoidRace
s.emitEvents(uuidChan)
close(uuidChan)
}()
- done := make(chan bool)
go func() {
for uuid := range uuidChan {
for _, etype := range []string{"create", "blip", "update"} {
lg := s.expectLog(c, r)
- c.Check(lg.ObjectUUID, check.Equals, uuid)
+ for lg.ObjectUUID != uuid {
+ lg = s.expectLog(c, r)
+ }
c.Check(lg.EventType, check.Equals, etype)
}
+ avoidRace <- struct{}{}
}
- close(done)
}()
-
- select {
- case <-time.After(10 * time.Second):
- c.Fatal("timeout")
- case <-done:
- }
}
func (s *v0Suite) TestPermission(c *check.C) {
}), check.IsNil)
s.expectStatus(c, r, 200)
- uuidChan := make(chan string, 1)
+ uuidChan := make(chan string, 2)
go func() {
s.token = arvadostest.AdminToken
- s.emitEvents(nil)
+ s.emitEvents(uuidChan)
s.token = arvadostest.ActiveToken
s.emitEvents(uuidChan)
}()
+ wrongUUID := <-uuidChan
+ rightUUID := <-uuidChan
lg := s.expectLog(c, r)
- c.Check(lg.ObjectUUID, check.Equals, <-uuidChan)
+ for lg.ObjectUUID != rightUUID {
+ c.Check(lg.ObjectUUID, check.Not(check.Equals), wrongUUID)
+ lg = s.expectLog(c, r)
+ }
}
func (s *v0Suite) TestSendBadJSON(c *check.C) {
for _, etype := range []string{"create", "blip", "update"} {
lg := s.expectLog(c, r)
- c.Check(lg.ObjectUUID, check.Equals, uuid)
+ for lg.ObjectUUID != uuid {
+ lg = s.expectLog(c, r)
+ }
c.Check(lg.EventType, check.Equals, etype)
}
}
func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
lg := &arvados.Log{}
- c.Check(r.Decode(lg), check.IsNil)
- return lg
+ ok := make(chan struct{})
+ go func() {
+ c.Check(r.Decode(lg), check.IsNil)
+ close(ok)
+ }()
+ select {
+ case <-time.After(10 * time.Second):
+ panic("timed out")
+ case <-ok:
+ return lg
+ }
}
-func (s *v0Suite) testClient() (*testServer, *websocket.Conn, *json.Decoder, *json.Encoder) {
- srv := newTestServer()
- conn, err := websocket.Dial("ws://"+srv.addr+"/websocket?api_token="+s.token, "", "http://"+srv.addr)
+func (s *v0Suite) testClient() (*server, *websocket.Conn, *json.Decoder, *json.Encoder) {
+ go s.serverSuite.srv.Run()
+ s.serverSuite.srv.WaitReady()
+ srv := s.serverSuite.srv
+ conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
if err != nil {
panic(err)
}
r := json.NewDecoder(conn)
return srv, conn, r, w
}
-
-type testServer struct {
- http.Server
- addr string
- ln net.Listener
- pges *pgEventSource
-}
-
-func (srv *testServer) Close() {
- srv.ln.Close()
- srv.pges.cancel()
-}
-
-func newTestServer() *testServer {
- ln, err := net.Listen("tcp", ":")
- if err != nil {
- panic(err)
- }
- cfg := defaultConfig()
- cfg.Client = *(arvados.NewClientFromEnv())
- pges := &pgEventSource{
- DataSource: testDBConfig().ConnectionString(),
- QueueSize: 4,
- }
- srv := &testServer{
- Server: http.Server{
- Addr: ":",
- ReadTimeout: 10 * time.Second,
- WriteTimeout: 10 * time.Second,
- Handler: &router{
- Config: &cfg,
- eventSource: pges,
- newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
- },
- },
- addr: ln.Addr().String(),
- ln: ln,
- pges: pges,
- }
- go pges.Run()
- go srv.Serve(ln)
- pges.waitReady()
- return srv
-}