18870: Need to declare NODES as array
[arvados.git] / services / ws / session_v0.go
index 58c64231cb53c1204ceed70b0ea030a7050ebb95..309352b39edbd329aa031ec0c6194791341acec9 100644 (file)
@@ -2,18 +2,20 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package ws
 
 import (
        "database/sql"
        "encoding/json"
        "errors"
+       "reflect"
        "sync"
        "sync/atomic"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/Sirupsen/logrus"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/sirupsen/logrus"
 )
 
 var (
@@ -43,7 +45,7 @@ type v0session struct {
        permChecker   permChecker
        subscriptions []v0subscribe
        lastMsgID     uint64
-       log           *logrus.Entry
+       log           logrus.FieldLogger
        mtx           sync.Mutex
        setupOnce     sync.Once
 }
@@ -58,7 +60,7 @@ func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecke
                db:          db,
                ac:          ac,
                permChecker: pc,
-               log:         logger(ws.Request().Context()),
+               log:         ctxlog.FromContext(ws.Request().Context()),
        }
 
        err := ws.Request().ParseForm()
@@ -86,6 +88,24 @@ func (sess *v0session) Receive(buf []byte) error {
                sess.mtx.Unlock()
                sub.sendOldEvents(sess)
                return nil
+       } else if sub.Method == "unsubscribe" {
+               sess.mtx.Lock()
+               found := false
+               for i, s := range sess.subscriptions {
+                       if !reflect.DeepEqual(s.Filters, sub.Filters) {
+                               continue
+                       }
+                       copy(sess.subscriptions[i:], sess.subscriptions[i+1:])
+                       sess.subscriptions = sess.subscriptions[:len(sess.subscriptions)-1]
+                       found = true
+                       break
+               }
+               sess.mtx.Unlock()
+               sess.log.WithField("sub", sub).WithField("found", found).Debug("unsubscribe")
+               if found {
+                       sess.sendq <- v0subscribeOK
+                       return nil
+               }
        } else {
                sess.log.WithField("Method", sub.Method).Info("unknown method")
        }
@@ -109,7 +129,7 @@ func (sess *v0session) EventMessage(e *event) ([]byte, error) {
        } else {
                permTarget = detail.ObjectUUID
        }
-       ok, err := sess.permChecker.Check(permTarget)
+       ok, err := sess.permChecker.Check(sess.ws.Request().Context(), permTarget)
        if err != nil || !ok {
                return nil, err
        }
@@ -205,6 +225,10 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                        // client will probably reconnect and do the
                        // same thing all over again.
                        time.Sleep(100 * time.Millisecond)
+                       if sess.ws.Request().Context().Err() != nil {
+                               // Session terminated while we were sleeping
+                               return
+                       }
                }
                now := time.Now()
                e := &event{