X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/dd2320c6877939365caf79767b5244d86a288437..b3b8d695afb1a7624ece94f4c5dcdd671030374e:/services/ws/handler.go diff --git a/services/ws/handler.go b/services/ws/handler.go index ca9231c986..d527c39ba1 100644 --- a/services/ws/handler.go +++ b/services/ws/handler.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( @@ -31,6 +35,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC h.setupOnce.Do(h.setup) ctx, cancel := context.WithCancel(ws.Request().Context()) + defer cancel() log := logger(ctx) incoming := eventSource.NewSink() @@ -52,7 +57,10 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC return } + // Receive websocket frames from the client and pass them to + // sess.Receive(). go func() { + defer cancel() buf := make([]byte, 2<<20) for { select { @@ -68,22 +76,24 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC err = errFrameTooBig } if err != nil { - if err != io.EOF { + if err != io.EOF && ctx.Err() == nil { log.WithError(err).Info("read error") } - cancel() return } err = sess.Receive(buf) if err != nil { log.WithError(err).Error("sess.Receive() failed") - cancel() return } } }() + // Take items from the outgoing queue, serialize them using + // sess.EventMessage() as needed, and send them to the client + // as websocket frames. go func() { + defer cancel() for { var ok bool var data interface{} @@ -109,8 +119,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC buf, err = sess.EventMessage(e) if err != nil { log.WithError(err).Error("EventMessage failed") - cancel() - break + return } else if len(buf) == 0 { log.Debug("skip") continue @@ -125,9 +134,10 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC t0 := time.Now() _, err = ws.Write(buf) if err != nil { - log.WithError(err).Error("write failed") - cancel() - break + if ctx.Err() == nil { + log.WithError(err).Error("write failed") + } + return } log.Debug("sent") @@ -149,6 +159,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC // is done/cancelled or the incoming event stream ends. Shut // down the handler if the outgoing queue fills up. go func() { + defer cancel() ticker := time.NewTicker(h.PingTimeout) defer ticker.Stop() @@ -168,10 +179,8 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC default: } } - continue case e, ok := <-incoming.Channel(): if !ok { - cancel() return } if !sess.Filter(e) { @@ -181,7 +190,6 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC case queue <- e: default: log.WithError(errQueueFull).Error("terminate") - cancel() return } }