20559: Test concurrent writes using same token.
[arvados.git] / services / ws / session_v0.go
index 4fbfc489cf30fe0e56425e37d909c250f83d967d..66bc6ac7facdeb8115ba880b8e31e3db4bdf2e06 100644 (file)
@@ -2,18 +2,20 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package ws
 
 import (
        "database/sql"
        "encoding/json"
        "errors"
+       "reflect"
        "sync"
        "sync/atomic"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/Sirupsen/logrus"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/sirupsen/logrus"
 )
 
 var (
@@ -43,7 +45,7 @@ type v0session struct {
        permChecker   permChecker
        subscriptions []v0subscribe
        lastMsgID     uint64
-       log           *logrus.Entry
+       log           logrus.FieldLogger
        mtx           sync.Mutex
        setupOnce     sync.Once
 }
@@ -58,7 +60,7 @@ func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecke
                db:          db,
                ac:          ac,
                permChecker: pc,
-               log:         logger(ws.Request().Context()),
+               log:         ctxlog.FromContext(ws.Request().Context()),
        }
 
        err := ws.Request().ParseForm()
@@ -86,6 +88,24 @@ func (sess *v0session) Receive(buf []byte) error {
                sess.mtx.Unlock()
                sub.sendOldEvents(sess)
                return nil
+       } else if sub.Method == "unsubscribe" {
+               sess.mtx.Lock()
+               found := false
+               for i, s := range sess.subscriptions {
+                       if !reflect.DeepEqual(s.Filters, sub.Filters) {
+                               continue
+                       }
+                       copy(sess.subscriptions[i:], sess.subscriptions[i+1:])
+                       sess.subscriptions = sess.subscriptions[:len(sess.subscriptions)-1]
+                       found = true
+                       break
+               }
+               sess.mtx.Unlock()
+               sess.log.WithField("sub", sub).WithField("found", found).Debug("unsubscribe")
+               if found {
+                       sess.sendq <- v0subscribeOK
+                       return nil
+               }
        } else {
                sess.log.WithField("Method", sub.Method).Info("unknown method")
        }
@@ -109,7 +129,7 @@ func (sess *v0session) EventMessage(e *event) ([]byte, error) {
        } else {
                permTarget = detail.ObjectUUID
        }
-       ok, err := sess.permChecker.Check(permTarget)
+       ok, err := sess.permChecker.Check(sess.ws.Request().Context(), permTarget)
        if err != nil || !ok {
                return nil, err
        }
@@ -181,9 +201,9 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                return
        }
 
-       var ids []uint64
+       var ids []int64
        for rows.Next() {
-               var id uint64
+               var id int64
                err := rows.Scan(&id)
                if err != nil {
                        sess.log.WithError(err).Error("sendOldEvents row Scan failed")