From 8e37378b2955346c2b4a3c1e38fcdfb2e74b7e07 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Wed, 16 Nov 2016 01:45:41 -0500 Subject: [PATCH] 8460: Retrieve recent logs and send old matching events if last_log_id given. --- sdk/go/arvados/log.go | 8 +++++++ services/ws/session_v0.go | 46 ++++++++++++++++++++++++++++++++++++--- 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/sdk/go/arvados/log.go b/sdk/go/arvados/log.go index caea04c82a..ef56e85ae7 100644 --- a/sdk/go/arvados/log.go +++ b/sdk/go/arvados/log.go @@ -14,3 +14,11 @@ type Log struct { Properties map[string]interface{} `json:"properties"` CreatedAt *time.Time `json:"created_at,omitempty"` } + +// LogList is an arvados#logList resource. +type LogList struct { + Items []Log `json:"items"` + ItemsAvailable int `json:"items_available"` + Offset int `json:"offset"` + Limit int `json:"limit"` +} diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go index c888a843e1..5622304bf4 100644 --- a/services/ws/session_v0.go +++ b/services/ws/session_v0.go @@ -3,7 +3,9 @@ package main import ( "encoding/json" "errors" + "fmt" "log" + "net/url" "sync" "time" @@ -64,6 +66,42 @@ func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte sess.mtx.Lock() sess.subscriptions = append(sess.subscriptions, sub) sess.mtx.Unlock() + + resp := [][]byte{v0subscribeOK} + + if sub.LastLogID > 0 { + // Hack 1: use the permission checker's + // Arvados client to retrieve old log IDs. Use + // "created < 2 hours ago" to ensure the + // database query doesn't get too expensive + // when our client gives us last_log_id==1. + ac := sess.permChecker.(*cachingPermChecker).Client + var old arvados.LogList + ac.RequestAndDecode(&old, "GET", "arvados/v1/logs", nil, url.Values{ + "limit": {"1000"}, + "filters": {fmt.Sprintf( + `[["id",">",%d],["created_at",">","%s"]]`, + sub.LastLogID, + time.Now().UTC().Add(-2*time.Hour).Format(time.RFC3339Nano))}, + }) + for _, log := range old.Items { + // Hack 2: populate the event's logRow + // using the API response -- otherwise + // Detail() would crash because e.db + // is nil. + e := &event{ + LogID: log.ID, + Received: time.Now(), + logRow: &log, + } + msg, err := sess.EventMessage(e) + if err != nil { + continue + } + resp = append(resp, msg) + } + } + return [][]byte{v0subscribeOK} } return [][]byte{v0subscribeFail} @@ -122,9 +160,11 @@ func (sess *v0session) Filter(e *event) bool { } type v0subscribe struct { - Method string - Filters []v0filter - funcs []func(*event) bool + Method string + Filters []v0filter + LastLogID int64 `json:"last_log_id"` + + funcs []func(*event) bool } type v0filter [3]interface{} -- 2.30.2