"database/sql"
"encoding/json"
"errors"
- "log"
"sync"
+ "sync/atomic"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/Sirupsen/logrus"
)
// Package-level values used by the v0 session code: sentinel errors, an
// object attribute list, and the canned JSON replies to client requests.
var (
errQueueFull = errors.New("client queue full")
errFrameTooBig = errors.New("frame too big")
// This change extends the attribute list with "owner_uuid" and
// "portable_data_hash"; it previously held only "state" and "name".
// NOTE(review): the consumer of this list is not visible in this excerpt.
- sendObjectAttributes = []string{"state", "name"}
+ sendObjectAttributes = []string{"state", "name", "owner_uuid", "portable_data_hash"}
// Receive queues v0subscribeOK after accepting a "subscribe" request, and
// v0subscribeFail for an unparsable message or an unknown method.
v0subscribeOK = []byte(`{"status":200}`)
v0subscribeFail = []byte(`{"status":400}`)
)
// v0session holds the state of one v0-protocol client session: the
// websocket connection, the outgoing queue, the database handle, the
// permission checker, and the subscriptions registered so far.
type v0session struct {
// ac is used by EventMessage to look up the kind of an object UUID.
+ ac *arvados.Client
ws wsConn
// sendq is the send-only queue on which Receive places subscribe replies
// and sendOldEvents places matching old events.
+ sendq chan<- interface{}
db *sql.DB
permChecker permChecker
// subscriptions is appended to and iterated while holding mtx.
subscriptions []v0subscribe
// lastMsgID is incremented atomically to produce each outgoing "msgID".
+ lastMsgID uint64
// log is the logger obtained from the websocket request's context.
+ log *logrus.Entry
mtx sync.Mutex
// NOTE(review): setupOnce is not used in the visible excerpt.
setupOnce sync.Once
}
// newSessionV0 (the exported NewSessionV0 before this change) builds a
// v0session for the websocket connection ws.
//
// After this change the caller supplies the outgoing queue (sendq), the
// permission checker (pc) and a pointer to the API client (ac); previously
// the permission checker was created here from ac. The session logger is
// taken from the request context.
//
// The request form is parsed and its "api_token" value is handed to the
// permission checker. If ParseForm fails, the error is logged and returned
// with a nil session.
-func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
+func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) {
sess := &v0session{
+ sendq: sendq,
ws: ws,
db: db,
- permChecker: NewPermChecker(ac),
+ ac: ac,
+ permChecker: pc,
+ log: logger(ws.Request().Context()),
}
err := ws.Request().ParseForm()
if err != nil {
- log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
+ sess.log.WithError(err).Error("ParseForm failed")
return nil, err
}
token := ws.Request().Form.Get("api_token")
sess.permChecker.SetToken(token)
// NOTE(review): the raw API token is written to the debug log here (both
// before and after this change) -- confirm that logging credentials is
// acceptable, or redact the value.
- sess.debugLogf("token = %+q", token)
+ sess.log.WithField("token", token).Debug("set token")
return sess, nil
}
// The debugLogf helper below is deleted by this change; the added lines in
// this file log through sess.log instead.
-func (sess *v0session) debugLogf(s string, args ...interface{}) {
- args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
- debugLogf("%s "+s, args...)
-}
-
// Receive handles one raw message from the client.
//
// Before this change it returned the reply frames to the caller; it now
// queues replies on sess.sendq itself and returns an error value, which is
// nil on every visible path.
//
// buf is decoded as a v0subscribe:
//   - if decoding fails, the error is logged and v0subscribeFail is queued;
//   - if Method is "subscribe", the subscription is prepared, v0subscribeOK
//     is queued, the subscription is appended to sess.subscriptions under
//     sess.mtx, and sub.sendOldEvents is called;
//   - for any other Method, it is logged and v0subscribeFail is queued.
//
// The replies are queued with plain channel send statements (no select).
-func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte {
- sess.debugLogf("received message: %+v", msg)
+func (sess *v0session) Receive(buf []byte) error {
var sub v0subscribe
if err := json.Unmarshal(buf, &sub); err != nil {
- sess.debugLogf("ignored unrecognized request: %s", err)
- return nil
- }
- if sub.Method == "subscribe" {
- sub.prepare()
- sess.debugLogf("subscription: %v", sub)
+ sess.log.WithError(err).Info("invalid message from client")
+ } else if sub.Method == "subscribe" {
+ sub.prepare(sess)
+ sess.log.WithField("sub", sub).Debug("sub prepared")
+ sess.sendq <- v0subscribeOK
sess.mtx.Lock()
sess.subscriptions = append(sess.subscriptions, sub)
sess.mtx.Unlock()
-
- return append([][]byte{v0subscribeOK}, sub.getOldEvents(sess)...)
+ sub.sendOldEvents(sess)
+ return nil
+ } else {
+ sess.log.WithField("Method", sub.Method).Info("unknown method")
}
// Reached for undecodable messages and unknown methods.
- return [][]byte{v0subscribeFail}
+ sess.sendq <- v0subscribeFail
+ return nil
}
func (sess *v0session) EventMessage(e *event) ([]byte, error) {
return nil, err
}
+ kind, _ := sess.ac.KindForUUID(detail.ObjectUUID)
msg := map[string]interface{}{
- "msgID": e.Serial,
+ "msgID": atomic.AddUint64(&sess.lastMsgID, 1),
"id": detail.ID,
"uuid": detail.UUID,
"object_uuid": detail.ObjectUUID,
"object_owner_uuid": detail.ObjectOwnerUUID,
+ "object_kind": kind,
"event_type": detail.EventType,
+ "event_at": detail.EventAt,
}
if detail.Properties != nil && detail.Properties["text"] != nil {
msg["properties"] = detail.Properties
sess.mtx.Lock()
defer sess.mtx.Unlock()
for _, sub := range sess.subscriptions {
- if sub.match(e) {
+ if sub.match(sess, e) {
return true
}
}
return false
}
// sendOldEvents (getOldEvents before this change) replays stored events for
// a new subscription.
//
// It does nothing when sub.LastLogID is 0. Otherwise it runs a database
// query parameterised by sub.LastLogID and a UTC timestamp ten minutes
// before now, builds an event for every row id scanned, and queues on
// sess.sendq each event that sub.match accepts. Before this change the
// matching events were marshalled here and returned as a slice.
//
// NOTE(review): the query statement itself is elided from this excerpt.
-func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
+func (sub *v0subscribe) sendOldEvents(sess *v0session) {
if sub.LastLogID == 0 {
return
}
- debugLogf("getOldEvents(%d)", sub.LastLogID)
+ sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
// Here we do a "select id" query and queue an event for every
// log since the given ID, then use (*event)Detail() to
// retrieve the whole row and decide whether to send it. This
sub.LastLogID,
time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
if err != nil {
- errorLogf("db.Query: %s", err)
+ sess.log.WithError(err).Error("db.Query failed")
return
}
// NOTE(review): no rows.Close() is visible in this excerpt; the early
// return on context cancellation below leaves rows open unless the elided
// lines defer a Close -- confirm.
for rows.Next() {
var id uint64
err := rows.Scan(&id)
if err != nil {
- errorLogf("Scan: %s", err)
+ sess.log.WithError(err).Error("row Scan failed")
continue
}
// Back-pressure: wait while the outgoing queue is more than half full.
// NOTE(review): this polling loop does not check the request context, so
// it keeps sleeping for as long as the queue stays more than half full,
// even after the request is cancelled.
+ for len(sess.sendq)*2 > cap(sess.sendq) {
+ // Ugly... but if we fill up the whole client
+ // queue with a backlog of old events, a
+ // single new event will overflow it and
+ // terminate the connection, and then the
+ // client will probably reconnect and do the
+ // same thing all over again.
+ time.Sleep(100 * time.Millisecond)
+ }
+ now := time.Now()
e := &event{
LogID: id,
- Received: time.Now(),
+ Received: now,
+ Ready: now,
db: sess.db,
}
- if !sub.match(e) {
- debugLogf("skip old event %+v", e)
- continue
- }
- msg, err := sess.EventMessage(e)
- if err != nil {
- debugLogf("event marshal: %s", err)
- continue
+ if sub.match(sess, e) {
// Queue the event itself, or stop replaying once the request context
// is done.
+ select {
+ case sess.sendq <- e:
+ case <-sess.ws.Request().Context().Done():
+ return
+ }
}
- debugLogf("old event: %s", string(msg))
- msgs = append(msgs, msg)
}
if err := rows.Err(); err != nil {
- errorLogf("db.Query: %s", err)
+ sess.log.WithError(err).Error("db.Query failed")
}
- return
}
type v0subscribe struct {
// v0filter is a three-element array of arbitrary values.
// NOTE(review): presumably [attribute, operator, operand] as consumed by
// (*v0subscribe).prepare -- confirm against the elided parts of prepare.
type v0filter [3]interface{}
// match reports whether event e passes every predicate in sub.funcs.
//
// It returns false, logging an error, when e.Detail() is nil, and false as
// soon as any predicate rejects the event. A subscription with no
// predicates matches every event that has detail.
//
// This change adds the sess parameter so that the messages go to the
// session logger, tagged with the event's LogID.
-func (sub *v0subscribe) match(e *event) bool {
+func (sub *v0subscribe) match(sess *v0session, e *event) bool {
+ log := sess.log.WithField("LogID", e.LogID)
detail := e.Detail()
if detail == nil {
- debugLogf("match(%d): failed on no detail", e.LogID)
+ log.Error("match failed, no detail")
return false
}
+ log = log.WithField("funcs", len(sub.funcs))
for i, f := range sub.funcs {
if !f(e) {
- debugLogf("match(%d): failed on func %d", e.LogID, i)
+ log.WithField("func", i).Debug("match failed")
return false
}
}
- debugLogf("match(%d): passed %d funcs", e.LogID, len(sub.funcs))
+ log.Debug("match passed")
return true
}
// prepare walks sub.Filters and appends predicate functions to sub.funcs
// for match to evaluate. Filters whose length is not 3 are skipped.
//
// Two kinds of predicate are visible here: one that accepts an event whose
// EventType equals any string in a list, and one that compares the event's
// CreatedAt with a parsed RFC3339Nano timestamp using one of the operators
// >=, <=, >, < or =.
//
// This change adds the sess parameter so that problems are reported through
// the session logger. It removes the per-call debug output from the
// predicates, and appends the created_at predicate once after the switch
// instead of inside each case.
//
// NOTE(review): parts of this function are elided from the excerpt (how
// strs, op and tstr are derived from a filter), so the description covers
// only the visible lines.
-func (sub *v0subscribe) prepare() {
+func (sub *v0subscribe) prepare(sess *v0session) {
for _, f := range sub.Filters {
if len(f) != 3 {
continue
}
}
sub.funcs = append(sub.funcs, func(e *event) bool {
- debugLogf("event_type func: %v in %v", e.Detail().EventType, strs)
for _, s := range strs {
if s == e.Detail().EventType {
return true
}
t, err := time.Parse(time.RFC3339Nano, tstr)
if err != nil {
// An unparsable timestamp is logged and skipped.
- debugLogf("time.Parse(%q): %s", tstr, err)
+ sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
continue
}
+ var fn func(*event) bool
switch op {
case ">=":
- sub.funcs = append(sub.funcs, func(e *event) bool {
- debugLogf("created_at func: %v >= %v", e.Detail().CreatedAt, t)
+ fn = func(e *event) bool {
return !e.Detail().CreatedAt.Before(t)
- })
+ }
case "<=":
- sub.funcs = append(sub.funcs, func(e *event) bool {
- debugLogf("created_at func: %v <= %v", e.Detail().CreatedAt, t)
+ fn = func(e *event) bool {
return !e.Detail().CreatedAt.After(t)
- })
+ }
case ">":
- sub.funcs = append(sub.funcs, func(e *event) bool {
- debugLogf("created_at func: %v > %v", e.Detail().CreatedAt, t)
+ fn = func(e *event) bool {
return e.Detail().CreatedAt.After(t)
- })
+ }
case "<":
- sub.funcs = append(sub.funcs, func(e *event) bool {
- debugLogf("created_at func: %v < %v", e.Detail().CreatedAt, t)
+ fn = func(e *event) bool {
return e.Detail().CreatedAt.Before(t)
- })
+ }
case "=":
- sub.funcs = append(sub.funcs, func(e *event) bool {
- debugLogf("created_at func: %v = %v", e.Detail().CreatedAt, t)
+ fn = func(e *event) bool {
return e.Detail().CreatedAt.Equal(t)
- })
+ }
// An unrecognised operator is logged and skipped, so no predicate is
// appended for it.
+ default:
+ sess.log.WithField("operator", op).Info("bogus operator")
+ continue
}
+ sub.funcs = append(sub.funcs, fn)
}
}
}