11167: Merge branch 'master' into 11167-wb-remove-arvget
[arvados.git] / services / ws / handler.go
index f9f7f53edc58430f231e9a52d5d95bb1a025084a..d527c39ba1c4eeb12c0cbae63526150da27f096d 100644 (file)
@@ -60,6 +60,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
        // Receive websocket frames from the client and pass them to
        // sess.Receive().
        go func() {
+               defer cancel()
                buf := make([]byte, 2<<20)
                for {
                        select {
@@ -75,16 +76,14 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                err = errFrameTooBig
                        }
                        if err != nil {
-                               if err != io.EOF {
+                               if err != io.EOF && ctx.Err() == nil {
                                        log.WithError(err).Info("read error")
                                }
-                               cancel()
                                return
                        }
                        err = sess.Receive(buf)
                        if err != nil {
                                log.WithError(err).Error("sess.Receive() failed")
-                               cancel()
                                return
                        }
                }
@@ -94,6 +93,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
        // sess.EventMessage() as needed, and send them to the client
        // as websocket frames.
        go func() {
+               defer cancel()
                for {
                        var ok bool
                        var data interface{}
@@ -119,8 +119,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                buf, err = sess.EventMessage(e)
                                if err != nil {
                                        log.WithError(err).Error("EventMessage failed")
-                                       cancel()
-                                       break
+                                       return
                                } else if len(buf) == 0 {
                                        log.Debug("skip")
                                        continue
@@ -135,9 +134,10 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                        t0 := time.Now()
                        _, err = ws.Write(buf)
                        if err != nil {
-                               log.WithError(err).Error("write failed")
-                               cancel()
-                               break
+                               if ctx.Err() == nil {
+                                       log.WithError(err).Error("write failed")
+                               }
+                               return
                        }
                        log.Debug("sent")
 
@@ -159,6 +159,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
        // is done/cancelled or the incoming event stream ends. Shut
        // down the handler if the outgoing queue fills up.
        go func() {
+               defer cancel()
                ticker := time.NewTicker(h.PingTimeout)
                defer ticker.Stop()
 
@@ -178,10 +179,8 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                        default:
                                        }
                                }
-                               continue
                        case e, ok := <-incoming.Channel():
                                if !ok {
-                                       cancel()
                                        return
                                }
                                if !sess.Filter(e) {
@@ -191,7 +190,6 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                case queue <- e:
                                default:
                                        log.WithError(errQueueFull).Error("terminate")
-                                       cancel()
                                        return
                                }
                        }