8460: Merge branch 'master' into 8460-websocket-go
authorTom Clegg <tom@curoverse.com>
Wed, 16 Nov 2016 23:15:56 +0000 (18:15 -0500)
committerTom Clegg <tom@curoverse.com>
Wed, 16 Nov 2016 23:15:56 +0000 (18:15 -0500)
20 files changed:
build/run-build-packages.sh
build/run-tests.sh
sdk/go/arvados/client.go
sdk/go/arvados/log.go [new file with mode: 0644]
sdk/go/config/load.go
sdk/python/tests/nginx.conf
sdk/python/tests/run_test_server.py
sdk/python/tests/test_events.py
services/api/config/application.default.yml
services/ws/config.go [new file with mode: 0644]
services/ws/event.go [new file with mode: 0644]
services/ws/handler.go [new file with mode: 0644]
services/ws/log.go [new file with mode: 0644]
services/ws/main.go [new file with mode: 0644]
services/ws/permission.go [new file with mode: 0644]
services/ws/pg.go [new file with mode: 0644]
services/ws/router.go [new file with mode: 0644]
services/ws/session.go [new file with mode: 0644]
services/ws/session_v0.go [new file with mode: 0644]
services/ws/session_v1.go [new file with mode: 0644]

index 320f9d445c3a052a62bf5b8560b2080c98b06904..0a4559f95b3518aff58246fe4839052a7d3746b8 100755 (executable)
@@ -427,6 +427,8 @@ package_go_binary services/keepstore keepstore \
     "Keep storage daemon, accessible to clients on the LAN"
 package_go_binary services/keep-web keep-web \
     "Static web hosting service for user data stored in Arvados Keep"
+package_go_binary services/ws arvados-ws \
+    "Arvados Websocket server"
 package_go_binary tools/keep-block-check keep-block-check \
     "Verify that all data from one set of Keep servers to another was copied"
 package_go_binary tools/keep-rsync keep-rsync \
index 8959cfbe09c3ea7ac6ded2142b626259787d2121..2c288179529bcdfa15d44f1c10fd68a409908012 100755 (executable)
@@ -79,6 +79,7 @@ services/nodemanager
 services/crunch-run
 services/crunch-dispatch-local
 services/crunch-dispatch-slurm
+services/ws
 sdk/cli
 sdk/pam
 sdk/python
@@ -264,15 +265,18 @@ start_api() {
         && eval $(python sdk/python/tests/run_test_server.py start --auth admin) \
         && export ARVADOS_TEST_API_HOST="$ARVADOS_API_HOST" \
         && export ARVADOS_TEST_API_INSTALLED="$$" \
+        && python sdk/python/tests/run_test_server.py start_ws \
+        && python sdk/python/tests/run_test_server.py start_nginx \
         && (env | egrep ^ARVADOS)
 }
 
 start_nginx_proxy_services() {
-    echo 'Starting keepproxy, keep-web, arv-git-httpd, and nginx ssl proxy...'
+    echo 'Starting keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...'
     cd "$WORKSPACE" \
         && python sdk/python/tests/run_test_server.py start_keep_proxy \
         && python sdk/python/tests/run_test_server.py start_keep-web \
         && python sdk/python/tests/run_test_server.py start_arv-git-httpd \
+        && python sdk/python/tests/run_test_server.py start_ws \
         && python sdk/python/tests/run_test_server.py start_nginx \
         && export ARVADOS_TEST_PROXY_SERVICES=1
 }
@@ -283,12 +287,15 @@ stop_services() {
         cd "$WORKSPACE" \
             && python sdk/python/tests/run_test_server.py stop_nginx \
             && python sdk/python/tests/run_test_server.py stop_arv-git-httpd \
+            && python sdk/python/tests/run_test_server.py stop_ws \
             && python sdk/python/tests/run_test_server.py stop_keep-web \
             && python sdk/python/tests/run_test_server.py stop_keep_proxy
     fi
     if [[ -n "$ARVADOS_TEST_API_HOST" ]]; then
         unset ARVADOS_TEST_API_HOST
         cd "$WORKSPACE" \
+            && python sdk/python/tests/run_test_server.py stop_nginx \
+            && python sdk/python/tests/run_test_server.py stop_ws \
             && python sdk/python/tests/run_test_server.py stop
     fi
 }
@@ -765,6 +772,7 @@ gostuff=(
     services/crunch-dispatch-local
     services/crunch-dispatch-slurm
     services/crunch-run
+    services/ws
     tools/keep-block-check
     tools/keep-exercise
     tools/keep-rsync
index 36f4eb52ae298982dfa09ddf82b0cea08c2604f7..0c18d38974f8be6ce99460ef2713e220a2cff403 100644 (file)
@@ -41,6 +41,8 @@ type Client struct {
        // callers who use a Client to initialize an
        // arvadosclient.ArvadosClient.)
        KeepServiceURIs []string `json:",omitempty"`
+
+       dd *DiscoveryDocument
 }
 
 // The default http.Client used by a Client with Insecure==true and
@@ -198,14 +200,83 @@ func (c *Client) apiURL(path string) string {
 
 // DiscoveryDocument is the Arvados server's description of itself.
 type DiscoveryDocument struct {
-       DefaultCollectionReplication int   `json:"defaultCollectionReplication"`
-       BlobSignatureTTL             int64 `json:"blobSignatureTtl"`
+       BasePath                     string              `json:"basePath"`
+       DefaultCollectionReplication int                 `json:"defaultCollectionReplication"`
+       BlobSignatureTTL             int64               `json:"blobSignatureTtl"`
+       Schemas                      map[string]Schema   `json:"schemas"`
+       Resources                    map[string]Resource `json:"resources"`
+}
+
+// Resource is one entry of a DiscoveryDocument's "resources" map. Its
+// Methods map is keyed by method name; PathForUUID looks up the "get"
+// entry to find which resource serves a given schema.
+type Resource struct {
+       Methods map[string]ResourceMethod `json:"methods"`
+}
+
+// ResourceMethod describes a single API method of a Resource. Path is
+// a template in which PathForUUID replaces "{uuid}" with an object
+// UUID.
+type ResourceMethod struct {
+       HTTPMethod string         `json:"httpMethod"`
+       Path       string         `json:"path"`
+       Response   MethodResponse `json:"response"`
+}
+
+// MethodResponse identifies the schema returned by a method. Ref is
+// decoded from the "$ref" key and is compared against the keys of
+// DiscoveryDocument.Schemas by PathForUUID.
+type MethodResponse struct {
+       Ref string `json:"$ref"`
+}
+
+// Schema is one entry of a DiscoveryDocument's "schemas" map.
+// UUIDPrefix is compared against characters 6 through 10 of a UUID
+// by PathForUUID.
+type Schema struct {
+       UUIDPrefix string `json:"uuidPrefix"`
 }
 
 // DiscoveryDocument returns a *DiscoveryDocument. The returned object
 // should not be modified: the same object may be returned by
 // subsequent calls.
 func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
+       if c.dd != nil {
+               return c.dd, nil
+       }
        var dd DiscoveryDocument
-       return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+       err := c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+       if err != nil {
+               return nil, err
+       }
+       c.dd = &dd
+       return c.dd, nil
+}
+
+// PathForUUID returns the API request path for invoking the named
+// method (e.g., "get") on the object with the given UUID, as
+// described by the server's discovery document.
+//
+// The UUID's type infix (characters 6 through 10) is matched against
+// each schema's uuidPrefix to find the model name. The resource whose
+// "get" method responds with that model supplies the path template,
+// in which every "{uuid}" is replaced by uuid. A single leading "/"
+// is removed from the result.
+//
+// An error is returned if uuid is not 27 characters long, if the
+// discovery document cannot be retrieved, or if no matching schema,
+// resource, or method is found.
+func (c *Client) PathForUUID(method, uuid string) (string, error) {
+       if len(uuid) != 27 {
+               return "", fmt.Errorf("invalid UUID: %q", uuid)
+       }
+       dd, err := c.DiscoveryDocument()
+       if err != nil {
+               return "", err
+       }
+       infix := uuid[6:11]
+       var model string
+       for m, s := range dd.Schemas {
+               if s.UUIDPrefix == infix {
+                       model = m
+                       break
+               }
+       }
+       if model == "" {
+               return "", fmt.Errorf("unrecognized UUID infix: %q", infix)
+       }
+       var resource string
+       for r, rsc := range dd.Resources {
+               if rsc.Methods["get"].Response.Ref == model {
+                       resource = r
+                       break
+               }
+       }
+       if resource == "" {
+               return "", fmt.Errorf("no resource for model: %q", model)
+       }
+       m, ok := dd.Resources[resource].Methods[method]
+       if !ok {
+               return "", fmt.Errorf("no method %q for resource %q", method, resource)
+       }
+       path := dd.BasePath + strings.Replace(m.Path, "{uuid}", uuid, -1)
+       // TrimPrefix is safe on an empty string, whereas indexing
+       // path[0] would panic if the discovery document supplied an
+       // empty basePath together with an empty method path.
+       return strings.TrimPrefix(path, "/"), nil
 }
diff --git a/sdk/go/arvados/log.go b/sdk/go/arvados/log.go
new file mode 100644 (file)
index 0000000..ef56e85
--- /dev/null
@@ -0,0 +1,24 @@
+package arvados
+
+import (
+       "time"
+)
+
+// Log is an arvados#log record. Each field is encoded/decoded using
+// the JSON key given in its struct tag. CreatedAt is a pointer so
+// that a nil value is omitted from JSON output ("omitempty").
+type Log struct {
+       ID              uint64                 `json:"id"`
+       UUID            string                 `json:"uuid"`
+       ObjectUUID      string                 `json:"object_uuid"`
+       ObjectOwnerUUID string                 `json:"object_owner_uuid"`
+       EventType       string                 `json:"event_type"`
+       Properties      map[string]interface{} `json:"properties"`
+       CreatedAt       *time.Time             `json:"created_at,omitempty"`
+}
+
+// LogList is an arvados#logList resource: a page of Log records
+// (Items) together with the items_available, offset, and limit
+// values decoded from the same response.
+type LogList struct {
+       Items          []Log `json:"items"`
+       ItemsAvailable int   `json:"items_available"`
+       Offset         int   `json:"offset"`
+       Limit          int   `json:"limit"`
+}
index 9c65d65e84a57d9120d69dd84912615ff3949e35..2bbb440fb31211241a78a6f15c788f7e4d706334 100644 (file)
@@ -22,3 +22,8 @@ func LoadFile(cfg interface{}, configPath string) error {
        }
        return nil
 }
+
+// Dump returns a YAML representation of cfg, as produced by
+// yaml.Marshal. Any error is the one reported by yaml.Marshal.
+func Dump(cfg interface{}) ([]byte, error) {
+       return yaml.Marshal(cfg)
+}
index 2b8b6ca1c4ad531c29bfd1c9a149da7c9bdf3599..006604077d457d286cdb9148332e087a0204313c 100644 (file)
@@ -54,4 +54,20 @@ http {
       proxy_redirect //download:{{KEEPWEBPORT}}/ https://$host:{{KEEPWEBDLSSLPORT}}/;
     }
   }
+  upstream ws {
+    server localhost:{{WSPORT}};
+  }
+  server {
+    listen *:{{WSSPORT}} ssl default_server;
+    server_name ~^(?<request_host>.*)$;
+    ssl_certificate {{SSLCERT}};
+    ssl_certificate_key {{SSLKEY}};
+    location  / {
+      proxy_pass http://ws;
+      proxy_set_header Upgrade $http_upgrade;
+      proxy_set_header Connection "upgrade";
+      proxy_set_header Host $request_host:{{WSPORT}};
+      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+    }
+  }
 }
index 642b7ccbad51846a9f1c25acc86f6b0505897c62..5ef5e2a9f5c8b83e8c7ece912ea7eba6624968c8 100644 (file)
@@ -44,6 +44,7 @@ if not os.path.exists(TEST_TMPDIR):
 
 my_api_host = None
 _cached_config = {}
+_cached_db_config = {}
 
 def find_server_pid(PID_PATH, wait=10):
     now = time.time()
@@ -284,10 +285,16 @@ def run(leave_running_atexit=False):
         os.makedirs(gitdir)
     subprocess.check_output(['tar', '-xC', gitdir, '-f', gittarball])
 
+    # The nginx proxy isn't listening here yet, but we need to choose
+    # the wss:// port now so we can write the API server config file.
+    wss_port = find_available_port()
+    _setport('wss', wss_port)
+
     port = find_available_port()
     env = os.environ.copy()
     env['RAILS_ENV'] = 'test'
-    env['ARVADOS_WEBSOCKETS'] = 'yes'
+    env['ARVADOS_TEST_WSS_PORT'] = str(wss_port)
+    env.pop('ARVADOS_WEBSOCKETS', None)
     env.pop('ARVADOS_TEST_API_HOST', None)
     env.pop('ARVADOS_API_HOST', None)
     env.pop('ARVADOS_API_HOST_INSECURE', None)
@@ -360,6 +367,45 @@ def stop(force=False):
         kill_server_pid(_pidfile('api'))
         my_api_host = None
 
+def run_ws():
+    """Start a "ws" (websocket server) process for tests.
+
+    Does nothing and returns None when ARVADOS_TEST_PROXY_SERVICES is
+    set in the environment. Otherwise: stops any previous instance,
+    writes a config file to TEST_TMPDIR/ws.yml (API host taken from
+    ARVADOS_API_HOST, database settings from _dbconfig), launches
+    "ws -config <file>", records its pid in the 'ws' pidfile, calls
+    _wait_until_port_listens(port), records the port with
+    _setport('ws', port), and returns the port.
+    """
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    stop_ws()
+    port = find_available_port()
+    conf = os.path.join(TEST_TMPDIR, 'ws.yml')
+    with open(conf, 'w') as f:
+        f.write("""
+Client:
+  APIHost: {}
+  Insecure: true
+Listen: :{}
+Postgres:
+  host: {}
+  dbname: {}
+  user: {}
+  password: {}
+  sslmode: require
+        """.format(os.environ['ARVADOS_API_HOST'],
+                   port,
+                   _dbconfig('host'),
+                   _dbconfig('database'),
+                   _dbconfig('username'),
+                   _dbconfig('password')))
+    # Both stdout and stderr of the child go to the same log target.
+    logf = open(_fifo2stderr('ws'), 'w')
+    ws = subprocess.Popen(
+        ["ws", "-config", conf],
+        stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
+    with open(_pidfile('ws'), 'w') as f:
+        f.write(str(ws.pid))
+    _wait_until_port_listens(port)
+    _setport('ws', port)
+    return port
+
+def stop_ws():
+    """Kill the process recorded in the 'ws' pidfile.
+
+    Does nothing when ARVADOS_TEST_PROXY_SERVICES is set in the
+    environment (mirroring run_ws, which does not start a server in
+    that case).
+    """
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    kill_server_pid(_pidfile('ws'))
+
 def _start_keep(n, keep_args):
     keep0 = tempfile.mkdtemp()
     port = find_available_port()
@@ -545,6 +591,8 @@ def run_nginx():
     nginxconf['KEEPPROXYSSLPORT'] = find_available_port()
     nginxconf['GITPORT'] = _getport('arv-git-httpd')
     nginxconf['GITSSLPORT'] = find_available_port()
+    nginxconf['WSPORT'] = _getport('ws')
+    nginxconf['WSSPORT'] = _getport('wss')
     nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem')
     nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key')
     nginxconf['ACCESSLOG'] = _fifo2stderr('nginx_access_log')
@@ -593,7 +641,15 @@ def _getport(program):
     except IOError:
         return 9
 
+def _dbconfig(key):
+    """Return the value of `key` from the 'test' section of the API
+    server's config/database.yml.
+
+    The file is parsed on first use and cached in _cached_db_config.
+    Raises KeyError if the 'test' section or `key` is missing.
+    """
+    global _cached_db_config
+    if not _cached_db_config:
+        dbconf_path = os.path.join(
+            SERVICES_SRC_DIR, 'api', 'config', 'database.yml')
+        # Use a context manager so the file handle is closed promptly
+        # instead of being left for the garbage collector.
+        with open(dbconf_path) as f:
+            # NOTE(review): yaml.load can construct arbitrary Python
+            # objects. This is a local test fixture, but consider
+            # yaml.safe_load.
+            _cached_db_config = yaml.load(f)
+    return _cached_db_config['test'][key]
+
 def _apiconfig(key):
+    global _cached_config
     if _cached_config:
         return _cached_config[key]
     def _load(f, required=True):
@@ -647,6 +703,7 @@ class TestCaseWithServers(unittest.TestCase):
     original environment.
     """
     MAIN_SERVER = None
+    WS_SERVER = None
     KEEP_SERVER = None
     KEEP_PROXY_SERVER = None
     KEEP_WEB_SERVER = None
@@ -667,6 +724,7 @@ class TestCaseWithServers(unittest.TestCase):
         os.environ.pop('ARVADOS_EXTERNAL_CLIENT', None)
         for server_kwargs, start_func, stop_func in (
                 (cls.MAIN_SERVER, run, reset),
+                (cls.WS_SERVER, run_ws, stop_ws),
                 (cls.KEEP_SERVER, run_keep, stop_keep),
                 (cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy),
                 (cls.KEEP_WEB_SERVER, run_keep_web, stop_keep_web)):
@@ -693,6 +751,7 @@ class TestCaseWithServers(unittest.TestCase):
 if __name__ == "__main__":
     actions = [
         'start', 'stop',
+        'start_ws', 'stop_ws',
         'start_keep', 'stop_keep',
         'start_keep_proxy', 'stop_keep_proxy',
         'start_keep-web', 'stop_keep-web',
@@ -725,6 +784,10 @@ if __name__ == "__main__":
             print(host)
     elif args.action == 'stop':
         stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
+    elif args.action == 'start_ws':
+        run_ws()
+    elif args.action == 'stop_ws':
+        stop_ws()
     elif args.action == 'start_keep':
         run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers)
     elif args.action == 'stop_keep':
index f2cdba28c775a523bc178052644cb4a76dac2771..7ce4dc93fc7d070e6a5da40adf8fe26ce13c4b33 100644 (file)
@@ -51,21 +51,22 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         self.assertEqual(200, events.get(True, 5)['status'])
         human = arvados.api('v1').humans().create(body={}).execute()
 
-        log_object_uuids = []
-        for i in range(0, expected):
-            log_object_uuids.append(events.get(True, 5)['object_uuid'])
-
+        want_uuids = []
         if expected > 0:
-            self.assertIn(human['uuid'], log_object_uuids)
-
+            want_uuids.append(human['uuid'])
         if expected > 1:
-            self.assertIn(ancestor['uuid'], log_object_uuids)
+            want_uuids.append(ancestor['uuid'])
+        log_object_uuids = []
+        while set(want_uuids) - set(log_object_uuids):
+            log_object_uuids.append(events.get(True, 5)['object_uuid'])
 
-        with self.assertRaises(Queue.Empty):
-            # assertEqual just serves to show us what unexpected thing
-            # comes out of the queue when the assertRaises fails; when
-            # the test passes, this assertEqual doesn't get called.
-            self.assertEqual(events.get(True, 2), None)
+        if expected < 2:
+            with self.assertRaises(Queue.Empty):
+                # assertEqual just serves to show us what unexpected
+                # thing comes out of the queue when the assertRaises
+                # fails; when the test passes, this assertEqual
+                # doesn't get called.
+                self.assertEqual(events.get(True, 2), None)
 
     def test_subscribe_websocket(self):
         self._test_subscribe(
@@ -143,8 +144,8 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         return time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(t)) + self.isotz(-time.timezone/60)
 
     def isotz(self, offset):
-        """Convert minutes-east-of-UTC to ISO8601 time zone designator"""
-        return '{:+03d}{:02d}'.format(offset/60, offset%60)
+        """Convert minutes-east-of-UTC to RFC3339- and ISO-compatible time zone designator"""
+        return '{:+03d}:{:02d}'.format(offset/60, offset%60)
 
     # Test websocket reconnection on (un)execpted close
     def _test_websocket_reconnect(self, close_unexpected):
index a9aa953f9f36e948dc57d34336c7d3f1cc1df43c..ab560d7f6b79724a9a3ff1a22f6a413e16eb18c4 100644 (file)
@@ -444,3 +444,4 @@ test:
   workbench_address: https://localhost:3001/
   git_repositories_dir: <%= Rails.root.join 'tmp', 'git', 'test' %>
   git_internal_dir: <%= Rails.root.join 'tmp', 'internal.git' %>
+  websocket_address: "wss://0.0.0.0:<%= ENV['ARVADOS_TEST_WSS_PORT'] %>/websocket"
diff --git a/services/ws/config.go b/services/ws/config.go
new file mode 100644 (file)
index 0000000..9c2e80a
--- /dev/null
@@ -0,0 +1,37 @@
+package main
+
+import (
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Config is the websocket server configuration, loaded from the
+// config file by main.
+//
+// Client holds the API client settings. Postgres holds the database
+// connection parameters (see pgConfig.ConnectionString). Listen is
+// the address given to the HTTP server. Debug enables debugLogf
+// output. ServerEventQueue is used as the pgEventSource queue size.
+// PingTimeout and ClientEventQueue are not consumed in main itself;
+// presumably they configure the per-connection handler (TODO
+// confirm against the router).
+type Config struct {
+       Client   arvados.Client
+       Postgres pgConfig
+       Listen   string
+       Debug    bool
+
+       PingTimeout      arvados.Duration
+       ClientEventQueue int
+       ServerEventQueue int
+}
+
+// DefaultConfig returns the configuration used as a starting point
+// before the config file is loaded: API host localhost:443, a local
+// "arvados_production" database with placeholder credentials, a
+// one-minute ping timeout, and client/server event queue sizes of 64
+// and 4.
+func DefaultConfig() Config {
+       var cfg Config
+       cfg.Client.APIHost = "localhost:443"
+       cfg.Postgres = pgConfig{
+               "dbname":          "arvados_production",
+               "user":            "arvados",
+               "password":        "xyzzy",
+               "host":            "localhost",
+               "connect_timeout": "30",
+               "sslmode":         "require",
+       }
+       cfg.PingTimeout = arvados.Duration(time.Minute)
+       cfg.ClientEventQueue = 64
+       cfg.ServerEventQueue = 4
+       return cfg
+}
diff --git a/services/ws/event.go b/services/ws/event.go
new file mode 100644 (file)
index 0000000..77acf44
--- /dev/null
@@ -0,0 +1,64 @@
+package main
+
+import (
+       "database/sql"
+       "log"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/ghodss/yaml"
+)
+
+// eventSink is a subscription to an eventSource. Events are
+// delivered on the channel returned by Channel; Stop ends the
+// subscription.
+type eventSink interface {
+       Channel() <-chan *event
+       Stop()
+}
+
+// eventSource produces events. NewSink creates a new subscription,
+// and DB returns the database handle associated with the source.
+type eventSource interface {
+       NewSink() eventSink
+       DB() *sql.DB
+}
+
+// event refers to one row of the logs table.
+//
+// LogID is the row id used by Detail to look the row up. Received is
+// the time used by the handler to compute queue delay, and Serial is
+// the number shown in the handler's debug messages.
+//
+// The unexported fields support Detail: db is the handle to query,
+// logRow and err remember the outcome of the single lookup attempt,
+// and mtx serializes calls to Detail.
+type event struct {
+       LogID    uint64
+       Received time.Time
+       Serial   uint64
+
+       db     *sql.DB
+       logRow *arvados.Log
+       err    error
+       mtx    sync.Mutex
+}
+
+// Detail returns the logs table row this event refers to, or nil if
+// the row could not be loaded.
+//
+// The lookup is attempted at most once: its outcome (row or error)
+// is remembered, and later calls return the remembered row (nil
+// after a failure). The mutex makes concurrent calls safe.
+func (e *event) Detail() *arvados.Log {
+       e.mtx.Lock()
+       defer e.mtx.Unlock()
+
+       // Either a previous attempt succeeded (logRow set) or it
+       // failed (err set, logRow still nil); don't try again.
+       if alreadyTried := e.logRow != nil || e.err != nil; alreadyTried {
+               return e.logRow
+       }
+
+       const query = `SELECT id, uuid, object_uuid, COALESCE(object_owner_uuid,''), COALESCE(event_type,''), created_at, properties FROM logs WHERE id = $1`
+       var (
+               row      arvados.Log
+               rawProps []byte
+       )
+       e.err = e.db.QueryRow(query, e.LogID).Scan(
+               &row.ID,
+               &row.UUID,
+               &row.ObjectUUID,
+               &row.ObjectOwnerUUID,
+               &row.EventType,
+               &row.CreatedAt,
+               &rawProps)
+       if e.err != nil {
+               log.Printf("retrieving log row %d: %s", e.LogID, e.err)
+               return nil
+       }
+
+       // The properties column is stored as YAML text.
+       if e.err = yaml.Unmarshal(rawProps, &row.Properties); e.err != nil {
+               log.Printf("decoding yaml for log row %d: %s", e.LogID, e.err)
+               return nil
+       }
+
+       e.logRow = &row
+       return e.logRow
+}
diff --git a/services/ws/handler.go b/services/ws/handler.go
new file mode 100644 (file)
index 0000000..1470c66
--- /dev/null
@@ -0,0 +1,175 @@
+package main
+
+import (
+       "encoding/json"
+       "io"
+       "log"
+       "net/http"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// wsConn is the subset of a websocket connection used by handler:
+// reading and writing frames, access to the originating HTTP
+// request, and read/write deadlines.
+type wsConn interface {
+       io.ReadWriter
+       Request() *http.Request
+       SetReadDeadline(time.Time) error
+       SetWriteDeadline(time.Time) error
+}
+
+// handler services websocket connections (see Handle).
+//
+// PingTimeout is used both as the write deadline for each outgoing
+// message and as the interval between keepalive messages. QueueSize
+// is the capacity of the per-connection outgoing queue. NewSession
+// creates the session that interprets incoming frames and filters
+// and encodes outgoing events.
+type handler struct {
+       Client      arvados.Client
+       PingTimeout time.Duration
+       QueueSize   int
+       NewSession  func(wsConn) (session, error)
+}
+
+// handlerStats accumulates per-connection totals for events written
+// by Handle: time spent queued (QueueDelay), time spent in Write
+// (WriteDelay), bytes written (EventBytes), and number of events
+// (EventCount).
+type handlerStats struct {
+       QueueDelay time.Duration
+       WriteDelay time.Duration
+       EventBytes uint64
+       EventCount uint64
+}
+
+// Handle services one websocket connection until the first error
+// occurs (read or write failure, undecodable frame, session error,
+// or outgoing queue overflow), then signals its goroutines to stop
+// and returns the accumulated statistics.
+//
+// Three goroutines cooperate through the outgoing queue:
+//
+//   - the reader decodes each incoming frame as JSON, passes it to
+//     the session, and queues any responses;
+//   - the writer sends queued raw messages and session-encoded
+//     events to the client, updating stats for events;
+//   - the filter forwards incoming events accepted by the session to
+//     the queue and queues a "{}" keepalive every PingTimeout when
+//     the queue is empty.
+//
+// If the session cannot be created, Handle logs the error and
+// returns zero stats.
+//
+// NOTE(review): the filter goroutine closes the queue while the
+// reader goroutine may still send on it, and stats is updated by the
+// writer goroutine while Handle returns it; both look racy and should
+// be confirmed against how callers tear down the connection.
+func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
+       sess, err := h.NewSession(ws)
+       if err != nil {
+               log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
+               return
+       }
+
+       queue := make(chan interface{}, h.QueueSize)
+
+       stopped := make(chan struct{})
+       stop := make(chan error, 5)
+
+       // Reader: incoming frames -> session -> queued responses.
+       go func() {
+               buf := make([]byte, 2<<20)
+               for {
+                       select {
+                       case <-stopped:
+                               return
+                       default:
+                       }
+                       ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
+                       n, err := ws.Read(buf)
+                       sess.debugLogf("received frame: %q", buf[:n])
+                       // A frame that fills the whole buffer may
+                       // have been truncated, so reject it.
+                       if err == nil && n == len(buf) {
+                               err = errFrameTooBig
+                       }
+                       if err != nil {
+                               if err != io.EOF {
+                                       sess.debugLogf("handler: read: %s", err)
+                               }
+                               stop <- err
+                               return
+                       }
+                       msg := make(map[string]interface{})
+                       err = json.Unmarshal(buf[:n], &msg)
+                       if err != nil {
+                               sess.debugLogf("handler: unmarshal: %s", err)
+                               stop <- err
+                               return
+                       }
+                       for _, buf := range sess.Receive(msg, buf[:n]) {
+                               sess.debugLogf("handler: to queue: %s", string(buf))
+                               queue <- buf
+                       }
+               }
+       }()
+
+       // Writer: queued messages and events -> client.
+       go func() {
+               for e := range queue {
+                       if buf, ok := e.([]byte); ok {
+                               ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+                               sess.debugLogf("handler: send msg: %s", string(buf))
+                               _, err := ws.Write(buf)
+                               if err != nil {
+                                       sess.debugLogf("handler: write {}: %s", err)
+                                       stop <- err
+                                       break
+                               }
+                               continue
+                       }
+                       e := e.(*event)
+
+                       buf, err := sess.EventMessage(e)
+                       if err != nil {
+                               // Include the event serial so the
+                               // %d verb has an operand.
+                               sess.debugLogf("EventMessage %d: err %s", e.Serial, err)
+                               stop <- err
+                               break
+                       } else if len(buf) == 0 {
+                               sess.debugLogf("EventMessage %d: skip", e.Serial)
+                               continue
+                       }
+
+                       sess.debugLogf("handler: send event %d: %q", e.Serial, buf)
+                       ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+                       t0 := time.Now()
+                       _, err = ws.Write(buf)
+                       if err != nil {
+                               sess.debugLogf("handler: write: %s", err)
+                               stop <- err
+                               break
+                       }
+                       sess.debugLogf("handler: sent event %d", e.Serial)
+                       stats.WriteDelay += time.Since(t0)
+                       stats.QueueDelay += t0.Sub(e.Received)
+                       stats.EventBytes += uint64(len(buf))
+                       stats.EventCount++
+               }
+               // After an error, keep draining the queue so that
+               // senders are not blocked until it is closed.
+               for _ = range queue {
+               }
+       }()
+
+       // Filter incoming events against the current subscription
+       // list, and forward matching events to the outgoing message
+       // queue. Close the queue and return when the "stopped"
+       // channel closes or the incoming event stream ends. Shut down
+       // the handler if the outgoing queue fills up.
+       go func() {
+               send := func(e *event) {
+                       select {
+                       case queue <- e:
+                       default:
+                               stop <- errQueueFull
+                       }
+               }
+
+               ticker := time.NewTicker(h.PingTimeout)
+               defer ticker.Stop()
+
+               for {
+                       var e *event
+                       var ok bool
+                       select {
+                       case <-stopped:
+                               close(queue)
+                               return
+                       case <-ticker.C:
+                               // If the outgoing queue is empty,
+                               // send an empty message. This can
+                               // help detect a disconnected network
+                               // socket, and prevent an idle socket
+                               // from being closed.
+                               if len(queue) == 0 {
+                                       queue <- []byte(`{}`)
+                               }
+                               continue
+                       case e, ok = <-events:
+                               if !ok {
+                                       close(queue)
+                                       return
+                               }
+                       }
+                       if sess.Filter(e) {
+                               send(e)
+                       }
+               }
+       }()
+
+       // Wait for the first error from any goroutine, then tell all
+       // of them to shut down.
+       <-stop
+       close(stopped)
+
+       return
+}
diff --git a/services/ws/log.go b/services/ws/log.go
new file mode 100644 (file)
index 0000000..1511691
--- /dev/null
@@ -0,0 +1,41 @@
+package main
+
+import (
+       "encoding/json"
+       "fmt"
+       "log"
+       "time"
+)
+
+// init clears the standard logger's flags, so that no date/time
+// prefix is added in front of the JSON text emitted by the logging
+// helpers in this file.
+func init() {
+       log.SetFlags(0)
+}
+
+// errorLogf formats a message Printf-style and logs it as a JSON
+// object of the form {"error":"<message>"}.
+func errorLogf(f string, args ...interface{}) {
+       msg := fmt.Sprintf(f, args...)
+       encoded := string(mustMarshal(msg))
+       log.Print(`{"error":`, encoded, `}`)
+}
+
+// debugLogf formats a message Printf-style and logs it as a JSON
+// object of the form {"debug":"<message>"}. It is a variable so that
+// it can be replaced (main installs a no-op when Debug is off).
+var debugLogf = func(f string, args ...interface{}) {
+       msg := fmt.Sprintf(f, args...)
+       encoded := string(mustMarshal(msg))
+       log.Print(`{"debug":`, encoded, `}`)
+}
+
+// mustMarshal returns the JSON encoding of v. It panics if v cannot
+// be encoded.
+func mustMarshal(v interface{}) []byte {
+       encoded, err := json.Marshal(v)
+       if err == nil {
+               return encoded
+       }
+       panic(err)
+}
+
+// logj logs one JSON object built from alternating key/value
+// arguments, plus a "Time" entry holding the current UTC time (a
+// caller-supplied "Time" key overrides it). Keys are rendered with
+// the %s verb. A trailing key without a value is ignored. If the
+// object cannot be encoded, the failure is reported via errorLogf
+// and nothing else is logged.
+func logj(args ...interface{}) {
+       fields := map[string]interface{}{"Time": time.Now().UTC()}
+       for i := 1; i < len(args); i += 2 {
+               key := fmt.Sprintf("%s", args[i-1])
+               fields[key] = args[i]
+       }
+       encoded, err := json.Marshal(fields)
+       if err == nil {
+               log.Print(string(encoded))
+               return
+       }
+       errorLogf("logj: %s", err)
+}
diff --git a/services/ws/main.go b/services/ws/main.go
new file mode 100644 (file)
index 0000000..719128f
--- /dev/null
@@ -0,0 +1,54 @@
+package main
+
+import (
+       "flag"
+       "fmt"
+       "log"
+       "net/http"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/config"
+)
+
+// main loads the configuration, optionally dumps it and exits, then
+// starts the event source and serves websocket connections on the
+// configured listen address until the HTTP server fails.
+func main() {
+       configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file")
+       dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit")
+       cfg := DefaultConfig()
+       flag.Parse()
+
+       // Values in the config file override the defaults.
+       err := config.LoadFile(&cfg, *configPath)
+       if err != nil {
+               log.Fatal(err)
+       }
+       // Replace the debug logger with a no-op unless debugging is
+       // enabled.
+       if !cfg.Debug {
+               debugLogf = func(string, ...interface{}) {}
+       }
+
+       if *dumpConfig {
+               txt, err := config.Dump(&cfg)
+               if err != nil {
+                       log.Fatal(err)
+               }
+               fmt.Print(string(txt))
+               return
+       }
+
+       eventSource := &pgEventSource{
+               DataSource: cfg.Postgres.ConnectionString(),
+               QueueSize:  cfg.ServerEventQueue,
+       }
+       srv := &http.Server{
+               Addr:           cfg.Listen,
+               ReadTimeout:    time.Minute,
+               WriteTimeout:   time.Minute,
+               MaxHeaderBytes: 1 << 20,
+               Handler: &router{
+                       Config:      &cfg,
+                       eventSource: eventSource,
+               },
+       }
+       // NOTE(review): creating and immediately stopping a sink
+       // presumably forces the event source to connect to the
+       // database before we start listening -- confirm in NewSink.
+       eventSource.NewSink().Stop()
+
+       log.Printf("listening at %s", srv.Addr)
+       log.Fatal(srv.ListenAndServe())
+}
diff --git a/services/ws/permission.go b/services/ws/permission.go
new file mode 100644 (file)
index 0000000..1dc06b8
--- /dev/null
@@ -0,0 +1,86 @@
+package main
+
+import (
+       "net/http"
+       "net/url"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Permission cache limits: a cached result is reused by Check only
+// while it is younger than maxPermCacheAge, and tidy only evicts
+// entries older than minPermCacheAge.
+const (
+       maxPermCacheAge = time.Hour
+       minPermCacheAge = 5 * time.Minute
+)
+
+// permChecker decides whether the holder of a token may read a
+// given object. SetToken selects the token to check with; Check
+// reports whether the object with the given UUID is readable, or an
+// error if that could not be determined.
+type permChecker interface {
+       SetToken(token string)
+       Check(uuid string) (bool, error)
+}
+
+// NewPermChecker returns a permChecker that queries the API server
+// using a private copy of ac (ac is passed by value, so the caller's
+// client is not modified). The copy's AuthToken is cleared; call
+// SetToken before Check. Results are cached in memory.
+func NewPermChecker(ac arvados.Client) permChecker {
+       ac.AuthToken = ""
+       return &cachingPermChecker{
+               Client:     &ac,
+               cache:      make(map[string]cacheEnt),
+               maxCurrent: 16,
+       }
+}
+
+// cacheEnt is a cached permission check result. The embedded
+// time.Time is the time the result was obtained.
+type cacheEnt struct {
+       time.Time
+       allowed bool
+}
+
+// cachingPermChecker is a permChecker that remembers results per
+// UUID. cache maps UUID to the most recent result. maxCurrent is the
+// size the cache had after the last tidy; tidy runs again once the
+// cache exceeds twice that size.
+type cachingPermChecker struct {
+       *arvados.Client
+       cache      map[string]cacheEnt
+       maxCurrent int
+}
+
+// SetToken sets the API token used for subsequent permission
+// checks. Cache entries are keyed by UUID only and were obtained
+// with the previous token, so the cache is discarded whenever the
+// token actually changes; otherwise one token's results could be
+// served for another.
+func (pc *cachingPermChecker) SetToken(token string) {
+       if token == pc.Client.AuthToken {
+               return
+       }
+       pc.Client.AuthToken = token
+       pc.cache = make(map[string]cacheEnt)
+}
+
+// Check reports whether the current token can read the object with
+// the given UUID.
+//
+// A cached result younger than maxPermCacheAge is returned as is.
+// Otherwise the object is requested from the API server: success
+// means allowed, a 404 response means not allowed, and both outcomes
+// are cached. Any other failure is logged and returned as an error
+// without being cached.
+//
+// NOTE(review): the log messages below include the API token;
+// consider redacting it.
+func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
+       pc.tidy()
+       now := time.Now()
+       if ent, cached := pc.cache[uuid]; cached && now.Sub(ent.Time) < maxPermCacheAge {
+               debugLogf("perm (cached): %+q %+q ...%v", pc.Client.AuthToken, uuid, ent.allowed)
+               return ent.allowed, nil
+       }
+
+       path, err := pc.PathForUUID("get", uuid)
+       if err != nil {
+               return false, err
+       }
+       // Only the uuid field is requested: the response body is
+       // not used, just the success or failure of the request.
+       var resp map[string]interface{}
+       params := url.Values{"select": {`["uuid"]`}}
+       err = pc.RequestAndDecode(&resp, "GET", path, nil, params)
+
+       txErr, isTxErr := err.(arvados.TransactionError)
+       var allowed bool
+       switch {
+       case err == nil:
+               allowed = true
+       case isTxErr && txErr.StatusCode == http.StatusNotFound:
+               allowed = false
+       default:
+               errorLogf("perm err: %+q %+q: %T %s", pc.Client.AuthToken, uuid, err, err)
+               return false, err
+       }
+
+       debugLogf("perm: %+q %+q ...%v", pc.Client.AuthToken, uuid, allowed)
+       pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed}
+       return allowed, nil
+}
+
+// tidy evicts cache entries older than minPermCacheAge, but only
+// once the cache has grown beyond twice the size it had after the
+// previous eviction pass. The post-eviction size is then recorded
+// as the new baseline.
+func (pc *cachingPermChecker) tidy() {
+       if limit := pc.maxCurrent * 2; len(pc.cache) <= limit {
+               return
+       }
+       cutoff := time.Now().Add(-minPermCacheAge)
+       for key, ent := range pc.cache {
+               if !ent.Time.Before(cutoff) {
+                       continue
+               }
+               delete(pc.cache, key)
+       }
+       pc.maxCurrent = len(pc.cache)
+}
diff --git a/services/ws/pg.go b/services/ws/pg.go
new file mode 100644 (file)
index 0000000..08fbee1
--- /dev/null
@@ -0,0 +1,181 @@
+package main
+
+import (
+       "database/sql"
+       "fmt"
+       "log"
+       "strconv"
+       "strings"
+       "sync"
+       "time"
+
+       "github.com/lib/pq"
+)
+
+// pgConfig holds PostgreSQL connection parameters as key/value pairs.
+type pgConfig map[string]string
+
+// ConnectionString renders the parameters as a sequence of
+// key='value' pairs, each followed by a single space. Backslashes and
+// single quotes inside values are escaped with a backslash. Pairs
+// appear in map iteration order, which is unspecified.
+func (c pgConfig) ConnectionString() string {
+       escaper := strings.NewReplacer(`\`, `\\`, `'`, `\'`)
+       pairs := make([]string, 0, len(c))
+       for key, val := range c {
+               pairs = append(pairs, key+"='"+escaper.Replace(val)+"' ")
+       }
+       return strings.Join(pairs, "")
+}
+
+// pgEventSource listens for PostgreSQL notifications on the "logs"
+// channel and fans each resulting event out to every subscribed sink.
+// It is initialized lazily, on the first NewSink or DB call.
+type pgEventSource struct {
+       // DataSource is the connection string passed to both sql.Open
+       // and pq.NewListener.
+       DataSource string
+       // QueueSize is the capacity of the internal queue between the
+       // notification listener and the dispatcher.
+       QueueSize  int
+
+       db         *sql.DB
+       pqListener *pq.Listener
+       // sinks is the set of current subscribers, guarded by mtx.
+       sinks      map[*pgEventSink]bool
+       setupOnce  sync.Once
+       mtx        sync.Mutex
+       // shutdown carries a listener error that makes run() stop.
+       shutdown   chan error
+}
+
+func (ps *pgEventSource) setup() {
+       ps.shutdown = make(chan error, 1)
+       ps.sinks = make(map[*pgEventSink]bool)
+
+       db, err := sql.Open("postgres", ps.DataSource)
+       if err != nil {
+               log.Fatalf("sql.Open: %s", err)
+       }
+       if err = db.Ping(); err != nil {
+               log.Fatalf("db.Ping: %s", err)
+       }
+       ps.db = db
+
+       ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
+               if err != nil {
+                       // Until we have a mechanism for catching up
+                       // on missed events, we cannot recover from a
+                       // dropped connection without breaking our
+                       // promises to clients.
+                       ps.shutdown <- fmt.Errorf("pgEventSource listener problem: %s", err)
+               }
+       })
+       err = ps.pqListener.Listen("logs")
+       if err != nil {
+               log.Fatal(err)
+       }
+       debugLogf("pgEventSource listening")
+
+       go ps.run()
+}
+
+// run is the event source's main loop. It turns each notification on
+// the "logs" channel into an event, numbers it with an increasing
+// serial, and queues it for a dispatcher goroutine that delivers it
+// to every registered sink. It also pings the listener once a minute.
+//
+// run returns, closing the internal queue (which ends the dispatcher),
+// when a shutdown error arrives or the notification channel is closed.
+func (ps *pgEventSource) run() {
+       eventQueue := make(chan *event, ps.QueueSize)
+       defer close(eventQueue)
+
+       go func() {
+               for e := range eventQueue {
+                       // Wait for the "select ... from logs" call to
+                       // finish. This limits max concurrent queries
+                       // to ps.QueueSize. Without this, max
+                       // concurrent queries would be bounded by
+                       // client_count X client_queue_size.
+                       e.Detail()
+                       debugLogf("event %d detail %+v", e.Serial, e.Detail())
+                       // Sends happen while holding mtx, so a sink
+                       // that is not being read blocks delivery to
+                       // all sinks.
+                       ps.mtx.Lock()
+                       for sink := range ps.sinks {
+                               sink.channel <- e
+                       }
+                       ps.mtx.Unlock()
+               }
+       }()
+
+       var serial uint64
+       ticker := time.NewTicker(time.Minute)
+       defer ticker.Stop()
+       for {
+               select {
+               case err, ok := <-ps.shutdown:
+                       if ok {
+                               debugLogf("shutdown on error: %s", err)
+                       }
+                       return
+
+               case <-ticker.C:
+                       debugLogf("pgEventSource listener ping")
+                       // Report a failed ping instead of silently
+                       // discarding the error.
+                       if err := ps.pqListener.Ping(); err != nil {
+                               log.Printf("pgEventSource listener ping: %s", err)
+                       }
+
+               case pqEvent, ok := <-ps.pqListener.Notify:
+                       if !ok {
+                               return
+                       }
+                       if pqEvent.Channel != "logs" {
+                               continue
+                       }
+                       logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
+                       if err != nil {
+                               log.Printf("bad notify payload: %+v", pqEvent)
+                               continue
+                       }
+                       serial++
+                       e := &event{
+                               LogID:    logID,
+                               Received: time.Now(),
+                               Serial:   serial,
+                               db:       ps.db,
+                       }
+                       debugLogf("event %d %+v", e.Serial, e)
+                       eventQueue <- e
+                       // Start loading the event's detail right away,
+                       // without waiting for the dispatcher to reach
+                       // it in the queue.
+                       go e.Detail()
+               }
+       }
+}
+
+// NewSink subscribes to the event source. NewSink returns an
+// eventSink, whose Channel() method returns a channel: a pointer to
+// each subsequent event will be sent to that channel.
+//
+// The caller must ensure events are received from the sink channel as
+// quickly as possible because when one sink stops being ready, all
+// other sinks block.
+func (ps *pgEventSource) NewSink() eventSink {
+       ps.setupOnce.Do(ps.setup)
+       sink := &pgEventSink{
+               channel: make(chan *event, 1),
+               source:  ps,
+       }
+       ps.mtx.Lock()
+       ps.sinks[sink] = true
+       ps.mtx.Unlock()
+       return sink
+}
+
+// DB returns the event source's database handle, running the one-time
+// setup first if it has not happened yet.
+func (ps *pgEventSource) DB() *sql.DB {
+       ps.setupOnce.Do(ps.setup)
+       return ps.db
+}
+
+// pgEventSink is one subscription to a pgEventSource.
+type pgEventSink struct {
+       // channel receives every event dispatched by the source while
+       // the sink is registered.
+       channel chan *event
+       // source is the event source this sink is registered with.
+       source  *pgEventSource
+}
+
+// Channel returns the receive-only view of the sink's event channel.
+func (sink *pgEventSink) Channel() <-chan *event {
+       return sink.channel
+}
+
+// Stop unsubscribes the sink from its event source and closes its
+// channel.
+//
+// NOTE(review): calling Stop twice on the same sink would close an
+// already-closed channel; confirm callers invoke it exactly once.
+func (sink *pgEventSink) Stop() {
+       go func() {
+               // Ensure this sink cannot fill up and block the
+               // server-side queue (which otherwise could in turn
+               // block our mtx.Lock() here)
+               for _ = range sink.channel {
+               }
+       }()
+       sink.source.mtx.Lock()
+       delete(sink.source.sinks, sink)
+       sink.source.mtx.Unlock()
+       // The channel is closed only after the sink has been removed
+       // from the source's set under mtx -- the same lock the
+       // dispatcher holds while sending to sinks. Closing it also
+       // ends the drain goroutine started above.
+       close(sink.channel)
+}
diff --git a/services/ws/router.go b/services/ws/router.go
new file mode 100644 (file)
index 0000000..2a4e52e
--- /dev/null
@@ -0,0 +1,74 @@
+package main
+
+import (
+       "database/sql"
+       "encoding/json"
+       "log"
+       "net/http"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "golang.org/x/net/websocket"
+)
+
+// router is the HTTP entry point: it dispatches websocket endpoint
+// paths to per-protocol-version websocket servers.
+type router struct {
+       Config *Config
+
+       // eventSource supplies a sink for each connection and the
+       // database handle passed to new sessions.
+       eventSource eventSource
+       // mux is built lazily by setup(), guarded by setupOnce.
+       mux         *http.ServeMux
+       setupOnce   sync.Once
+}
+
+func (rtr *router) setup() {
+       rtr.mux = http.NewServeMux()
+       rtr.mux.Handle("/websocket", rtr.makeServer(NewSessionV0))
+       rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1))
+}
+
+func (rtr *router) makeServer(newSession func(wsConn, arvados.Client, *sql.DB) (session, error)) *websocket.Server {
+       handler := &handler{
+               PingTimeout: rtr.Config.PingTimeout.Duration(),
+               QueueSize:   rtr.Config.ClientEventQueue,
+               NewSession:  func(ws wsConn) (session, error) {
+                       return newSession(ws, rtr.Config.Client, rtr.eventSource.DB())
+               },
+       }
+       return &websocket.Server{
+               Handshake: func(c *websocket.Config, r *http.Request) error {
+                       return nil
+               },
+               Handler: websocket.Handler(func(ws *websocket.Conn) {
+                       logj("Type", "connect",
+                               "RemoteAddr", ws.Request().RemoteAddr)
+                       t0 := time.Now()
+
+                       sink := rtr.eventSource.NewSink()
+                       stats := handler.Handle(ws, sink.Channel())
+
+                       logj("Type", "disconnect",
+                               "RemoteAddr", ws.Request().RemoteAddr,
+                               "Elapsed", time.Now().Sub(t0).Seconds(),
+                               "Stats", stats)
+
+                       sink.Stop()
+                       ws.Close()
+               }),
+       }
+}
+
+// ServeHTTP logs the incoming request (remote address and
+// X-Forwarded-For header) and hands it to the multiplexer, which is
+// built on first use.
+func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       rtr.setupOnce.Do(rtr.setup)
+       logj("Type", "request",
+               "RemoteAddr", req.RemoteAddr,
+               "X-Forwarded-For", req.Header.Get("X-Forwarded-For"))
+       rtr.mux.ServeHTTP(resp, req)
+}
+
+func reqLog(m map[string]interface{}) {
+       j, err := json.Marshal(m)
+       if err != nil {
+               log.Fatal(err)
+       }
+       log.Print(string(j))
+}
diff --git a/services/ws/session.go b/services/ws/session.go
new file mode 100644 (file)
index 0000000..a0658d9
--- /dev/null
@@ -0,0 +1,27 @@
+package main
+
+// session is the per-connection protocol logic: it interprets client
+// messages, decides which events the client should see, and encodes
+// those events for sending.
+type session interface {
+       // Receive processes a message received from the client. If
+       // the returned list of messages is non-nil, they will be
+       // queued for sending to the client.
+       Receive(map[string]interface{}, []byte) [][]byte
+
+       // Filter returns true if the event should be queued for
+       // sending to the client. It should return as fast as
+       // possible, and must not block.
+       Filter(*event) bool
+
+       // EventMessage encodes the given event (from the front of the
+       // queue) into a form suitable to send to the client. If a
+       // non-nil error is returned, the connection is terminated. If
+       // the returned buffer is empty, nothing is sent to the client
+       // and the event is not counted in statistics.
+       //
+       // Unlike Filter, EventMessage can block without affecting
+       // other connections. If EventMessage is slow, additional
+       // incoming events will be queued. If the event queue fills
+       // up, the connection will be dropped.
+       EventMessage(*event) ([]byte, error)
+
+       // debugLogf writes a printf-style debug message on behalf of
+       // this session.
+       debugLogf(string, ...interface{})
+}
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
new file mode 100644 (file)
index 0000000..33cdb2f
--- /dev/null
@@ -0,0 +1,278 @@
+package main
+
+import (
+       "database/sql"
+       "encoding/json"
+       "errors"
+       "log"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var (
+       // NOTE(review): these two errors are not used in this file;
+       // presumably the connection handler returns them -- confirm.
+       errQueueFull   = errors.New("client queue full")
+       errFrameTooBig = errors.New("frame too big")
+
+       // sendObjectAttributes lists the only keys copied from an
+       // event's old_attributes/new_attributes into a v0 message.
+       sendObjectAttributes = []string{"state", "name"}
+
+       // Canned replies to a client request: sent when a "subscribe"
+       // request is accepted, or when the method is not "subscribe".
+       v0subscribeOK   = []byte(`{"status":200}`)
+       v0subscribeFail = []byte(`{"status":400}`)
+)
+
+// v0session implements the session interface for the "/websocket"
+// (v0) endpoint.
+type v0session struct {
+       ws            wsConn
+       db            *sql.DB
+       // permChecker decides whether this client's token may see the
+       // object an event refers to.
+       permChecker   permChecker
+       // subscriptions is appended to by Receive and read by Filter;
+       // both hold mtx while doing so.
+       subscriptions []v0subscribe
+       mtx           sync.Mutex
+       setupOnce     sync.Once
+}
+
+func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
+       sess := &v0session{
+               ws:          ws,
+               db:          db,
+               permChecker: NewPermChecker(ac),
+       }
+
+       err := ws.Request().ParseForm()
+       if err != nil {
+               log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
+               return nil, err
+       }
+       token := ws.Request().Form.Get("api_token")
+       sess.permChecker.SetToken(token)
+       sess.debugLogf("token = %+q", token)
+
+       return sess, nil
+}
+
+func (sess *v0session) debugLogf(s string, args ...interface{}) {
+       args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
+       debugLogf("%s "+s, args...)
+}
+
+// Receive handles one client message. buf is decoded as a v0
+// subscribe request; msg is used only for logging.
+//
+// Undecodable input is ignored (nil is returned). A request whose
+// method is "subscribe" is recorded as a new subscription, and the
+// reply is the OK status followed by any matching old events. Any
+// other method gets the failure status.
+func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte {
+       sess.debugLogf("received message: %+v", msg)
+
+       var sub v0subscribe
+       err := json.Unmarshal(buf, &sub)
+       if err != nil {
+               sess.debugLogf("ignored unrecognized request: %s", err)
+               return nil
+       }
+       if sub.Method != "subscribe" {
+               return [][]byte{v0subscribeFail}
+       }
+
+       sub.prepare()
+       sess.debugLogf("subscription: %v", sub)
+       sess.mtx.Lock()
+       sess.subscriptions = append(sess.subscriptions, sub)
+       sess.mtx.Unlock()
+
+       replies := [][]byte{v0subscribeOK}
+       return append(replies, sub.getOldEvents(sess)...)
+}
+
+// EventMessage encodes an event as a v0 JSON message.
+//
+// It returns (nil, nil) if the event has no detail or the client is
+// not permitted to see the event's object, and (nil, err) if the
+// permission check itself fails.
+//
+// When the event's properties include a non-nil "text" entry they are
+// sent unmodified. Otherwise only old_attributes and new_attributes
+// are sent, each reduced to the keys in sendObjectAttributes.
+func (sess *v0session) EventMessage(e *event) ([]byte, error) {
+       detail := e.Detail()
+       if detail == nil {
+               return nil, nil
+       }
+
+       allowed, err := sess.permChecker.Check(detail.ObjectUUID)
+       if err != nil || !allowed {
+               return nil, err
+       }
+
+       var props interface{}
+       if detail.Properties != nil && detail.Properties["text"] != nil {
+               props = detail.Properties
+       } else {
+               trimmed := map[string]map[string]interface{}{}
+               for _, section := range []string{"old_attributes", "new_attributes"} {
+                       attrs, isMap := detail.Properties[section].(map[string]interface{})
+                       if !isMap {
+                               continue
+                       }
+                       kept := map[string]interface{}{}
+                       for _, name := range sendObjectAttributes {
+                               if val, present := attrs[name]; present {
+                                       kept[name] = val
+                               }
+                       }
+                       trimmed[section] = kept
+               }
+               props = trimmed
+       }
+
+       return json.Marshal(map[string]interface{}{
+               "msgID":             e.Serial,
+               "id":                detail.ID,
+               "uuid":              detail.UUID,
+               "object_uuid":       detail.ObjectUUID,
+               "object_owner_uuid": detail.ObjectOwnerUUID,
+               "event_type":        detail.EventType,
+               "properties":        props,
+       })
+}
+
+func (sess *v0session) Filter(e *event) bool {
+       sess.mtx.Lock()
+       defer sess.mtx.Unlock()
+       for _, sub := range sess.subscriptions {
+               if sub.match(e) {
+                       return true
+               }
+       }
+       return false
+}
+
+// getOldEvents returns encoded messages for already-logged events
+// that the subscription asked to replay: those with an ID greater
+// than sub.LastLogID, created within the last 10 minutes, that match
+// the subscription's filters and that the session can encode.
+//
+// It returns nil when LastLogID is zero. Database and scan errors are
+// logged; whatever was collected before the error is returned.
+func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
+       if sub.LastLogID == 0 {
+               return
+       }
+       debugLogf("getOldEvents(%d)", sub.LastLogID)
+       // Here we do a "select id" query and queue an event for every
+       // log since the given ID, then use (*event)Detail() to
+       // retrieve the whole row and decide whether to send it. This
+       // approach is very inefficient if the subscriber asks for
+       // last_log_id==1, even if the filters end up matching very
+       // few events.
+       //
+       // To mitigate this, filter on "created > 10 minutes ago" when
+       // retrieving the list of old event IDs to consider.
+       rows, err := sess.db.Query(
+               `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
+               sub.LastLogID,
+               time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
+       if err != nil {
+               errorLogf("db.Query: %s", err)
+               return
+       }
+       // Release the result set on every exit path.
+       defer rows.Close()
+       for rows.Next() {
+               var id uint64
+               err := rows.Scan(&id)
+               if err != nil {
+                       errorLogf("Scan: %s", err)
+                       continue
+               }
+               e := &event{
+                       LogID:    id,
+                       Received: time.Now(),
+                       db:       sess.db,
+               }
+               if !sub.match(e) {
+                       debugLogf("skip old event %+v", e)
+                       continue
+               }
+               msg, err := sess.EventMessage(e)
+               if err != nil {
+                       debugLogf("event marshal: %s", err)
+                       continue
+               }
+               if len(msg) == 0 {
+                       // EventMessage returns an empty buffer with
+                       // no error when the event has no detail or
+                       // the client may not see it. Such events must
+                       // not be queued as empty messages.
+                       debugLogf("skip old event %+v: nothing to send", e)
+                       continue
+               }
+               debugLogf("old event: %s", string(msg))
+               msgs = append(msgs, msg)
+       }
+       if err := rows.Err(); err != nil {
+               errorLogf("db.Query: %s", err)
+       }
+       return
+}
+
+// v0subscribe is a client request as decoded from JSON, plus the
+// matching functions that prepare() derives from its filters.
+type v0subscribe struct {
+       // Method is the requested action; Receive only accepts
+       // "subscribe".
+       Method    string
+       Filters   []v0filter
+       // LastLogID, when non-zero, asks for logged events with a
+       // greater ID to be replayed (see getOldEvents).
+       LastLogID int64 `json:"last_log_id"`
+
+       // funcs holds one predicate per recognized filter; an event
+       // matches only if every predicate returns true.
+       funcs []func(*event) bool
+}
+
+// v0filter is one filter from a subscribe request, in the order
+// [column, operator, operand] as read by prepare().
+type v0filter [3]interface{}
+
+func (sub *v0subscribe) match(e *event) bool {
+       detail := e.Detail()
+       if detail == nil {
+               debugLogf("match(%d): failed on no detail", e.LogID)
+               return false
+       }
+       for i, f := range sub.funcs {
+               if !f(e) {
+                       debugLogf("match(%d): failed on func %d", e.LogID, i)
+                       return false
+               }
+       }
+       debugLogf("match(%d): passed %d funcs", e.LogID, len(sub.funcs))
+       return true
+}
+
+// prepare translates the subscription's filters into predicate
+// functions stored in sub.funcs.
+//
+// Two kinds of filter are recognized:
+//
+//   - ["event_type", "in", [...]]: the event's type must equal one of
+//     the listed strings (non-string list items are ignored).
+//   - ["created_at", op, timestamp]: the event's creation time is
+//     compared with an RFC3339Nano timestamp, where op is one of
+//     ">=", "<=", ">", "<", "=".
+//
+// Filters that are malformed or refer to anything else contribute no
+// predicate. The predicates assume the event has detail; match()
+// checks that before calling them.
+func (sub *v0subscribe) prepare() {
+       for _, f := range sub.Filters {
+               // f is a fixed-size [3]interface{}, so there is no
+               // length to validate, only the element types.
+               col, ok := f[0].(string)
+               if !ok {
+                       continue
+               }
+               op, ok := f[1].(string)
+               if !ok {
+                       continue
+               }
+               switch col {
+               case "event_type":
+                       if op != "in" {
+                               continue
+                       }
+                       arr, ok := f[2].([]interface{})
+                       if !ok {
+                               continue
+                       }
+                       // strs is declared inside the loop body, so
+                       // each closure captures its own list.
+                       var strs []string
+                       for _, s := range arr {
+                               if s, ok := s.(string); ok {
+                                       strs = append(strs, s)
+                               }
+                       }
+                       sub.funcs = append(sub.funcs, func(e *event) bool {
+                               debugLogf("event_type func: %v in %v", e.Detail().EventType, strs)
+                               for _, s := range strs {
+                                       if s == e.Detail().EventType {
+                                               return true
+                                       }
+                               }
+                               return false
+                       })
+               case "created_at":
+                       tstr, ok := f[2].(string)
+                       if !ok {
+                               continue
+                       }
+                       t, err := time.Parse(time.RFC3339Nano, tstr)
+                       if err != nil {
+                               debugLogf("time.Parse(%q): %s", tstr, err)
+                               continue
+                       }
+                       switch op {
+                       case ">=":
+                               sub.funcs = append(sub.funcs, func(e *event) bool {
+                                       debugLogf("created_at func: %v >= %v", e.Detail().CreatedAt, t)
+                                       return !e.Detail().CreatedAt.Before(t)
+                               })
+                       case "<=":
+                               sub.funcs = append(sub.funcs, func(e *event) bool {
+                                       debugLogf("created_at func: %v <= %v", e.Detail().CreatedAt, t)
+                                       return !e.Detail().CreatedAt.After(t)
+                               })
+                       case ">":
+                               sub.funcs = append(sub.funcs, func(e *event) bool {
+                                       debugLogf("created_at func: %v > %v", e.Detail().CreatedAt, t)
+                                       return e.Detail().CreatedAt.After(t)
+                               })
+                       case "<":
+                               sub.funcs = append(sub.funcs, func(e *event) bool {
+                                       debugLogf("created_at func: %v < %v", e.Detail().CreatedAt, t)
+                                       return e.Detail().CreatedAt.Before(t)
+                               })
+                       case "=":
+                               sub.funcs = append(sub.funcs, func(e *event) bool {
+                                       debugLogf("created_at func: %v = %v", e.Detail().CreatedAt, t)
+                                       return e.Detail().CreatedAt.Equal(t)
+                               })
+                       default:
+                               // The filter is dropped, which makes
+                               // the subscription broader than the
+                               // client asked for; leave a trace.
+                               debugLogf("ignored created_at filter with unsupported operator %q", op)
+                       }
+               }
+       }
+}
diff --git a/services/ws/session_v1.go b/services/ws/session_v1.go
new file mode 100644 (file)
index 0000000..60d12c4
--- /dev/null
@@ -0,0 +1,12 @@
+package main
+
+import (
+       "database/sql"
+       "errors"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// NewSessionV1 is a placeholder for the v1 protocol: it ignores its
+// arguments and always returns a "Not implemented" error.
+func NewSessionV1(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
+       return nil, errors.New("Not implemented")
+}