8460: Merge branch 'master' into 8460-websocket-go
author Tom Clegg <tom@curoverse.com>
Tue, 15 Nov 2016 18:25:03 +0000 (13:25 -0500)
committer Tom Clegg <tom@curoverse.com>
Tue, 15 Nov 2016 18:25:03 +0000 (13:25 -0500)
18 files changed:
build/run-build-packages.sh
build/run-tests.sh
sdk/go/arvados/client.go
sdk/go/arvados/log.go [new file with mode: 0644]
sdk/go/config/load.go
sdk/python/tests/nginx.conf
sdk/python/tests/run_test_server.py
services/api/config/application.default.yml
services/ws/config.go [new file with mode: 0644]
services/ws/event.go [new file with mode: 0644]
services/ws/handler.go [new file with mode: 0644]
services/ws/main.go [new file with mode: 0644]
services/ws/permission.go [new file with mode: 0644]
services/ws/pg.go [new file with mode: 0644]
services/ws/router.go [new file with mode: 0644]
services/ws/session.go [new file with mode: 0644]
services/ws/session_v0.go [new file with mode: 0644]
services/ws/session_v1.go [new file with mode: 0644]

diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index 320f9d445c3a052a62bf5b8560b2080c98b06904..0a4559f95b3518aff58246fe4839052a7d3746b8 100755 (executable)
@@ -427,6 +427,8 @@ package_go_binary services/keepstore keepstore \
     "Keep storage daemon, accessible to clients on the LAN"
 package_go_binary services/keep-web keep-web \
     "Static web hosting service for user data stored in Arvados Keep"
+package_go_binary services/ws arvados-ws \
+    "Arvados Websocket server"
 package_go_binary tools/keep-block-check keep-block-check \
     "Verify that all data from one set of Keep servers to another was copied"
 package_go_binary tools/keep-rsync keep-rsync \
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 8959cfbe09c3ea7ac6ded2142b626259787d2121..c771bdc0ada64c1579f945bf3f148ad13896114c 100755 (executable)
@@ -79,6 +79,7 @@ services/nodemanager
 services/crunch-run
 services/crunch-dispatch-local
 services/crunch-dispatch-slurm
+services/ws
 sdk/cli
 sdk/pam
 sdk/python
@@ -268,10 +269,11 @@ start_api() {
 }
 
 start_nginx_proxy_services() {
-    echo 'Starting keepproxy, keep-web, arv-git-httpd, and nginx ssl proxy...'
+    echo 'Starting keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...'
     cd "$WORKSPACE" \
         && python sdk/python/tests/run_test_server.py start_keep_proxy \
         && python sdk/python/tests/run_test_server.py start_keep-web \
+        && python sdk/python/tests/run_test_server.py start_ws \
         && python sdk/python/tests/run_test_server.py start_arv-git-httpd \
         && python sdk/python/tests/run_test_server.py start_nginx \
         && export ARVADOS_TEST_PROXY_SERVICES=1
@@ -283,6 +285,7 @@ stop_services() {
         cd "$WORKSPACE" \
             && python sdk/python/tests/run_test_server.py stop_nginx \
             && python sdk/python/tests/run_test_server.py stop_arv-git-httpd \
+            && python sdk/python/tests/run_test_server.py stop_ws \
             && python sdk/python/tests/run_test_server.py stop_keep-web \
             && python sdk/python/tests/run_test_server.py stop_keep_proxy
     fi
@@ -765,6 +768,7 @@ gostuff=(
     services/crunch-dispatch-local
     services/crunch-dispatch-slurm
     services/crunch-run
+    services/ws
     tools/keep-block-check
     tools/keep-exercise
     tools/keep-rsync
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index 36f4eb52ae298982dfa09ddf82b0cea08c2604f7..0c18d38974f8be6ce99460ef2713e220a2cff403 100644 (file)
@@ -41,6 +41,8 @@ type Client struct {
        // callers who use a Client to initialize an
        // arvadosclient.ArvadosClient.)
        KeepServiceURIs []string `json:",omitempty"`
+
+       dd *DiscoveryDocument
 }
 
 // The default http.Client used by a Client with Insecure==true and
@@ -198,14 +200,83 @@ func (c *Client) apiURL(path string) string {
 
 // DiscoveryDocument is the Arvados server's description of itself.
 type DiscoveryDocument struct {
-       DefaultCollectionReplication int   `json:"defaultCollectionReplication"`
-       BlobSignatureTTL             int64 `json:"blobSignatureTtl"`
+       BasePath                     string              `json:"basePath"`
+       DefaultCollectionReplication int                 `json:"defaultCollectionReplication"`
+       BlobSignatureTTL             int64               `json:"blobSignatureTtl"`
+       Schemas                      map[string]Schema   `json:"schemas"`
+       Resources                    map[string]Resource `json:"resources"`
+}
+
+type Resource struct {
+       Methods map[string]ResourceMethod `json:"methods"`
+}
+
+type ResourceMethod struct {
+       HTTPMethod string         `json:"httpMethod"`
+       Path       string         `json:"path"`
+       Response   MethodResponse `json:"response"`
+}
+
+type MethodResponse struct {
+       Ref string `json:"$ref"`
+}
+
+type Schema struct {
+       UUIDPrefix string `json:"uuidPrefix"`
 }
 
 // DiscoveryDocument returns a *DiscoveryDocument. The returned object
 // should not be modified: the same object may be returned by
 // subsequent calls.
 func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
+       if c.dd != nil {
+               return c.dd, nil
+       }
        var dd DiscoveryDocument
-       return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+       err := c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+       if err != nil {
+               return nil, err
+       }
+       c.dd = &dd
+       return c.dd, nil
+}
+
+func (c *Client) PathForUUID(method, uuid string) (string, error) {
+       if len(uuid) != 27 {
+               return "", fmt.Errorf("invalid UUID: %q", uuid)
+       }
+       dd, err := c.DiscoveryDocument()
+       if err != nil {
+               return "", err
+       }
+       infix := uuid[6:11]
+       var model string
+       for m, s := range dd.Schemas {
+               if s.UUIDPrefix == infix {
+                       model = m
+                       break
+               }
+       }
+       if model == "" {
+               return "", fmt.Errorf("unrecognized UUID infix: %q", infix)
+       }
+       var resource string
+       for r, rsc := range dd.Resources {
+               if rsc.Methods["get"].Response.Ref == model {
+                       resource = r
+                       break
+               }
+       }
+       if resource == "" {
+               return "", fmt.Errorf("no resource for model: %q", model)
+       }
+       m, ok := dd.Resources[resource].Methods[method]
+       if !ok {
+               return "", fmt.Errorf("no method %q for resource %q", method, resource)
+       }
+       path := dd.BasePath + strings.Replace(m.Path, "{uuid}", uuid, -1)
+       if path[0] == '/' {
+               path = path[1:]
+       }
+       return path, nil
 }
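
For reference, a minimal usage sketch of the new discovery-document helpers. The API host, token, and UUID are placeholders, and the printed path depends on the server's discovery document:

    package main

    import (
        "fmt"
        "log"

        "git.curoverse.com/arvados.git/sdk/go/arvados"
    )

    func main() {
        // Placeholder host and token.
        c := &arvados.Client{
            APIHost:   "zzzzz.arvadosapi.com",
            AuthToken: "placeholder-token",
        }
        // "4zz18" is the collection infix, so this should resolve to
        // something like "arvados/v1/collections/zzzzz-4zz18-xxxxxxxxxxxxxxx".
        path, err := c.PathForUUID("get", "zzzzz-4zz18-xxxxxxxxxxxxxxx")
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println(path)

        // The discovery document is fetched once and cached in the new
        // dd field, so subsequent calls reuse the same object.
        dd, err := c.DiscoveryDocument()
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println(dd.BasePath)
    }
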
diff --git a/sdk/go/arvados/log.go b/sdk/go/arvados/log.go
new file mode 100644 (file)
index 0000000..caea04c
--- /dev/null
@@ -0,0 +1,16 @@
+package arvados
+
+import (
+       "time"
+)
+
+// Log is an arvados#log record
+type Log struct {
+       ID              uint64                 `json:"id"`
+       UUID            string                 `json:"uuid"`
+       ObjectUUID      string                 `json:"object_uuid"`
+       ObjectOwnerUUID string                 `json:"object_owner_uuid"`
+       EventType       string                 `json:"event_type"`
+       Properties      map[string]interface{} `json:"properties"`
+       CreatedAt       *time.Time             `json:"created_at,omitempty"`
+}
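
The struct above mirrors the arvados#log wire format (snake_case JSON keys). A small decoding sketch; the field values are made up:

    package main

    import (
        "encoding/json"
        "fmt"
        "log"

        "git.curoverse.com/arvados.git/sdk/go/arvados"
    )

    func main() {
        src := `{"id":1234,"uuid":"zzzzz-57u5n-xxxxxxxxxxxxxxx",
                 "object_uuid":"zzzzz-4zz18-xxxxxxxxxxxxxxx",
                 "object_owner_uuid":"zzzzz-tpzed-xxxxxxxxxxxxxxx",
                 "event_type":"update","properties":{"text":"hello"}}`
        var lg arvados.Log
        if err := json.Unmarshal([]byte(src), &lg); err != nil {
            log.Fatal(err)
        }
        fmt.Println(lg.ID, lg.EventType, lg.Properties["text"])
    }
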
diff --git a/sdk/go/config/load.go b/sdk/go/config/load.go
index 9c65d65e84a57d9120d69dd84912615ff3949e35..2bbb440fb31211241a78a6f15c788f7e4d706334 100644 (file)
@@ -22,3 +22,8 @@ func LoadFile(cfg interface{}, configPath string) error {
        }
        return nil
 }
+
+// Dump returns a YAML representation of cfg.
+func Dump(cfg interface{}) ([]byte, error) {
+       return yaml.Marshal(cfg)
+}
diff --git a/sdk/python/tests/nginx.conf b/sdk/python/tests/nginx.conf
index 2b8b6ca1c4ad531c29bfd1c9a149da7c9bdf3599..006604077d457d286cdb9148332e087a0204313c 100644 (file)
@@ -54,4 +54,20 @@ http {
       proxy_redirect //download:{{KEEPWEBPORT}}/ https://$host:{{KEEPWEBDLSSLPORT}}/;
     }
   }
+  upstream ws {
+    server localhost:{{WSPORT}};
+  }
+  server {
+    listen *:{{WSSPORT}} ssl default_server;
+    server_name ~^(?<request_host>.*)$;
+    ssl_certificate {{SSLCERT}};
+    ssl_certificate_key {{SSLKEY}};
+    location  / {
+      proxy_pass http://ws;
+      proxy_set_header Upgrade $http_upgrade;
+      proxy_set_header Connection "upgrade";
+      proxy_set_header Host $request_host:{{WSPORT}};
+      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+    }
+  }
 }
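
For context, a rough sketch of a Go client reaching the websocket service through this TLS proxy. The port and token are placeholders, and InsecureSkipVerify matches the test suite's self-signed certificate; it is not production advice:

    package main

    import (
        "crypto/tls"
        "fmt"
        "log"

        "golang.org/x/net/websocket"
    )

    func main() {
        // Hypothetical port; in the test suite this is the dynamically
        // chosen {{WSSPORT}} value.
        url := "wss://localhost:9003/websocket?api_token=placeholder-token"
        conf, err := websocket.NewConfig(url, "https://localhost")
        if err != nil {
            log.Fatal(err)
        }
        // The proxy serves the self-signed cert from services/api/tmp.
        conf.TlsConfig = &tls.Config{InsecureSkipVerify: true}
        ws, err := websocket.DialConfig(conf)
        if err != nil {
            log.Fatal(err)
        }
        defer ws.Close()
        fmt.Println("connected through the nginx wss proxy")
    }
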
diff --git a/sdk/python/tests/run_test_server.py b/sdk/python/tests/run_test_server.py
index 642b7ccbad51846a9f1c25acc86f6b0505897c62..5ef5e2a9f5c8b83e8c7ece912ea7eba6624968c8 100644 (file)
@@ -44,6 +44,7 @@ if not os.path.exists(TEST_TMPDIR):
 
 my_api_host = None
 _cached_config = {}
+_cached_db_config = {}
 
 def find_server_pid(PID_PATH, wait=10):
     now = time.time()
@@ -284,10 +285,16 @@ def run(leave_running_atexit=False):
         os.makedirs(gitdir)
     subprocess.check_output(['tar', '-xC', gitdir, '-f', gittarball])
 
+    # The nginx proxy isn't listening here yet, but we need to choose
+    # the wss:// port now so we can write the API server config file.
+    wss_port = find_available_port()
+    _setport('wss', wss_port)
+
     port = find_available_port()
     env = os.environ.copy()
     env['RAILS_ENV'] = 'test'
-    env['ARVADOS_WEBSOCKETS'] = 'yes'
+    env['ARVADOS_TEST_WSS_PORT'] = str(wss_port)
+    env.pop('ARVADOS_WEBSOCKETS', None)
     env.pop('ARVADOS_TEST_API_HOST', None)
     env.pop('ARVADOS_API_HOST', None)
     env.pop('ARVADOS_API_HOST_INSECURE', None)
@@ -360,6 +367,45 @@ def stop(force=False):
         kill_server_pid(_pidfile('api'))
         my_api_host = None
 
+def run_ws():
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    stop_ws()
+    port = find_available_port()
+    conf = os.path.join(TEST_TMPDIR, 'ws.yml')
+    with open(conf, 'w') as f:
+        f.write("""
+Client:
+  APIHost: {}
+  Insecure: true
+Listen: :{}
+Postgres:
+  host: {}
+  dbname: {}
+  user: {}
+  password: {}
+  sslmode: require
+        """.format(os.environ['ARVADOS_API_HOST'],
+                   port,
+                   _dbconfig('host'),
+                   _dbconfig('database'),
+                   _dbconfig('username'),
+                   _dbconfig('password')))
+    logf = open(_fifo2stderr('ws'), 'w')
+    ws = subprocess.Popen(
+        ["ws", "-config", conf],
+        stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
+    with open(_pidfile('ws'), 'w') as f:
+        f.write(str(ws.pid))
+    _wait_until_port_listens(port)
+    _setport('ws', port)
+    return port
+
+def stop_ws():
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    kill_server_pid(_pidfile('ws'))
+
 def _start_keep(n, keep_args):
     keep0 = tempfile.mkdtemp()
     port = find_available_port()
@@ -545,6 +591,8 @@ def run_nginx():
     nginxconf['KEEPPROXYSSLPORT'] = find_available_port()
     nginxconf['GITPORT'] = _getport('arv-git-httpd')
     nginxconf['GITSSLPORT'] = find_available_port()
+    nginxconf['WSPORT'] = _getport('ws')
+    nginxconf['WSSPORT'] = _getport('wss')
     nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem')
     nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key')
     nginxconf['ACCESSLOG'] = _fifo2stderr('nginx_access_log')
@@ -593,7 +641,15 @@ def _getport(program):
     except IOError:
         return 9
 
+def _dbconfig(key):
+    global _cached_db_config
+    if not _cached_db_config:
+        _cached_db_config = yaml.load(open(os.path.join(
+            SERVICES_SRC_DIR, 'api', 'config', 'database.yml')))
+    return _cached_db_config['test'][key]
+
 def _apiconfig(key):
+    global _cached_config
     if _cached_config:
         return _cached_config[key]
     def _load(f, required=True):
@@ -647,6 +703,7 @@ class TestCaseWithServers(unittest.TestCase):
     original environment.
     """
     MAIN_SERVER = None
+    WS_SERVER = None
     KEEP_SERVER = None
     KEEP_PROXY_SERVER = None
     KEEP_WEB_SERVER = None
@@ -667,6 +724,7 @@ class TestCaseWithServers(unittest.TestCase):
         os.environ.pop('ARVADOS_EXTERNAL_CLIENT', None)
         for server_kwargs, start_func, stop_func in (
                 (cls.MAIN_SERVER, run, reset),
+                (cls.WS_SERVER, run_ws, stop_ws),
                 (cls.KEEP_SERVER, run_keep, stop_keep),
                 (cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy),
                 (cls.KEEP_WEB_SERVER, run_keep_web, stop_keep_web)):
@@ -693,6 +751,7 @@ class TestCaseWithServers(unittest.TestCase):
 if __name__ == "__main__":
     actions = [
         'start', 'stop',
+        'start_ws', 'stop_ws',
         'start_keep', 'stop_keep',
         'start_keep_proxy', 'stop_keep_proxy',
         'start_keep-web', 'stop_keep-web',
@@ -725,6 +784,10 @@ if __name__ == "__main__":
             print(host)
     elif args.action == 'stop':
         stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
+    elif args.action == 'start_ws':
+        run_ws()
+    elif args.action == 'stop_ws':
+        stop_ws()
     elif args.action == 'start_keep':
         run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers)
     elif args.action == 'stop_keep':
diff --git a/services/api/config/application.default.yml b/services/api/config/application.default.yml
index a9aa953f9f36e948dc57d34336c7d3f1cc1df43c..ab560d7f6b79724a9a3ff1a22f6a413e16eb18c4 100644 (file)
@@ -444,3 +444,4 @@ test:
   workbench_address: https://localhost:3001/
   git_repositories_dir: <%= Rails.root.join 'tmp', 'git', 'test' %>
   git_internal_dir: <%= Rails.root.join 'tmp', 'internal.git' %>
+  websocket_address: "wss://0.0.0.0:<%= ENV['ARVADOS_TEST_WSS_PORT'] %>/websocket"
diff --git a/services/ws/config.go b/services/ws/config.go
new file mode 100644 (file)
index 0000000..9c2e80a
--- /dev/null
@@ -0,0 +1,37 @@
+package main
+
+import (
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type Config struct {
+       Client   arvados.Client
+       Postgres pgConfig
+       Listen   string
+       Debug    bool
+
+       PingTimeout      arvados.Duration
+       ClientEventQueue int
+       ServerEventQueue int
+}
+
+func DefaultConfig() Config {
+       return Config{
+               Client: arvados.Client{
+                       APIHost: "localhost:443",
+               },
+               Postgres: pgConfig{
+                       "dbname":          "arvados_production",
+                       "user":            "arvados",
+                       "password":        "xyzzy",
+                       "host":            "localhost",
+                       "connect_timeout": "30",
+                       "sslmode":         "require",
+               },
+               PingTimeout:      arvados.Duration(time.Minute),
+               ClientEventQueue: 64,
+               ServerEventQueue: 4,
+       }
+}
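
A short sketch of how these defaults map onto the ws.yml file format: it dumps DefaultConfig() as YAML, which is what the -dump-config flag in main.go (below) does. The function name is illustrative only, and the snippet assumes it sits in this package:

    package main

    import (
        "fmt"
        "log"

        "git.curoverse.com/arvados.git/sdk/go/config"
    )

    // exampleDumpConfig prints the default configuration as YAML so the
    // keys accepted in ws.yml (Client, Postgres, Listen, Debug,
    // PingTimeout, ClientEventQueue, ServerEventQueue) are easy to see.
    func exampleDumpConfig() {
        cfg := DefaultConfig()
        txt, err := config.Dump(&cfg)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Print(string(txt))
    }
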
diff --git a/services/ws/event.go b/services/ws/event.go
new file mode 100644 (file)
index 0000000..09c9d0f
--- /dev/null
@@ -0,0 +1,63 @@
+package main
+
+import (
+       "database/sql"
+       "log"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/ghodss/yaml"
+)
+
+type eventSink interface {
+       Channel() <-chan *event
+       Stop()
+}
+
+type eventSource interface {
+       NewSink() eventSink
+}
+
+type event struct {
+       LogID    uint64
+       Received time.Time
+       Serial   uint64
+
+       db     *sql.DB
+       logRow *arvados.Log
+       err    error
+       mtx    sync.Mutex
+}
+
+// Detail returns the database row corresponding to the event. It can
+// be called safely from multiple goroutines. Only one attempt will be
+// made. If the database row cannot be retrieved, Detail returns nil.
+func (e *event) Detail() *arvados.Log {
+       e.mtx.Lock()
+       defer e.mtx.Unlock()
+       if e.logRow != nil || e.err != nil {
+               return e.logRow
+       }
+       var logRow arvados.Log
+       var propYAML []byte
+       e.err = e.db.QueryRow(`SELECT id, uuid, object_uuid, COALESCE(object_owner_uuid,''), COALESCE(event_type,''), created_at, properties FROM logs WHERE id = $1`, e.LogID).Scan(
+               &logRow.ID,
+               &logRow.UUID,
+               &logRow.ObjectUUID,
+               &logRow.ObjectOwnerUUID,
+               &logRow.EventType,
+               &logRow.CreatedAt,
+               &propYAML)
+       if e.err != nil {
+               log.Printf("retrieving log row %d: %s", e.LogID, e.err)
+               return nil
+       }
+       e.err = yaml.Unmarshal(propYAML, &logRow.Properties)
+       if e.err != nil {
+               log.Printf("decoding yaml for log row %d: %s", e.LogID, e.err)
+               return nil
+       }
+       e.logRow = &logRow
+       return e.logRow
+}
diff --git a/services/ws/handler.go b/services/ws/handler.go
new file mode 100644 (file)
index 0000000..1c9d5ba
--- /dev/null
@@ -0,0 +1,156 @@
+package main
+
+import (
+       "encoding/json"
+       "io"
+       "log"
+       "net/http"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type wsConn interface {
+       io.ReadWriter
+       Request() *http.Request
+       SetReadDeadline(time.Time) error
+       SetWriteDeadline(time.Time) error
+}
+
+type handler struct {
+       Client      arvados.Client
+       PingTimeout time.Duration
+       QueueSize   int
+       NewSession  func(wsConn, arvados.Client) (session, error)
+}
+
+func (h *handler) Handle(ws wsConn, events <-chan *event) {
+       sess, err := h.NewSession(ws, h.Client)
+       if err != nil {
+               log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
+               return
+       }
+
+       queue := make(chan *event, h.QueueSize)
+
+       stopped := make(chan struct{})
+       stop := make(chan error, 5)
+
+       go func() {
+               buf := make([]byte, 2<<20)
+               for {
+                       select {
+                       case <-stopped:
+                               return
+                       default:
+                       }
+                       ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
+                       n, err := ws.Read(buf)
+                       sess.debugLogf("received frame: %q", buf[:n])
+                       if err == nil && n == len(buf) {
+                               err = errFrameTooBig
+                       }
+                       if err != nil {
+                               if err != io.EOF {
+                                       sess.debugLogf("handler: read: %s", err)
+                               }
+                               stop <- err
+                               return
+                       }
+                       msg := make(map[string]interface{})
+                       err = json.Unmarshal(buf[:n], &msg)
+                       if err != nil {
+                               sess.debugLogf("handler: unmarshal: %s", err)
+                               stop <- err
+                               return
+                       }
+                       sess.Receive(msg, buf[:n])
+               }
+       }()
+
+       go func() {
+               for e := range queue {
+                       if e == nil {
+                               ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+                               _, err := ws.Write([]byte("{}"))
+                               if err != nil {
+                                       sess.debugLogf("handler: write {}: %s", err)
+                                       stop <- err
+                                       break
+                               }
+                               continue
+                       }
+
+                       buf, err := sess.EventMessage(e)
+                       if err != nil {
+                               sess.debugLogf("EventMessage %d: err %s", e.Serial, err)
+                               stop <- err
+                               break
+                       } else if len(buf) == 0 {
+                               sess.debugLogf("EventMessage %d: skip", e.Serial)
+                               continue
+                       }
+
+                       sess.debugLogf("handler: send event %d: %q", e.Serial, buf)
+                       ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+                       _, err = ws.Write(buf)
+                       if err != nil {
+                               sess.debugLogf("handler: write: %s", err)
+                               stop <- err
+                               break
+                       }
+                       sess.debugLogf("handler: sent event %d", e.Serial)
+               }
+               for range queue {
+               }
+       }()
+
+       // Filter incoming events against the current subscription
+       // list, and forward matching events to the outgoing message
+       // queue. Close the queue and return when the "stopped"
+       // channel closes or the incoming event stream ends. Shut down
+       // the handler if the outgoing queue fills up.
+       go func() {
+               send := func(e *event) {
+                       select {
+                       case queue <- e:
+                       default:
+                               stop <- errQueueFull
+                       }
+               }
+
+               ticker := time.NewTicker(h.PingTimeout)
+               defer ticker.Stop()
+
+               for {
+                       var e *event
+                       var ok bool
+                       select {
+                       case <-stopped:
+                               close(queue)
+                               return
+                       case <-ticker.C:
+                               // If the outgoing queue is empty,
+                               // send an empty message. This can
+                               // help detect a disconnected network
+                               // socket, and prevent an idle socket
+                               // from being closed.
+                               if len(queue) == 0 {
+                                       queue <- nil
+                               }
+                               continue
+                       case e, ok = <-events:
+                               if !ok {
+                                       close(queue)
+                                       return
+                               }
+                       }
+                       if sess.Filter(e) {
+                               send(e)
+                       }
+               }
+       }()
+
+       <-stop
+       close(stopped)
+}
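
To show how handler, session, wsConn, and event fit together, a rough test-style sketch. stubConn, stubSession, and exampleHandle are inventions for illustration, and the code assumes it lives alongside handler.go in this package:

    package main

    import (
        "fmt"
        "io"
        "log"
        "net/http"
        "os"
        "time"

        "git.curoverse.com/arvados.git/sdk/go/arvados"
    )

    type stubConn struct {
        io.Reader // frames "sent" by the client
        io.Writer // frames written by the handler
        req       *http.Request
    }

    func (c *stubConn) Request() *http.Request           { return c.req }
    func (c *stubConn) SetReadDeadline(time.Time) error  { return nil }
    func (c *stubConn) SetWriteDeadline(time.Time) error { return nil }

    type stubSession struct{}

    func (stubSession) Receive(map[string]interface{}, []byte) {}
    func (stubSession) EventMessage(e *event) ([]byte, error) {
        return []byte(fmt.Sprintf(`{"msgID":%d}`, e.Serial)), nil
    }
    func (stubSession) Filter(*event) bool                   { return true }
    func (stubSession) debugLogf(s string, a ...interface{}) { log.Printf(s, a...) }

    func exampleHandle() {
        h := &handler{
            Client:      arvados.Client{},
            PingTimeout: time.Minute,
            QueueSize:   4,
            NewSession: func(wsConn, arvados.Client) (session, error) {
                return stubSession{}, nil
            },
        }
        // One queued event, then the stream ends; the handler closes its
        // outgoing queue when the event channel closes.
        events := make(chan *event, 1)
        events <- &event{Serial: 1, LogID: 1234}
        close(events)

        // Handle returns when the "client" hangs up (EOF on the reader).
        pr, pw := io.Pipe()
        time.AfterFunc(time.Second, func() { pw.Close() })
        conn := &stubConn{Reader: pr, Writer: os.Stdout, req: &http.Request{RemoteAddr: "stub:0"}}
        h.Handle(conn, events)
    }
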
diff --git a/services/ws/main.go b/services/ws/main.go
new file mode 100644 (file)
index 0000000..a143ae9
--- /dev/null
@@ -0,0 +1,56 @@
+package main
+
+import (
+       "flag"
+       "fmt"
+       "log"
+       "net/http"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/config"
+)
+
+var debugLogf = func(string, ...interface{}) {}
+
+func main() {
+       configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file")
+       dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit")
+       cfg := DefaultConfig()
+       flag.Parse()
+
+       err := config.LoadFile(&cfg, *configPath)
+       if err != nil {
+               log.Fatal(err)
+       }
+       if cfg.Debug {
+               debugLogf = log.Printf
+       }
+
+       if *dumpConfig {
+               txt, err := config.Dump(&cfg)
+               if err != nil {
+                       log.Fatal(err)
+               }
+               fmt.Print(string(txt))
+               return
+       }
+
+       eventSource := &pgEventSource{
+               DataSource: cfg.Postgres.ConnectionString(),
+               QueueSize:  cfg.ServerEventQueue,
+       }
+       srv := &http.Server{
+               Addr:           cfg.Listen,
+               ReadTimeout:    time.Minute,
+               WriteTimeout:   time.Minute,
+               MaxHeaderBytes: 1 << 20,
+               Handler: &router{
+                       Config:      &cfg,
+                       eventSource: eventSource,
+               },
+       }
+       eventSource.NewSink().Stop()
+
+       log.Printf("listening at %s", srv.Addr)
+       log.Fatal(srv.ListenAndServe())
+}
diff --git a/services/ws/permission.go b/services/ws/permission.go
new file mode 100644 (file)
index 0000000..b2b962c
--- /dev/null
@@ -0,0 +1,78 @@
+package main
+
+import (
+       "net/http"
+       "net/url"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+const (
+       maxPermCacheAge = time.Hour
+       minPermCacheAge = 5 * time.Minute
+)
+
+type permChecker interface {
+       SetToken(token string)
+       Check(uuid string) (bool, error)
+}
+
+func NewPermChecker(ac arvados.Client) permChecker {
+       ac.AuthToken = ""
+       return &cachingPermChecker{
+               Client:     &ac,
+               cache:      make(map[string]time.Time),
+               maxCurrent: 16,
+       }
+}
+
+type cachingPermChecker struct {
+       *arvados.Client
+       cache      map[string]time.Time
+       maxCurrent int
+}
+
+func (pc *cachingPermChecker) SetToken(token string) {
+       pc.Client.AuthToken = token
+}
+
+func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
+       pc.tidy()
+       if t, ok := pc.cache[uuid]; ok && time.Now().Sub(t) < maxPermCacheAge {
+               debugLogf("perm ok (cached): %+q %+q", pc.Client.AuthToken, uuid)
+               return true, nil
+       }
+       var buf map[string]interface{}
+       path, err := pc.PathForUUID("get", uuid)
+       if err != nil {
+               return false, err
+       }
+       err = pc.RequestAndDecode(&buf, "GET", path, nil, url.Values{
+               "select": {`["uuid"]`},
+       })
+       if err, ok := err.(arvados.TransactionError); ok && err.StatusCode == http.StatusNotFound {
+               debugLogf("perm err: %+q %+q: %s", pc.Client.AuthToken, uuid, err)
+               return false, nil
+       }
+       if err != nil {
+               debugLogf("perm !ok: %+q %+q", pc.Client.AuthToken, uuid)
+               return false, err
+       }
+       debugLogf("perm ok: %+q %+q", pc.Client.AuthToken, uuid)
+       pc.cache[uuid] = time.Now()
+       return true, nil
+}
+
+func (pc *cachingPermChecker) tidy() {
+       if len(pc.cache) <= pc.maxCurrent*2 {
+               return
+       }
+       tooOld := time.Now().Add(-minPermCacheAge)
+       for uuid, t := range pc.cache {
+               if t.Before(tooOld) {
+                       delete(pc.cache, uuid)
+               }
+       }
+       pc.maxCurrent = len(pc.cache)
+}
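
A short usage sketch for the permission checker. Host, token, and UUID are placeholders, and the snippet assumes it sits in this package next to permission.go:

    package main

    import (
        "fmt"
        "log"

        "git.curoverse.com/arvados.git/sdk/go/arvados"
    )

    func examplePermCheck() {
        ac := arvados.Client{APIHost: "zzzzz.arvadosapi.com", Insecure: true}
        pc := NewPermChecker(ac)
        // Check answers "can this token read this object?", so give it
        // the token presented by the websocket client, not a privileged one.
        pc.SetToken("client-supplied-token")
        ok, err := pc.Check("zzzzz-4zz18-xxxxxxxxxxxxxxx")
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println("readable:", ok)
    }
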
diff --git a/services/ws/pg.go b/services/ws/pg.go
new file mode 100644 (file)
index 0000000..a5af9f7
--- /dev/null
@@ -0,0 +1,176 @@
+package main
+
+import (
+       "database/sql"
+       "fmt"
+       "log"
+       "strconv"
+       "strings"
+       "sync"
+       "time"
+
+       "github.com/lib/pq"
+)
+
+type pgConfig map[string]string
+
+func (c pgConfig) ConnectionString() string {
+       s := ""
+       for k, v := range c {
+               s += k
+               s += "='"
+               s += strings.Replace(
+                       strings.Replace(v, `\`, `\\`, -1),
+                       `'`, `\'`, -1)
+               s += "' "
+       }
+       return s
+}
+
+type pgEventSource struct {
+       DataSource string
+       QueueSize  int
+
+       db         *sql.DB
+       pqListener *pq.Listener
+       sinks      map[*pgEventSink]bool
+       setupOnce  sync.Once
+       mtx        sync.Mutex
+       shutdown   chan error
+}
+
+func (ps *pgEventSource) setup() {
+       ps.shutdown = make(chan error, 1)
+       ps.sinks = make(map[*pgEventSink]bool)
+
+       db, err := sql.Open("postgres", ps.DataSource)
+       if err != nil {
+               log.Fatalf("sql.Open: %s", err)
+       }
+       if err = db.Ping(); err != nil {
+               log.Fatalf("db.Ping: %s", err)
+       }
+       ps.db = db
+
+       ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
+               if err != nil {
+                       // Until we have a mechanism for catching up
+                       // on missed events, we cannot recover from a
+                       // dropped connection without breaking our
+                       // promises to clients.
+                       ps.shutdown <- fmt.Errorf("pgEventSource listener problem: %s", err)
+               }
+       })
+       err = ps.pqListener.Listen("logs")
+       if err != nil {
+               log.Fatal(err)
+       }
+       debugLogf("pgEventSource listening")
+
+       go ps.run()
+}
+
+func (ps *pgEventSource) run() {
+       eventQueue := make(chan *event, ps.QueueSize)
+
+       go func() {
+               for e := range eventQueue {
+                       // Wait for the "select ... from logs" call to
+                       // finish. This limits max concurrent queries
+                       // to ps.QueueSize. Without this, max
+                       // concurrent queries would be bounded by
+                       // client_count X client_queue_size.
+                       e.Detail()
+                       debugLogf("event %d detail %+v", e.Serial, e.Detail())
+                       ps.mtx.Lock()
+                       for sink := range ps.sinks {
+                               sink.channel <- e
+                       }
+                       ps.mtx.Unlock()
+               }
+       }()
+
+       var serial uint64
+       ticker := time.NewTicker(time.Minute)
+       defer ticker.Stop()
+       for {
+               select {
+               case err, ok := <-ps.shutdown:
+                       if ok {
+                               debugLogf("shutdown on error: %s", err)
+                       }
+                       close(eventQueue)
+                       return
+
+               case <-ticker.C:
+                       debugLogf("pgEventSource listener ping")
+                       ps.pqListener.Ping()
+
+               case pqEvent, ok := <-ps.pqListener.Notify:
+                       if !ok {
+                               close(eventQueue)
+                               return
+                       }
+                       if pqEvent.Channel != "logs" {
+                               continue
+                       }
+                       logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
+                       if err != nil {
+                               log.Printf("bad notify payload: %+v", pqEvent)
+                               continue
+                       }
+                       serial++
+                       e := &event{
+                               LogID:    logID,
+                               Received: time.Now(),
+                               Serial:   serial,
+                               db:       ps.db,
+                       }
+                       debugLogf("event %d %+v", e.Serial, e)
+                       eventQueue <- e
+                       go e.Detail()
+               }
+       }
+}
+
+// NewSink subscribes to the event source. NewSink returns an
+// eventSink, whose Channel() method returns a channel: a pointer to
+// each subsequent event will be sent to that channel.
+//
+// The caller must ensure events are received from the sink channel as
+// quickly as possible because when one sink stops being ready, all
+// other sinks block.
+func (ps *pgEventSource) NewSink() eventSink {
+       ps.setupOnce.Do(ps.setup)
+       sink := &pgEventSink{
+               channel: make(chan *event, 1),
+               source:  ps,
+       }
+       ps.mtx.Lock()
+       ps.sinks[sink] = true
+       ps.mtx.Unlock()
+       return sink
+}
+
+type pgEventSink struct {
+       channel chan *event
+       source  *pgEventSource
+}
+
+func (sink *pgEventSink) Channel() <-chan *event {
+       return sink.channel
+}
+
+func (sink *pgEventSink) Stop() {
+       go func() {
+               // Ensure this sink cannot fill up and block the
+               // server-side queue (which otherwise could in turn
+               // block our mtx.Lock() here)
+               for range sink.channel {
+               }
+       }()
+       sink.source.mtx.Lock()
+       delete(sink.source.sinks, sink)
+       sink.source.mtx.Unlock()
+       close(sink.channel)
+}
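
pgEventSource only consumes notifications: the contract it expects is a NOTIFY on the "logs" channel whose payload is the new row's id (that is what strconv.ParseUint parses above), and the API server side is expected to do the equivalent when a logs row is committed. A sketch that emits such a notification by hand, with placeholder connection parameters:

    package main

    import (
        "database/sql"
        "log"

        _ "github.com/lib/pq"
    )

    func exampleNotify() {
        db, err := sql.Open("postgres", "dbname=arvados_test user=arvados password=xyzzy sslmode=disable")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()
        // The payload becomes event.LogID on the listening side.
        if _, err := db.Exec("NOTIFY logs, '1234'"); err != nil {
            log.Fatal(err)
        }
    }
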
diff --git a/services/ws/router.go b/services/ws/router.go
new file mode 100644 (file)
index 0000000..69654cb
--- /dev/null
@@ -0,0 +1,74 @@
+package main
+
+import (
+       "encoding/json"
+       "log"
+       "net/http"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "golang.org/x/net/websocket"
+)
+
+type router struct {
+       Config *Config
+
+       eventSource eventSource
+       mux         *http.ServeMux
+       setupOnce   sync.Once
+}
+
+func (rtr *router) setup() {
+       rtr.mux = http.NewServeMux()
+       rtr.mux.Handle("/websocket", rtr.makeServer(NewSessionV0))
+       rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1))
+}
+
+func (rtr *router) makeServer(newSession func(wsConn, arvados.Client) (session, error)) *websocket.Server {
+       handler := &handler{
+               Client:      rtr.Config.Client,
+               PingTimeout: rtr.Config.PingTimeout.Duration(),
+               QueueSize:   rtr.Config.ClientEventQueue,
+               NewSession:  newSession,
+       }
+       return &websocket.Server{
+               Handshake: func(c *websocket.Config, r *http.Request) error {
+                       return nil
+               },
+               Handler: websocket.Handler(func(ws *websocket.Conn) {
+                       sink := rtr.eventSource.NewSink()
+                       handler.Handle(ws, sink.Channel())
+                       sink.Stop()
+                       ws.Close()
+               }),
+       }
+}
+
+func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       rtr.setupOnce.Do(rtr.setup)
+       t0 := time.Now()
+       reqLog(map[string]interface{}{
+               "Connect":         req.RemoteAddr,
+               "RemoteAddr":      req.RemoteAddr,
+               "X-Forwarded-For": req.Header.Get("X-Forwarded-For"),
+               "Time":            t0.UTC(),
+       })
+       rtr.mux.ServeHTTP(resp, req)
+       t1 := time.Now()
+       reqLog(map[string]interface{}{
+               "Disconnect":      req.RemoteAddr,
+               "RemoteAddr":      req.RemoteAddr,
+               "X-Forwarded-For": req.Header.Get("X-Forwarded-For"),
+               "Time":            t1.UTC(),
+               "Elapsed":         t1.Sub(t0).Seconds(),
+       })
+}
+
+func reqLog(m map[string]interface{}) {
+       j, err := json.Marshal(m)
+       if err != nil {
+               log.Fatal(err)
+       }
+       log.Print(string(j))
+}
diff --git a/services/ws/session.go b/services/ws/session.go
new file mode 100644 (file)
index 0000000..98164e3
--- /dev/null
@@ -0,0 +1,8 @@
+package main
+
+type session interface {
+       Receive(map[string]interface{}, []byte)
+       EventMessage(*event) ([]byte, error)
+       Filter(*event) bool
+       debugLogf(string, ...interface{})
+}
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
new file mode 100644 (file)
index 0000000..467d156
--- /dev/null
@@ -0,0 +1,166 @@
+package main
+
+import (
+       "encoding/json"
+       "errors"
+       "log"
+       "sync"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var (
+       errQueueFull   = errors.New("client queue full")
+       errFrameTooBig = errors.New("frame too big")
+)
+
+type sessionV0 struct {
+       ws          wsConn
+       permChecker permChecker
+       subscribed  map[string]bool
+       eventTypes  map[string]bool
+       mtx         sync.Mutex
+       setupOnce   sync.Once
+}
+
+type v0subscribe struct {
+       Method  string
+       Filters []v0filter
+}
+
+type v0filter []interface{}
+
+func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
+       sess := &sessionV0{
+               ws:          ws,
+               permChecker: NewPermChecker(ac),
+               subscribed:  make(map[string]bool),
+               eventTypes:  make(map[string]bool),
+       }
+
+       err := ws.Request().ParseForm()
+       if err != nil {
+               log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
+               return nil, err
+       }
+       token := ws.Request().Form.Get("api_token")
+       sess.permChecker.SetToken(token)
+       sess.debugLogf("token = %+q", token)
+
+       return sess, nil
+}
+
+func (sess *sessionV0) debugLogf(s string, args ...interface{}) {
+       args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
+       debugLogf("%s "+s, args...)
+}
+
+// If every client subscription message includes filters consisting
+// only of [["event_type","in",...]] then send only the requested
+// event types. Otherwise, clear sess.eventTypes and send all event
+// types from now on.
+func (sess *sessionV0) checkFilters(filters []v0filter) {
+       if sess.eventTypes == nil {
+               // Already received a subscription request without
+               // event_type filters.
+               return
+       }
+       eventTypes := sess.eventTypes
+       sess.eventTypes = nil
+       if len(filters) == 0 {
+               return
+       }
+       useFilters := false
+       for _, f := range filters {
+               if len(f) < 3 {
+                       return
+               }
+               col, ok := f[0].(string)
+               if !ok || col != "event_type" {
+                       continue
+               }
+               op, ok := f[1].(string)
+               if !ok || op != "in" {
+                       return
+               }
+               arr, ok := f[2].([]interface{})
+               if !ok {
+                       return
+               }
+               useFilters = true
+               for _, s := range arr {
+                       if s, ok := s.(string); ok {
+                               eventTypes[s] = true
+                       } else {
+                               return
+                       }
+               }
+       }
+       if useFilters {
+               sess.debugLogf("eventTypes %+v", eventTypes)
+               sess.eventTypes = eventTypes
+       }
+}
+
+func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) {
+       sess.debugLogf("received message: %+v", msg)
+       var sub v0subscribe
+       if err := json.Unmarshal(buf, &sub); err != nil {
+               sess.debugLogf("ignored unrecognized request: %s", err)
+               return
+       }
+       if sub.Method == "subscribe" {
+               sess.debugLogf("subscribing to *")
+               sess.mtx.Lock()
+               sess.checkFilters(sub.Filters)
+               sess.subscribed["*"] = true
+               sess.mtx.Unlock()
+       }
+}
+
+func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
+       detail := e.Detail()
+       if detail == nil {
+               return nil, nil
+       }
+
+       ok, err := sess.permChecker.Check(detail.ObjectUUID)
+       if err != nil || !ok {
+               return nil, err
+       }
+
+       msg := map[string]interface{}{
+               "msgID":             e.Serial,
+               "id":                detail.ID,
+               "uuid":              detail.UUID,
+               "object_uuid":       detail.ObjectUUID,
+               "object_owner_uuid": detail.ObjectOwnerUUID,
+               "event_type":        detail.EventType,
+       }
+       if detail.Properties != nil && detail.Properties["text"] != nil {
+               msg["properties"] = detail.Properties
+       }
+       return json.Marshal(msg)
+}
+
+func (sess *sessionV0) Filter(e *event) bool {
+       detail := e.Detail()
+       sess.mtx.Lock()
+       defer sess.mtx.Unlock()
+       switch {
+       case sess.eventTypes != nil && detail == nil:
+               return false
+       case sess.eventTypes != nil && !sess.eventTypes[detail.EventType]:
+               return false
+       case sess.subscribed["*"]:
+               return true
+       case detail == nil:
+               return false
+       case sess.subscribed[detail.ObjectUUID]:
+               return true
+       case sess.subscribed[detail.ObjectOwnerUUID]:
+               return true
+       default:
+               return false
+       }
+}
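
To make the v0 wire protocol concrete, a rough client sketch. The address and token are placeholders, and it connects directly to the service's Listen port (the plain-HTTP nginx upstream), not through the TLS proxy:

    package main

    import (
        "fmt"
        "log"

        "golang.org/x/net/websocket"
    )

    func main() {
        url := "ws://localhost:9003/websocket?api_token=placeholder-token"
        ws, err := websocket.Dial(url, "", "http://localhost")
        if err != nil {
            log.Fatal(err)
        }
        defer ws.Close()

        // Subscribe with an event_type filter. Leaving "filters" out, or
        // using any other filter shape, makes the session send all types.
        sub := `{"method":"subscribe","filters":[["event_type","in",["create","update"]]]}`
        if _, err := ws.Write([]byte(sub)); err != nil {
            log.Fatal(err)
        }

        buf := make([]byte, 1<<20)
        for {
            n, err := ws.Read(buf)
            if err != nil {
                log.Fatal(err)
            }
            // "{}" frames are keepalive pings from the handler; anything
            // else is an event message built by EventMessage above.
            fmt.Printf("%s\n", buf[:n])
        }
    }
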
diff --git a/services/ws/session_v1.go b/services/ws/session_v1.go
new file mode 100644 (file)
index 0000000..bc09ed0
--- /dev/null
@@ -0,0 +1,11 @@
+package main
+
+import (
+       "errors"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+func NewSessionV1(ws wsConn, ac arvados.Client) (session, error) {
+       return nil, errors.New("Not implemented")
+}