14 "git.curoverse.com/arvados.git/sdk/go/arvados"
15 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
16 "git.curoverse.com/arvados.git/sdk/go/ctxlog"
17 "golang.org/x/net/websocket"
18 check "gopkg.in/check.v1"
// Switch logging to debug level when the ARVADOS_DEBUG environment
// variable is set to a non-empty value.
22 if os.Getenv("ARVADOS_DEBUG") != "" {
23 ctxlog.SetLevel("debug")
// Register v0Suite with the gocheck test runner.
27 var _ = check.Suite(&v0Suite{})
// SetUpTest is the gocheck per-test fixture. It sets s.token, which
// testClient and emitEvents use for authentication, to
// arvadostest.ActiveToken.
34 func (s *v0Suite) SetUpTest(c *check.C) {
35 s.token = arvadostest.ActiveToken
// TearDownSuite sends a DELETE request, authenticated with the admin
// token, for every API path accumulated in s.toDelete (emitEvents
// appends the workflows and logs it creates to that list).
38 func (s *v0Suite) TearDownSuite(c *check.C) {
39 ac := arvados.NewClientFromEnv()
// Use the admin token rather than s.token for the cleanup requests.
40 ac.AuthToken = arvadostest.AdminToken
41 for _, path := range s.toDelete {
42 err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
// TestFilters subscribes with a filter on event_type and checks that
// the log message subsequently read from the connection has event
// type "update".
49 func (s *v0Suite) TestFilters(c *check.C) {
50 srv, conn, r, w := s.testClient()
54 c.Check(w.Encode(map[string]interface{}{
55 "method": "subscribe",
// The filter admits only event_type "update"; the check at the end
// of the test expects exactly that type.
56 "filters": [][]interface{}{{"event_type", "in", []string{"update"}}},
// The subscribe request must be acknowledged with status 200.
58 s.expectStatus(c, r, 200)
61 lg := s.expectLog(c, r)
62 c.Check(lg.EventType, check.Equals, "update")
// TestLastLogID reads the current maximum log id, emits one batch of
// events before subscribing with that id as "last_log_id" and a second
// batch after the subscription is acknowledged, then expects "create",
// "blip" and "update" logs, in that order, for each workflow UUID read
// from uuidChan.
65 func (s *v0Suite) TestLastLogID(c *check.C) {
// Abort the test (Assert, not Check) if the current maximum id cannot be read.
67 c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
69 srv, conn, r, w := s.testClient()
// Capacity 2: emitEvents is called twice below with this channel.
73 uuidChan := make(chan string, 2)
// First batch, emitted before any subscribe request has been sent.
74 s.emitEvents(uuidChan)
76 c.Check(w.Encode(map[string]interface{}{
77 "method": "subscribe",
// NOTE(review): presumably asks the server to replay logs with
// id > lastID — confirm against the server's subscribe handler.
78 "last_log_id": lastID,
80 s.expectStatus(c, r, 200)
// Second batch, emitted after the subscription was acknowledged.
83 s.emitEvents(uuidChan)
87 done := make(chan bool)
89 for uuid := range uuidChan {
// Each workflow is expected to yield these three event types in this order.
90 for _, etype := range []string{"create", "blip", "update"} {
91 lg := s.expectLog(c, r)
92 c.Check(lg.ObjectUUID, check.Equals, uuid)
93 c.Check(lg.EventType, check.Equals, etype)
// Timeout branch: taken if 10 seconds elapse first.
100 case <-time.After(10 * time.Second):
// TestPermission subscribes on a connection opened with the current
// s.token, switches s.token to the admin token and back, emits events
// with the active token, and checks that the log message read next
// refers to the workflow UUID reported by that emitEvents call.
106 func (s *v0Suite) TestPermission(c *check.C) {
107 srv, conn, r, w := s.testClient()
111 c.Check(w.Encode(map[string]interface{}{
112 "method": "subscribe",
114 s.expectStatus(c, r, 200)
116 uuidChan := make(chan string, 1)
// NOTE(review): the statement between these two token assignments is
// not visible here; presumably it emits events as admin that the
// subscriber must not receive — confirm.
118 s.token = arvadostest.AdminToken
120 s.token = arvadostest.ActiveToken
121 s.emitEvents(uuidChan)
124 lg := s.expectLog(c, r)
// The received log must be about the workflow created with the active token.
125 c.Check(lg.ObjectUUID, check.Equals, <-uuidChan)
// TestSendBadJSON checks that a message that is not valid JSON is
// answered with status 400, and that a subscribe request sent on the
// same connection afterwards is still answered with status 200.
128 func (s *v0Suite) TestSendBadJSON(c *check.C) {
129 srv, conn, r, w := s.testClient()
133 c.Check(w.Encode(map[string]interface{}{
134 "method": "subscribe",
136 s.expectStatus(c, r, 200)
// Write raw non-JSON text directly to the connection, bypassing the encoder.
138 _, err := fmt.Fprint(conn, "^]beep\n")
139 c.Check(err, check.IsNil)
140 s.expectStatus(c, r, 400)
// A well-formed request after the bad one must still succeed.
142 c.Check(w.Encode(map[string]interface{}{
143 "method": "subscribe",
145 s.expectStatus(c, r, 200)
// TestSubscribe checks that a message without a recognized method is
// answered with status 400, that a subscribe request with an empty
// filter list is answered with status 200, and that "create", "blip"
// and "update" logs then arrive, in that order, for the emitted workflow.
148 func (s *v0Suite) TestSubscribe(c *check.C) {
149 srv, conn, r, w := s.testClient()
// Valid JSON, but not a subscribe request: expect 400.
155 err := w.Encode(map[string]interface{}{"21": 12})
156 c.Check(err, check.IsNil)
157 s.expectStatus(c, r, 400)
159 err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
160 c.Check(err, check.IsNil)
161 s.expectStatus(c, r, 200)
163 uuidChan := make(chan string, 1)
// Emit events in the background while this goroutine reads the logs.
164 go s.emitEvents(uuidChan)
// NOTE(review): uuid is assigned on a line not visible here —
// presumably received from uuidChan; confirm.
167 for _, etype := range []string{"create", "blip", "update"} {
168 lg := s.expectLog(c, r)
169 c.Check(lg.ObjectUUID, check.Equals, uuid)
170 c.Check(lg.EventType, check.Equals, etype)
174 // Generate some events by creating and updating a workflow object,
175 // and creating a custom log entry (event_type="blip") about the newly
176 // created workflow. If uuidChan is not nil, send the new workflow
177 // UUID to uuidChan as soon as it's known.
178 func (s *v0Suite) emitEvents(uuidChan chan<- string) {
179 ac := arvados.NewClientFromEnv()
// Requests are authenticated with whatever token s.token holds at call time.
180 ac.AuthToken = s.token
181 wf := &arvados.Workflow{
// Create the workflow; ensure_unique_name=true is sent as a request parameter.
184 err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", wf), map[string]interface{}{"ensure_unique_name": true})
// Create a log entry through the logs API.
192 err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", &arvados.Log{
195 Properties: map[string]interface{}{
// Update the workflow that was just created.
202 err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", wf), nil)
// Record both objects so TearDownSuite can delete them.
206 s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID, "arvados/v1/logs/"+lg.UUID)
// jsonBody marshals ob to JSON and returns a reader over the encoded
// form of v, in which the key rscName holds that JSON text as its
// single value.
// NOTE(review): v is declared on a line not visible here — presumably
// url.Values; confirm.
209 func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
210 j, err := json.Marshal(ob)
215 v[rscName] = []string{string(j)}
216 return bytes.NewBufferString(v.Encode())
// expectStatus decodes the next JSON message from r and checks that it
// decodes without error and that its "status" field equals status.
219 func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
220 msg := map[string]interface{}{}
221 c.Check(r.Decode(&msg), check.IsNil)
// JSON numbers decode to float64 in a map[string]interface{}; this
// single-value type assertion panics if "status" is absent or not a number.
222 c.Check(int(msg["status"].(float64)), check.Equals, status)
// expectLog decodes the next JSON message from r into lg, checks that
// decoding succeeded, and returns a *arvados.Log.
// NOTE(review): lg's declaration and the return statement are on lines
// not visible here — presumably lg is the returned value; confirm.
225 func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
227 c.Check(r.Decode(lg), check.IsNil)
// testClient starts a new test server and opens a websocket connection
// to its /websocket endpoint, passing s.token as the api_token query
// parameter. It returns the server, the connection, and a JSON decoder
// and encoder (in that order) that read from and write to the connection.
231 func (s *v0Suite) testClient() (*testServer, *websocket.Conn, *json.Decoder, *json.Encoder) {
232 srv := newTestServer()
// Dial arguments: URL, sub-protocol (none), origin.
233 conn, err := websocket.Dial("ws://"+srv.addr+"/websocket?api_token="+s.token, "", "http://"+srv.addr)
237 w := json.NewEncoder(conn)
238 r := json.NewDecoder(conn)
// Note the order: decoder before encoder.
239 return srv, conn, r, w
242 type testServer struct {
249 func (srv *testServer) Close() {
// newTestServer opens a TCP listener, builds a configuration from
// defaultConfig with the Arvados client taken from the environment,
// and returns a *testServer whose addr field is the listener's address.
// NOTE(review): several construction steps are on lines not visible
// here; this comment covers only what the visible lines show.
254 func newTestServer() *testServer {
// ":" leaves host and port unspecified, so the system picks a free port.
255 ln, err := net.Listen("tcp", ":")
259 cfg := defaultConfig()
260 cfg.Client = *(arvados.NewClientFromEnv())
261 pges := &pgEventSource{
// The event source uses the test database's connection string.
262 DataSource: testDBConfig().ConnectionString(),
268 ReadTimeout: 10 * time.Second,
269 WriteTimeout: 10 * time.Second,
// Each call builds a new permission checker using the configured client.
273 newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
// Publish the address actually bound, including the chosen port.
276 addr: ln.Addr().String(),