12 "git.curoverse.com/arvados.git/sdk/go/arvados"
13 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
14 "git.curoverse.com/arvados.git/sdk/go/ctxlog"
15 "golang.org/x/net/websocket"
16 check "gopkg.in/check.v1"
// When the ARVADOS_DEBUG environment variable is non-empty, set the ctxlog
// level to "debug". NOTE(review): the enclosing function header is not
// visible in this excerpt.
20 if os.Getenv("ARVADOS_DEBUG") != "" {
21 ctxlog.SetLevel("debug")
// Register v0Suite with gocheck so its Test* methods are run.
25 var _ = check.Suite(&v0Suite{})
// SetUpTest is the gocheck per-test fixture hook. It resets s.token to
// arvadostest.ActiveToken, so a test that switches tokens (as TestPermission
// does) does not affect the token the next test starts with.
32 func (s *v0Suite) SetUpTest(c *check.C) {
33 s.token = arvadostest.ActiveToken
// TearDownSuite is the gocheck suite-level cleanup hook. It issues a DELETE
// request, authenticated with arvadostest.AdminToken, for every API path
// recorded in s.toDelete.
// NOTE(review): how err is handled after each DELETE is not visible in this
// excerpt.
36 func (s *v0Suite) TearDownSuite(c *check.C) {
37 ac := arvados.NewClientFromEnv()
38 ac.AuthToken = arvadostest.AdminToken
39 for _, path := range s.toDelete {
// No destination object and no body/params: only the error result is used.
40 err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
// TestFilters subscribes with a filter restricting event_type to "update",
// expects the subscription to be acknowledged with status 200, and then checks
// that the next log message received has event type "update".
// NOTE(review): the step that generates events is not visible in this excerpt
// (presumably a call to emitEvents between the subscribe and expectLog).
47 func (s *v0Suite) TestFilters(c *check.C) {
48 srv, conn, r, w := s.testClient()
52 c.Check(w.Encode(map[string]interface{}{
53 "method": "subscribe",
// Filter triple: attribute, operator, operand.
54 "filters": [][]interface{}{{"event_type", "in", []string{"update"}}},
56 s.expectStatus(c, r, 200)
59 lg := s.expectLog(c, r)
60 c.Check(lg.EventType, check.Equals, "update")
// TestLastLogID exercises the "last_log_id" subscribe parameter. It reads the
// current maximum log id from the database, emits one round of events before
// subscribing, subscribes with last_log_id set to that earlier id, emits a
// second round, and then expects "create", "blip", "update" logs (in that
// order) for each UUID received from uuidChan.
// NOTE(review): the declaration of lastID, the closing of uuidChan, and the
// goroutine/select that uses the done channel are on lines not visible here.
63 func (s *v0Suite) TestLastLogID(c *check.C) {
// Assert (not Check): the rest of the test is meaningless without lastID.
65 c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
67 srv, conn, r, w := s.testClient()
// Buffer of 2: emitEvents is called twice below, outside any goroutine
// visible here, and each call sends one UUID.
71 uuidChan := make(chan string, 2)
// First round of events happens before the subscription exists.
72 s.emitEvents(uuidChan)
74 c.Check(w.Encode(map[string]interface{}{
75 "method": "subscribe",
76 "last_log_id": lastID,
78 s.expectStatus(c, r, 200)
// Second round of events happens after subscribing.
81 s.emitEvents(uuidChan)
85 done := make(chan bool)
87 for uuid := range uuidChan {
88 for _, etype := range []string{"create", "blip", "update"} {
89 lg := s.expectLog(c, r)
90 c.Check(lg.ObjectUUID, check.Equals, uuid)
91 c.Check(lg.EventType, check.Equals, etype)
// Timeout branch of a select whose other cases are not visible here;
// presumably bounds how long the test waits for the expected logs.
98 case <-time.After(10 * time.Second):
// TestPermission subscribes using the suite's current token (s.token is used
// by testClient when dialing), then switches s.token to AdminToken and back to
// ActiveToken before emitting events. It checks that the next log received
// refers to the object created with ActiveToken.
// NOTE(review): the action performed while s.token is AdminToken is on a line
// not visible here; presumably it emits events the subscriber must not see.
104 func (s *v0Suite) TestPermission(c *check.C) {
105 srv, conn, r, w := s.testClient()
109 c.Check(w.Encode(map[string]interface{}{
110 "method": "subscribe",
112 s.expectStatus(c, r, 200)
114 uuidChan := make(chan string, 1)
116 s.token = arvadostest.AdminToken
118 s.token = arvadostest.ActiveToken
119 s.emitEvents(uuidChan)
122 lg := s.expectLog(c, r)
// The first log delivered must match the UUID emitted with ActiveToken.
123 c.Check(lg.ObjectUUID, check.Equals, <-uuidChan)
// TestSendBadJSON checks that a non-JSON message gets a status 400 response
// and that the same connection still accepts a valid subscribe request
// afterwards (status 200).
126 func (s *v0Suite) TestSendBadJSON(c *check.C) {
127 srv, conn, r, w := s.testClient()
131 c.Check(w.Encode(map[string]interface{}{
132 "method": "subscribe",
134 s.expectStatus(c, r, 200)
// Write raw bytes to the connection, bypassing the JSON encoder, so the
// server receives something that is not valid JSON.
136 _, err := fmt.Fprint(conn, "^]beep\n")
137 c.Check(err, check.IsNil)
138 s.expectStatus(c, r, 400)
// A well-formed request on the same connection should still succeed.
140 c.Check(w.Encode(map[string]interface{}{
141 "method": "subscribe",
143 s.expectStatus(c, r, 200)
// TestSubscribe covers the basic subscribe flow: a JSON message with no
// "method" key gets status 400, a subscribe request with an empty filter list
// gets status 200, and events emitted afterwards arrive as "create", "blip",
// "update" logs, in that order, for the emitted object.
// NOTE(review): the assignment of uuid is not visible here; presumably it is
// received from uuidChan.
146 func (s *v0Suite) TestSubscribe(c *check.C) {
147 srv, conn, r, w := s.testClient()
// Valid JSON, but not a recognized request.
153 err := w.Encode(map[string]interface{}{"21": 12})
154 c.Check(err, check.IsNil)
155 s.expectStatus(c, r, 400)
157 err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
158 c.Check(err, check.IsNil)
159 s.expectStatus(c, r, 200)
161 uuidChan := make(chan string, 1)
// Emit in the background while this goroutine reads the resulting logs.
162 go s.emitEvents(uuidChan)
165 for _, etype := range []string{"create", "blip", "update"} {
166 lg := s.expectLog(c, r)
167 c.Check(lg.ObjectUUID, check.Equals, uuid)
168 c.Check(lg.EventType, check.Equals, etype)
172 // Generate some events by creating and updating a workflow object,
173 // and creating a custom log entry (event_type="blip") about the newly
174 // created workflow. If uuidChan is not nil, send the new workflow
175 // UUID to uuidChan as soon as it's known.
//
// Requests are authenticated with s.token as it is set when emitEvents is
// called. The paths of the created workflow and log are appended to
// s.toDelete, which TearDownSuite iterates to delete them.
// NOTE(review): error handling and the send on uuidChan are on lines not
// visible in this excerpt.
176 func (s *v0Suite) emitEvents(uuidChan chan<- string) {
177 ac := arvados.NewClientFromEnv()
178 ac.AuthToken = s.token
179 wf := &arvados.Workflow{
// Create the workflow; the response is decoded back into wf, whose UUID is
// used by the PUT and cleanup paths below.
182 err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", wf), map[string]interface{}{"ensure_unique_name": true})
// Create the custom log entry; the response is decoded into lg.
190 err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", &arvados.Log{
193 Properties: map[string]interface{}{
// Update the workflow.
200 err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", wf), nil)
// Record both created objects for cleanup.
204 s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID, "arvados/v1/logs/"+lg.UUID)
// jsonBody marshals ob to JSON and returns a request body in which the JSON
// text is the single value stored under the key rscName, encoded with
// v.Encode().
// NOTE(review): the declaration of v and the handling of the Marshal error
// are not visible here; v is presumably a url.Values, making the result a
// form-encoded body.
207 func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
208 j, err := json.Marshal(ob)
213 v[rscName] = []string{string(j)}
214 return bytes.NewBufferString(v.Encode())
// expectStatus reads the next JSON message from r and checks that it decodes
// without error and that its "status" field equals status.
217 func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
218 msg := map[string]interface{}{}
219 c.Check(r.Decode(&msg), check.IsNil)
// encoding/json decodes JSON numbers into float64 when the target is an
// interface{} value, hence the assertion before converting to int.
// NOTE(review): the unchecked type assertion panics if "status" is missing
// or not a number (e.g. after a failed Decode).
220 c.Check(int(msg["status"].(float64)), check.Equals, status)
// expectLog reads the next JSON message from r into lg and checks that
// decoding succeeds.
// NOTE(review): the declaration of lg and the return statement are not
// visible here; presumably lg is a new *arvados.Log that is returned to the
// caller.
223 func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
225 c.Check(r.Decode(lg), check.IsNil)
// testClient starts a new test server and opens a websocket connection to its
// /websocket endpoint, passing s.token as the api_token query parameter. It
// returns the server, the raw connection, and a JSON decoder and encoder that
// read from and write to that connection.
// NOTE(review): handling of the Dial error is on lines not visible here.
229 func (s *v0Suite) testClient() (*server, *websocket.Conn, *json.Decoder, *json.Encoder) {
230 srv := newTestServer()
// Dial arguments: URL, subprotocol (none), origin.
231 conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
235 w := json.NewEncoder(conn)
236 r := json.NewDecoder(conn)
237 return srv, conn, r, w