8460: Retrieve recent logs and send old matching events if last_log_id given.
authorTom Clegg <tom@curoverse.com>
Wed, 16 Nov 2016 06:45:41 +0000 (01:45 -0500)
committerTom Clegg <tom@curoverse.com>
Wed, 16 Nov 2016 06:45:41 +0000 (01:45 -0500)
sdk/go/arvados/log.go
services/ws/session_v0.go

index caea04c82a2517a8ca8ddfbf6b3c7f9111cce158..ef56e85ae7b0e8711b38a65090823d182b994415 100644 (file)
@@ -14,3 +14,11 @@ type Log struct {
        Properties      map[string]interface{} `json:"properties"`
        CreatedAt       *time.Time             `json:"created_at,omitempty"`
 }
+
+// LogList is an arvados#logList resource.
+type LogList struct {
+       Items          []Log `json:"items"`
+       ItemsAvailable int   `json:"items_available"`
+       Offset         int   `json:"offset"`
+       Limit          int   `json:"limit"`
+}
index c888a843e171c0613cfb198afdf38500a0858c8f..5622304bf4df562e56b1f478f3f3ecb23bcd9333 100644 (file)
@@ -3,7 +3,9 @@ package main
 import (
        "encoding/json"
        "errors"
+       "fmt"
        "log"
+       "net/url"
        "sync"
        "time"
 
@@ -64,6 +66,42 @@ func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte
                sess.mtx.Lock()
                sess.subscriptions = append(sess.subscriptions, sub)
                sess.mtx.Unlock()
+
+               resp := [][]byte{v0subscribeOK}
+
+               if sub.LastLogID > 0 {
+                       // Hack 1: use the permission checker's
+                       // Arvados client to retrieve old log IDs. Use
+                       // "created < 2 hours ago" to ensure the
+                       // database query doesn't get too expensive
+                       // when our client gives us last_log_id==1.
+                       ac := sess.permChecker.(*cachingPermChecker).Client
+                       var old arvados.LogList
+                       ac.RequestAndDecode(&old, "GET", "arvados/v1/logs", nil, url.Values{
+                               "limit": {"1000"},
+                               "filters": {fmt.Sprintf(
+                                       `[["id",">",%d],["created_at",">","%s"]]`,
+                                       sub.LastLogID,
+                                       time.Now().UTC().Add(-2*time.Hour).Format(time.RFC3339Nano))},
+                       })
+                       for _, log := range old.Items {
+                               // Hack 2: populate the event's logRow
+                               // using the API response -- otherwise
+                               // Detail() would crash because e.db
+                               // is nil.
+                               e := &event{
+                                       LogID:    log.ID,
+                                       Received: time.Now(),
+                                       logRow:   &log,
+                               }
+                               msg, err := sess.EventMessage(e)
+                               if err != nil {
+                                       continue
+                               }
+                               resp = append(resp, msg)
+                       }
+               }
+
                return [][]byte{v0subscribeOK}
        }
        return [][]byte{v0subscribeFail}
@@ -122,9 +160,11 @@ func (sess *v0session) Filter(e *event) bool {
 }
 
 type v0subscribe struct {
-       Method  string
-       Filters []v0filter
-       funcs   []func(*event) bool
+       Method    string
+       Filters   []v0filter
+       LastLogID int64 `json:"last_log_id"`
+
+       funcs []func(*event) bool
 }
 
 type v0filter [3]interface{}