From d6b6b39bfe67926490506125c88f3567e45e7dcc Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Tue, 15 Nov 2016 23:26:57 -0500 Subject: [PATCH] 8460: Send {"status":200} messages. Bring up ws server for Python SDK tests. --- build/run-tests.sh | 4 +++- services/ws/handler.go | 14 +++++++++----- services/ws/session.go | 23 +++++++++++++++++++++-- services/ws/session_v0.go | 6 ++++-- 4 files changed, 37 insertions(+), 10 deletions(-) diff --git a/build/run-tests.sh b/build/run-tests.sh index c771bdc0ad..399de7e841 100755 --- a/build/run-tests.sh +++ b/build/run-tests.sh @@ -265,6 +265,8 @@ start_api() { && eval $(python sdk/python/tests/run_test_server.py start --auth admin) \ && export ARVADOS_TEST_API_HOST="$ARVADOS_API_HOST" \ && export ARVADOS_TEST_API_INSTALLED="$$" \ + && python sdk/python/tests/run_test_server.py start_ws \ + && python sdk/python/tests/run_test_server.py start_nginx \ && (env | egrep ^ARVADOS) } @@ -273,8 +275,8 @@ start_nginx_proxy_services() { cd "$WORKSPACE" \ && python sdk/python/tests/run_test_server.py start_keep_proxy \ && python sdk/python/tests/run_test_server.py start_keep-web \ - && python sdk/python/tests/run_test_server.py start_ws \ && python sdk/python/tests/run_test_server.py start_arv-git-httpd \ + && python sdk/python/tests/run_test_server.py start_ws \ && python sdk/python/tests/run_test_server.py start_nginx \ && export ARVADOS_TEST_PROXY_SERVICES=1 } diff --git a/services/ws/handler.go b/services/ws/handler.go index 28b121fcf6..721c36f994 100644 --- a/services/ws/handler.go +++ b/services/ws/handler.go @@ -38,7 +38,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) { return } - queue := make(chan *event, h.QueueSize) + queue := make(chan interface{}, h.QueueSize) stopped := make(chan struct{}) stop := make(chan error, 5) @@ -71,15 +71,18 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) { stop <- err return } - sess.Receive(msg, buf[:n]) + e := sess.Receive(msg, buf[:n]) + if e != nil { + queue <- e + } } }() go func() { for e := range queue { - if e == nil { + if buf, ok := e.([]byte); ok { ws.SetWriteDeadline(time.Now().Add(h.PingTimeout)) - _, err := ws.Write([]byte("{}")) + _, err := ws.Write(buf) if err != nil { sess.debugLogf("handler: write {}: %s", err) stop <- err @@ -87,6 +90,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) { } continue } + e := e.(*event) buf, err := sess.EventMessage(e) if err != nil { @@ -148,7 +152,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) { // socket, and prevent an idle socket // from being closed. if len(queue) == 0 { - queue <- nil + queue <- []byte(`{}`) } continue case e, ok = <-events: diff --git a/services/ws/session.go b/services/ws/session.go index 98164e3549..9111c6cc28 100644 --- a/services/ws/session.go +++ b/services/ws/session.go @@ -1,8 +1,27 @@ package main type session interface { - Receive(map[string]interface{}, []byte) - EventMessage(*event) ([]byte, error) + // Receive processes a message received from the client. If + // the returned response is non-nil, it will be queued and + // sent the client. + Receive(map[string]interface{}, []byte) []byte + + // Filter returns true if the event should be queued for + // sending to the client. It should return as fast as + // possible, and must not block. Filter(*event) bool + + // EventMessage encodes the given event (from the front of the + // queue) into a form suitable to send to the client. If a + // non-nil error is returned, the connection is terminated. If + // the returned buffer is empty, nothing is sent to the client + // and the event is not counted in statistics. + // + // Unlike Filter, EventMessage can block without affecting + // other connections. If EventMessage is slow, additional + // incoming events will be queued. If the event queue fills + // up, the connection will be dropped. + EventMessage(*event) ([]byte, error) + debugLogf(string, ...interface{}) } diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go index 3894e30314..3a9cc3131a 100644 --- a/services/ws/session_v0.go +++ b/services/ws/session_v0.go @@ -101,12 +101,12 @@ func (sess *sessionV0) checkFilters(filters []v0filter) { } } -func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) { +func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) []byte { sess.debugLogf("received message: %+v", msg) var sub v0subscribe if err := json.Unmarshal(buf, &sub); err != nil { sess.debugLogf("ignored unrecognized request: %s", err) - return + return nil } if sub.Method == "subscribe" { sess.debugLogf("subscribing to *") @@ -114,7 +114,9 @@ func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) { sess.checkFilters(sub.Filters) sess.subscribed["*"] = true sess.mtx.Unlock() + return []byte(`{"status":200}`) } + return []byte(`{"status":400}`) } func (sess *sessionV0) EventMessage(e *event) ([]byte, error) { -- 2.30.2