8460: Merge branch 'master' into 8460-websocket-go
author Tom Clegg <tom@curoverse.com>
Sun, 11 Dec 2016 05:59:55 +0000 (00:59 -0500)
committer Tom Clegg <tom@curoverse.com>
Sun, 11 Dec 2016 05:59:55 +0000 (00:59 -0500)
26 files changed:
build/run-build-packages.sh
build/run-tests.sh
doc/install/install-ws.html.textile.liquid [new file with mode: 0644]
sdk/go/arvados/client.go
sdk/go/arvados/log.go [new file with mode: 0644]
sdk/go/config/load.go
sdk/go/stats/duration.go [new file with mode: 0644]
sdk/go/stats/duration_test.go [new file with mode: 0644]
sdk/python/tests/nginx.conf
sdk/python/tests/run_test_server.py
sdk/python/tests/test_events.py
services/api/config/application.default.yml
services/keepstore/logging_router.go
services/ws/arvados-ws.service [new file with mode: 0644]
services/ws/config.go [new file with mode: 0644]
services/ws/doc.go [new file with mode: 0644]
services/ws/event.go [new file with mode: 0644]
services/ws/event_source.go [new file with mode: 0644]
services/ws/handler.go [new file with mode: 0644]
services/ws/log.go [new file with mode: 0644]
services/ws/main.go [new file with mode: 0644]
services/ws/permission.go [new file with mode: 0644]
services/ws/router.go [new file with mode: 0644]
services/ws/session.go [new file with mode: 0644]
services/ws/session_v0.go [new file with mode: 0644]
services/ws/session_v1.go [new file with mode: 0644]

index 0f10d26a9de37049c5a4ef7f1e1aed99e41db037..f5773e7a959a966fc18e9a8b4671218b1714b8cb 100755 (executable)
@@ -432,6 +432,8 @@ package_go_binary services/keepstore keepstore \
     "Keep storage daemon, accessible to clients on the LAN"
 package_go_binary services/keep-web keep-web \
     "Static web hosting service for user data stored in Arvados Keep"
+package_go_binary services/ws arvados-ws \
+    "Arvados Websocket server"
 package_go_binary tools/keep-block-check keep-block-check \
     "Verify that all data from one set of Keep servers to another was copied"
 package_go_binary tools/keep-rsync keep-rsync \
index 560a6933e8afe63e48f963dfa8c990a2da2b9897..e0e1ce28524327851e6925d26284530ced1dd5a0 100755 (executable)
@@ -79,6 +79,7 @@ services/nodemanager
 services/crunch-run
 services/crunch-dispatch-local
 services/crunch-dispatch-slurm
+services/ws
 sdk/cli
 sdk/pam
 sdk/python
@@ -90,6 +91,7 @@ sdk/go/httpserver
 sdk/go/manifest
 sdk/go/blockdigest
 sdk/go/streamer
+sdk/go/stats
 sdk/go/crunchrunner
 sdk/cwl
 tools/crunchstat-summary
@@ -270,15 +272,18 @@ start_api() {
         && eval $(python sdk/python/tests/run_test_server.py start --auth admin) \
         && export ARVADOS_TEST_API_HOST="$ARVADOS_API_HOST" \
         && export ARVADOS_TEST_API_INSTALLED="$$" \
+        && python sdk/python/tests/run_test_server.py start_ws \
+        && python sdk/python/tests/run_test_server.py start_nginx \
         && (env | egrep ^ARVADOS)
 }
 
 start_nginx_proxy_services() {
-    echo 'Starting keepproxy, keep-web, arv-git-httpd, and nginx ssl proxy...'
+    echo 'Starting keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...'
     cd "$WORKSPACE" \
         && python sdk/python/tests/run_test_server.py start_keep_proxy \
         && python sdk/python/tests/run_test_server.py start_keep-web \
         && python sdk/python/tests/run_test_server.py start_arv-git-httpd \
+        && python sdk/python/tests/run_test_server.py start_ws \
         && python sdk/python/tests/run_test_server.py start_nginx \
         && export ARVADOS_TEST_PROXY_SERVICES=1
 }
@@ -289,12 +294,15 @@ stop_services() {
         cd "$WORKSPACE" \
             && python sdk/python/tests/run_test_server.py stop_nginx \
             && python sdk/python/tests/run_test_server.py stop_arv-git-httpd \
+            && python sdk/python/tests/run_test_server.py stop_ws \
             && python sdk/python/tests/run_test_server.py stop_keep-web \
             && python sdk/python/tests/run_test_server.py stop_keep_proxy
     fi
     if [[ -n "$ARVADOS_TEST_API_HOST" ]]; then
         unset ARVADOS_TEST_API_HOST
         cd "$WORKSPACE" \
+            && python sdk/python/tests/run_test_server.py stop_nginx \
+            && python sdk/python/tests/run_test_server.py stop_ws \
             && python sdk/python/tests/run_test_server.py stop
     fi
 }
@@ -756,6 +764,7 @@ gostuff=(
     sdk/go/manifest
     sdk/go/streamer
     sdk/go/crunchrunner
+    sdk/go/stats
     lib/crunchstat
     services/arv-git-httpd
     services/crunchstat
@@ -767,6 +776,7 @@ gostuff=(
     services/crunch-dispatch-local
     services/crunch-dispatch-slurm
     services/crunch-run
+    services/ws
     tools/keep-block-check
     tools/keep-exercise
     tools/keep-rsync
diff --git a/doc/install/install-ws.html.textile.liquid b/doc/install/install-ws.html.textile.liquid
new file mode 100644 (file)
index 0000000..a36a59a
--- /dev/null
@@ -0,0 +1,204 @@
+---
+layout: default
+navsection: installguide
+title: Install the websocket server
+...
+
+{% include 'notebox_begin_warning' %}
+
+This websocket server is an alternative to the puma server that comes with the API server. It is available as an *experimental pre-release* and is not recommended for production sites.
+
+{% include 'notebox_end' %}
+
+The arvados-ws server provides event notifications to websocket clients. It can be installed anywhere with access to the PostgreSQL database and the Arvados API server, typically behind a web proxy that provides SSL support. See the "godoc page":http://godoc.org/github.com/curoverse/arvados/services/ws for additional information.
+
+By convention, we use the following hostname for the websocket service.
+
+<notextile>
+<pre><code>ws.<span class="userinput">uuid_prefix.your.domain</span></code></pre>
+</notextile>
+
+The above hostname should resolve from anywhere on the internet.
+
+h2. Install arvados-ws
+
+Typically arvados-ws runs on the same host as the API server.
+
+On Debian-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install arvados-ws</span>
+</code></pre>
+</notextile>
+
+On Red Hat-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install arvados-ws</span>
+</code></pre>
+</notextile>
+
+Verify that @arvados-ws@ is functional:
+
+<notextile>
+<pre><code>~$ <span class="userinput">arvados-ws -h</span>
+Usage of arvados-ws:
+  -config path
+        path to config file (default "/etc/arvados/ws/ws.yml")
+  -dump-config
+        show current configuration and exit
+</code></pre>
+</notextile>
+
+h3. Create a configuration file
+
+Create @/etc/arvados/ws/ws.yml@ using the following template. Replace @xxxxxxxx@ with the "password you generated during database setup":install-postgresql.html#api.
+
+<notextile>
+<pre><code>Client:
+  APIHost: <span class="userinput">uuid_prefix.your.domain</span>:443
+Listen: ":<span class="userinput">9003</span>"
+Postgres:
+  dbname: arvados_production
+  host: localhost
+  password: <span class="userinput">xxxxxxxx</span>
+  user: arvados
+</code></pre>
+</notextile>
+
+h3. Start the service (option 1: systemd)
+
+If your system does not use systemd, skip this section and follow the "runit instructions":#runit instead.
+
+If your system uses systemd, the arvados-ws service should already be set up. Start it and check its status:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo systemctl restart arvados-ws</span>
+~$ <span class="userinput">sudo systemctl status arvados-ws</span>
+&#x25cf; arvados-ws.service - Arvados websocket server
+   Loaded: loaded (/lib/systemd/system/arvados-ws.service; enabled)
+   Active: active (running) since Tue 2016-12-06 11:20:48 EST; 10s ago
+     Docs: https://doc.arvados.org/
+ Main PID: 9421 (arvados-ws)
+   CGroup: /system.slice/arvados-ws.service
+           └─9421 /usr/bin/arvados-ws
+
+Dec 06 11:20:48 zzzzz arvados-ws[9421]: {"level":"info","msg":"started","time":"2016-12-06T11:20:48.207617188-05:00"}
+Dec 06 11:20:48 zzzzz arvados-ws[9421]: {"Listen":":9003","level":"info","msg":"listening","time":"2016-12-06T11:20:48.244956506-05:00"}
+Dec 06 11:20:48 zzzzz systemd[1]: Started Arvados websocket server.
+</code></pre>
+</notextile>
+
+If it is not running, use @journalctl@ to check logs for errors:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo journalctl -n10 -u arvados-ws</span>
+...
+Dec 06 11:12:48 zzzzz systemd[1]: Starting Arvados websocket server...
+Dec 06 11:12:48 zzzzz arvados-ws[8918]: {"level":"info","msg":"started","time":"2016-12-06T11:12:48.030496636-05:00"}
+Dec 06 11:12:48 zzzzz arvados-ws[8918]: {"error":"pq: password authentication failed for user \"arvados\"","level":"fatal","msg":"db.Ping failed","time":"2016-12-06T11:12:48.058206400-05:00"}
+</code></pre>
+</notextile>
+
+Skip ahead to "confirm the service is working":#confirm.
+
+h3(#runit). Start the service (option 2: runit)
+
+Install runit to supervise the arvados-ws daemon.  {% include 'install_runit' %}
+
+Create a supervised service.
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo mkdir /etc/service/arvados-ws</span>
+~$ <span class="userinput">cd /etc/service/arvados-ws</span>
+~$ <span class="userinput">sudo mkdir log log/main</span>
+~$ <span class="userinput">printf '#!/bin/sh\nexec arvados-ws 2>&1\n' | sudo tee run</span>
+~$ <span class="userinput">printf '#!/bin/sh\nexec svlogd main\n' | sudo tee log/run</span>
+~$ <span class="userinput">sudo chmod +x run log/run</span>
+~$ <span class="userinput">sudo sv exit .</span>
+~$ <span class="userinput">cd -</span>
+</code></pre>
+</notextile>
+
+Use @sv stat@ and check the log file to verify the service is running.
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo sv stat /etc/service/arvados-ws</span>
+run: /etc/service/arvados-ws: (pid 12520) 2s; run: log: (pid 12519) 2s
+~$ <span class="userinput">tail /etc/service/arvados-ws/log/main/current</span>
+{"level":"info","msg":"started","time":"2016-12-06T11:56:20.669171449-05:00"}
+{"Listen":":9003","level":"info","msg":"listening","time":"2016-12-06T11:56:20.708847627-05:00"}
+</code></pre>
+</notextile>
+
+h3(#confirm). Confirm the service is working
+
+Confirm the service is listening on its assigned port and responding to requests.
+
+<notextile>
+<pre><code>~$ <span class="userinput">curl http://0.0.0.0:<b>9003</b>/status.json</span>
+{"Clients":1}
+</code></pre>
+</notextile>
+
+h3. Set up a reverse proxy with SSL support
+
+The arvados-ws service will be accessible from anywhere on the internet, so we recommend using SSL for transport encryption.
+
+This is best achieved by putting a reverse proxy with SSL support in front of arvados-ws, running on port 443 and passing requests to arvados-ws on port 9003 (or whatever port you chose in your configuration file).
+
+For example, using Nginx:
+
+<notextile><pre>
+upstream arvados-ws {
+  server                127.0.0.1:<span class="userinput">9003</span>;
+}
+
+server {
+  listen                <span class="userinput">[your public IP address]</span>:443 ssl;
+  server_name           ws.<span class="userinput">uuid_prefix.your.domain</span>;
+
+  proxy_connect_timeout 90s;
+  proxy_read_timeout    300s;
+
+  ssl                   on;
+  ssl_certificate       <span class="userinput">/YOUR/PATH/TO/cert.pem</span>;
+  ssl_certificate_key   <span class="userinput">/YOUR/PATH/TO/cert.key</span>;
+
+  location / {
+    proxy_pass          http://arvados-ws;
+    proxy_set_header    Upgrade         $http_upgrade;
+    proxy_set_header    Connection      "upgrade";
+    proxy_set_header    Host            $host;
+    proxy_set_header    X-Forwarded-For $proxy_add_x_forwarded_for;
+  }
+}
+</pre></notextile>
+
+If Nginx is already configured to proxy @ws@ requests to puma, move that configuration out of the way or change its @server_name@ so it doesn't conflict.
+
+h3. Update API server configuration
+
+Ensure the websocket server address is correct in the API server configuration file @/etc/arvados/api/application.yml@.
+
+<notextile>
+<pre><code>websocket_address: wss://ws.<span class="userinput">uuid_prefix.your.domain</span>/websocket
+</code></pre>
+</notextile>
+
+Restart Nginx to reload the API server configuration.
+
+<notextile>
+<pre><code>$ <span class="userinput">sudo nginx -s reload</span>
+</code></pre>
+</notextile>
+
+h3. Verify DNS and proxy setup
+
+Use a host elsewhere on the Internet to confirm that your DNS, proxy, and SSL are configured correctly.
+
+<notextile>
+<pre><code>$ <span class="userinput">curl https://ws.<b>uuid_prefix.your.domain</b>/status.json</span>
+{"Clients":1}
+</code></pre>
+</notextile>
index 36f4eb52ae298982dfa09ddf82b0cea08c2604f7..0c18d38974f8be6ce99460ef2713e220a2cff403 100644 (file)
@@ -41,6 +41,8 @@ type Client struct {
        // callers who use a Client to initialize an
        // arvadosclient.ArvadosClient.)
        KeepServiceURIs []string `json:",omitempty"`
+
+       dd *DiscoveryDocument
 }
 
 // The default http.Client used by a Client with Insecure==true and
@@ -198,14 +200,83 @@ func (c *Client) apiURL(path string) string {
 
 // DiscoveryDocument is the Arvados server's description of itself.
 type DiscoveryDocument struct {
-       DefaultCollectionReplication int   `json:"defaultCollectionReplication"`
-       BlobSignatureTTL             int64 `json:"blobSignatureTtl"`
+       BasePath                     string              `json:"basePath"`
+       DefaultCollectionReplication int                 `json:"defaultCollectionReplication"`
+       BlobSignatureTTL             int64               `json:"blobSignatureTtl"`
+       Schemas                      map[string]Schema   `json:"schemas"`
+       Resources                    map[string]Resource `json:"resources"`
+}
+
+type Resource struct {
+       Methods map[string]ResourceMethod `json:"methods"`
+}
+
+type ResourceMethod struct {
+       HTTPMethod string         `json:"httpMethod"`
+       Path       string         `json:"path"`
+       Response   MethodResponse `json:"response"`
+}
+
+type MethodResponse struct {
+       Ref string `json:"$ref"`
+}
+
+type Schema struct {
+       UUIDPrefix string `json:"uuidPrefix"`
 }
 
 // DiscoveryDocument returns a *DiscoveryDocument. The returned object
 // should not be modified: the same object may be returned by
 // subsequent calls.
 func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
+       if c.dd != nil {
+               return c.dd, nil
+       }
        var dd DiscoveryDocument
-       return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+       err := c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+       if err != nil {
+               return nil, err
+       }
+       c.dd = &dd
+       return c.dd, nil
+}
+
+func (c *Client) PathForUUID(method, uuid string) (string, error) {
+       if len(uuid) != 27 {
+               return "", fmt.Errorf("invalid UUID: %q", uuid)
+       }
+       dd, err := c.DiscoveryDocument()
+       if err != nil {
+               return "", err
+       }
+       infix := uuid[6:11]
+       var model string
+       for m, s := range dd.Schemas {
+               if s.UUIDPrefix == infix {
+                       model = m
+                       break
+               }
+       }
+       if model == "" {
+               return "", fmt.Errorf("unrecognized UUID infix: %q", infix)
+       }
+       var resource string
+       for r, rsc := range dd.Resources {
+               if rsc.Methods["get"].Response.Ref == model {
+                       resource = r
+                       break
+               }
+       }
+       if resource == "" {
+               return "", fmt.Errorf("no resource for model: %q", model)
+       }
+       m, ok := dd.Resources[resource].Methods[method]
+       if !ok {
+               return "", fmt.Errorf("no method %q for resource %q", method, resource)
+       }
+       path := dd.BasePath + strings.Replace(m.Path, "{uuid}", uuid, -1)
+       if path[0] == '/' {
+               path = path[1:]
+       }
+       return path, nil
 }
diff --git a/sdk/go/arvados/log.go b/sdk/go/arvados/log.go
new file mode 100644 (file)
index 0000000..ef56e85
--- /dev/null
@@ -0,0 +1,24 @@
+package arvados
+
+import (
+       "time"
+)
+
+// Log is an arvados#log record
+type Log struct {
+       ID              uint64                 `json:"id"`
+       UUID            string                 `json:"uuid"`
+       ObjectUUID      string                 `json:"object_uuid"`
+       ObjectOwnerUUID string                 `json:"object_owner_uuid"`
+       EventType       string                 `json:"event_type"`
+       Properties      map[string]interface{} `json:"properties"`
+       CreatedAt       *time.Time             `json:"created_at,omitempty"`
+}
+
+// LogList is an arvados#logList resource.
+type LogList struct {
+       Items          []Log `json:"items"`
+       ItemsAvailable int   `json:"items_available"`
+       Offset         int   `json:"offset"`
+       Limit          int   `json:"limit"`
+}
index 9c65d65e84a57d9120d69dd84912615ff3949e35..2bbb440fb31211241a78a6f15c788f7e4d706334 100644 (file)
@@ -22,3 +22,8 @@ func LoadFile(cfg interface{}, configPath string) error {
        }
        return nil
 }
+
+// Dump returns a YAML representation of cfg.
+func Dump(cfg interface{}) ([]byte, error) {
+       return yaml.Marshal(cfg)
+}
diff --git a/sdk/go/stats/duration.go b/sdk/go/stats/duration.go
new file mode 100644 (file)
index 0000000..103dea0
--- /dev/null
@@ -0,0 +1,35 @@
+package stats
+
+import (
+       "fmt"
+       "strconv"
+       "time"
+)
+
+// Duration is a duration that is displayed as a number of seconds in
+// fixed-point notation.
+type Duration time.Duration
+
+// MarshalJSON implements json.Marshaler.
+func (d Duration) MarshalJSON() ([]byte, error) {
+       return []byte(d.String()), nil
+}
+
+// String implements fmt.Stringer.
+func (d Duration) String() string {
+       return fmt.Sprintf("%.6f", time.Duration(d).Seconds())
+}
+
+// UnmarshalJSON implements json.Unmarshaler
+func (d *Duration) UnmarshalJSON(data []byte) error {
+       return d.Set(string(data))
+}
+
+// Value implements flag.Value
+func (d *Duration) Set(s string) error {
+       sec, err := strconv.ParseFloat(s, 64)
+       if err == nil {
+               *d = Duration(sec * float64(time.Second))
+       }
+       return err
+}
diff --git a/sdk/go/stats/duration_test.go b/sdk/go/stats/duration_test.go
new file mode 100644 (file)
index 0000000..730e646
--- /dev/null
@@ -0,0 +1,23 @@
+package stats
+
+import (
+       "testing"
+       "time"
+)
+
+func TestString(t *testing.T) {
+       d := Duration(123123123123 * time.Nanosecond)
+       if s, expect := d.String(), "123.123123"; s != expect {
+               t.Errorf("got %s, expect %s", s, expect)
+       }
+}
+
+func TestSet(t *testing.T) {
+       var d Duration
+       if err := d.Set("123.456"); err != nil {
+               t.Fatal(err)
+       }
+       if got, expect := time.Duration(d).Nanoseconds(), int64(123456000000); got != expect {
+               t.Errorf("got %d, expect %d", got, expect)
+       }
+}
index 2b8b6ca1c4ad531c29bfd1c9a149da7c9bdf3599..006604077d457d286cdb9148332e087a0204313c 100644 (file)
@@ -54,4 +54,20 @@ http {
       proxy_redirect //download:{{KEEPWEBPORT}}/ https://$host:{{KEEPWEBDLSSLPORT}}/;
     }
   }
+  upstream ws {
+    server localhost:{{WSPORT}};
+  }
+  server {
+    listen *:{{WSSPORT}} ssl default_server;
+    server_name ~^(?<request_host>.*)$;
+    ssl_certificate {{SSLCERT}};
+    ssl_certificate_key {{SSLKEY}};
+    location  / {
+      proxy_pass http://ws;
+      proxy_set_header Upgrade $http_upgrade;
+      proxy_set_header Connection "upgrade";
+      proxy_set_header Host $request_host:{{WSPORT}};
+      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+    }
+  }
 }
index 642b7ccbad51846a9f1c25acc86f6b0505897c62..75b4f4d26dee608468a4ed587d26f7c6bc61c175 100644 (file)
@@ -44,6 +44,7 @@ if not os.path.exists(TEST_TMPDIR):
 
 my_api_host = None
 _cached_config = {}
+_cached_db_config = {}
 
 def find_server_pid(PID_PATH, wait=10):
     now = time.time()
@@ -284,10 +285,16 @@ def run(leave_running_atexit=False):
         os.makedirs(gitdir)
     subprocess.check_output(['tar', '-xC', gitdir, '-f', gittarball])
 
+    # The nginx proxy isn't listening here yet, but we need to choose
+    # the wss:// port now so we can write the API server config file.
+    wss_port = find_available_port()
+    _setport('wss', wss_port)
+
     port = find_available_port()
     env = os.environ.copy()
     env['RAILS_ENV'] = 'test'
-    env['ARVADOS_WEBSOCKETS'] = 'yes'
+    env['ARVADOS_TEST_WSS_PORT'] = str(wss_port)
+    env.pop('ARVADOS_WEBSOCKETS', None)
     env.pop('ARVADOS_TEST_API_HOST', None)
     env.pop('ARVADOS_API_HOST', None)
     env.pop('ARVADOS_API_HOST_INSECURE', None)
@@ -360,6 +367,47 @@ def stop(force=False):
         kill_server_pid(_pidfile('api'))
         my_api_host = None
 
+def run_ws():
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    stop_ws()
+    port = find_available_port()
+    conf = os.path.join(TEST_TMPDIR, 'ws.yml')
+    with open(conf, 'w') as f:
+        f.write("""
+Client:
+  APIHost: {}
+  Insecure: true
+Listen: :{}
+LogLevel: {}
+Postgres:
+  host: {}
+  dbname: {}
+  user: {}
+  password: {}
+  sslmode: require
+        """.format(os.environ['ARVADOS_API_HOST'],
+                   port,
+                   ('info' if os.environ.get('ARVADOS_DEBUG', '') in ['','0'] else 'debug'),
+                   _dbconfig('host'),
+                   _dbconfig('database'),
+                   _dbconfig('username'),
+                   _dbconfig('password')))
+    logf = open(_fifo2stderr('ws'), 'w')
+    ws = subprocess.Popen(
+        ["ws", "-config", conf],
+        stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
+    with open(_pidfile('ws'), 'w') as f:
+        f.write(str(ws.pid))
+    _wait_until_port_listens(port)
+    _setport('ws', port)
+    return port
+
+def stop_ws():
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    kill_server_pid(_pidfile('ws'))
+
 def _start_keep(n, keep_args):
     keep0 = tempfile.mkdtemp()
     port = find_available_port()
@@ -537,6 +585,7 @@ def stop_keep_web():
 def run_nginx():
     if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
         return
+    stop_nginx()
     nginxconf = {}
     nginxconf['KEEPWEBPORT'] = _getport('keep-web')
     nginxconf['KEEPWEBDLSSLPORT'] = find_available_port()
@@ -545,6 +594,8 @@ def run_nginx():
     nginxconf['KEEPPROXYSSLPORT'] = find_available_port()
     nginxconf['GITPORT'] = _getport('arv-git-httpd')
     nginxconf['GITSSLPORT'] = find_available_port()
+    nginxconf['WSPORT'] = _getport('ws')
+    nginxconf['WSSPORT'] = _getport('wss')
     nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem')
     nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key')
     nginxconf['ACCESSLOG'] = _fifo2stderr('nginx_access_log')
@@ -593,7 +644,15 @@ def _getport(program):
     except IOError:
         return 9
 
+def _dbconfig(key):
+    global _cached_db_config
+    if not _cached_db_config:
+        _cached_db_config = yaml.load(open(os.path.join(
+            SERVICES_SRC_DIR, 'api', 'config', 'database.yml')))
+    return _cached_db_config['test'][key]
+
 def _apiconfig(key):
+    global _cached_config
     if _cached_config:
         return _cached_config[key]
     def _load(f, required=True):
@@ -647,6 +706,7 @@ class TestCaseWithServers(unittest.TestCase):
     original environment.
     """
     MAIN_SERVER = None
+    WS_SERVER = None
     KEEP_SERVER = None
     KEEP_PROXY_SERVER = None
     KEEP_WEB_SERVER = None
@@ -667,6 +727,7 @@ class TestCaseWithServers(unittest.TestCase):
         os.environ.pop('ARVADOS_EXTERNAL_CLIENT', None)
         for server_kwargs, start_func, stop_func in (
                 (cls.MAIN_SERVER, run, reset),
+                (cls.WS_SERVER, run_ws, stop_ws),
                 (cls.KEEP_SERVER, run_keep, stop_keep),
                 (cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy),
                 (cls.KEEP_WEB_SERVER, run_keep_web, stop_keep_web)):
@@ -693,6 +754,7 @@ class TestCaseWithServers(unittest.TestCase):
 if __name__ == "__main__":
     actions = [
         'start', 'stop',
+        'start_ws', 'stop_ws',
         'start_keep', 'stop_keep',
         'start_keep_proxy', 'stop_keep_proxy',
         'start_keep-web', 'stop_keep-web',
@@ -725,6 +787,10 @@ if __name__ == "__main__":
             print(host)
     elif args.action == 'stop':
         stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
+    elif args.action == 'start_ws':
+        run_ws()
+    elif args.action == 'stop_ws':
+        stop_ws()
     elif args.action == 'start_keep':
         run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers)
     elif args.action == 'stop_keep':
index 0199724339b76f0fb37e5af89c8d62c53f332711..7e8c84ec11279495d55fd47770378847886ae76e 100644 (file)
@@ -53,21 +53,22 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         self.assertEqual(200, events.get(True, 5)['status'])
         human = arvados.api('v1').humans().create(body={}).execute()
 
-        log_object_uuids = []
-        for i in range(0, expected):
-            log_object_uuids.append(events.get(True, 5)['object_uuid'])
-
+        want_uuids = []
         if expected > 0:
-            self.assertIn(human['uuid'], log_object_uuids)
-
+            want_uuids.append(human['uuid'])
         if expected > 1:
-            self.assertIn(ancestor['uuid'], log_object_uuids)
+            want_uuids.append(ancestor['uuid'])
+        log_object_uuids = []
+        while set(want_uuids) - set(log_object_uuids):
+            log_object_uuids.append(events.get(True, 5)['object_uuid'])
 
-        with self.assertRaises(Queue.Empty):
-            # assertEqual just serves to show us what unexpected thing
-            # comes out of the queue when the assertRaises fails; when
-            # the test passes, this assertEqual doesn't get called.
-            self.assertEqual(events.get(True, 2), None)
+        if expected < 2:
+            with self.assertRaises(Queue.Empty):
+                # assertEqual just serves to show us what unexpected
+                # thing comes out of the queue when the assertRaises
+                # fails; when the test passes, this assertEqual
+                # doesn't get called.
+                self.assertEqual(events.get(True, 2), None)
 
     def test_subscribe_websocket(self):
         self._test_subscribe(
@@ -145,8 +146,8 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         return time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(t)) + self.isotz(-time.timezone/60)
 
     def isotz(self, offset):
-        """Convert minutes-east-of-UTC to ISO8601 time zone designator"""
-        return '{:+03d}{:02d}'.format(offset/60, offset%60)
+        """Convert minutes-east-of-UTC to RFC3339- and ISO-compatible time zone designator"""
+        return '{:+03d}:{:02d}'.format(offset/60, offset%60)
 
     # Test websocket reconnection on (un)execpted close
     def _test_websocket_reconnect(self, close_unexpected):
index a9aa953f9f36e948dc57d34336c7d3f1cc1df43c..ab560d7f6b79724a9a3ff1a22f6a413e16eb18c4 100644 (file)
@@ -444,3 +444,4 @@ test:
   workbench_address: https://localhost:3001/
   git_repositories_dir: <%= Rails.root.join 'tmp', 'git', 'test' %>
   git_internal_dir: <%= Rails.root.join 'tmp', 'internal.git' %>
+  websocket_address: "wss://0.0.0.0:<%= ENV['ARVADOS_TEST_WSS_PORT'] %>/websocket"
index bfd006ee8d2f3576332b8a3be4c6b040cce14214..e34f8581fd5448d606a6305e52c0673f944a8898 100644 (file)
@@ -5,12 +5,12 @@ package main
 
 import (
        "context"
-       "fmt"
        "net/http"
        "strings"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "git.curoverse.com/arvados.git/sdk/go/stats"
        log "github.com/Sirupsen/logrus"
 )
 
@@ -97,25 +97,11 @@ func (loggingRouter *LoggingRESTRouter) ServeHTTP(wrappedResp http.ResponseWrite
        }
 
        lgr.WithFields(log.Fields{
-               "timeTotal":      loggedDuration(tDone.Sub(tStart)),
-               "timeToStatus":   loggedDuration(resp.sentHdr.Sub(tStart)),
-               "timeWriteBody":  loggedDuration(tDone.Sub(resp.sentHdr)),
+               "timeTotal":      stats.Duration(tDone.Sub(tStart)),
+               "timeToStatus":   stats.Duration(resp.sentHdr.Sub(tStart)),
+               "timeWriteBody":  stats.Duration(tDone.Sub(resp.sentHdr)),
                "respStatusCode": resp.Status,
                "respStatus":     statusText,
                "respBytes":      resp.Length,
        }).Info("response")
 }
-
-type loggedDuration time.Duration
-
-// MarshalJSON formats a duration as a number of seconds, using
-// fixed-point notation with no more than 6 decimal places.
-func (d loggedDuration) MarshalJSON() ([]byte, error) {
-       return []byte(d.String()), nil
-}
-
-// String formats a duration as a number of seconds, using
-// fixed-point notation with no more than 6 decimal places.
-func (d loggedDuration) String() string {
-       return fmt.Sprintf("%.6f", time.Duration(d).Seconds())
-}
diff --git a/services/ws/arvados-ws.service b/services/ws/arvados-ws.service
new file mode 100644 (file)
index 0000000..ebccf0c
--- /dev/null
@@ -0,0 +1,13 @@
+[Unit]
+Description=Arvados websocket server
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/ws/ws.yml
+
+[Service]
+Type=notify
+ExecStart=/usr/bin/arvados-ws
+Restart=always
+
+[Install]
+WantedBy=multi-user.target
diff --git a/services/ws/config.go b/services/ws/config.go
new file mode 100644 (file)
index 0000000..0faa863
--- /dev/null
@@ -0,0 +1,40 @@
+package main
+
+import (
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type wsConfig struct {
+       Client    arvados.Client
+       Postgres  pgConfig
+       Listen    string
+       LogLevel  string
+       LogFormat string
+
+       PingTimeout      arvados.Duration
+       ClientEventQueue int
+       ServerEventQueue int
+}
+
+func defaultConfig() wsConfig {
+       return wsConfig{
+               Client: arvados.Client{
+                       APIHost: "localhost:443",
+               },
+               Postgres: pgConfig{
+                       "dbname":          "arvados_production",
+                       "user":            "arvados",
+                       "password":        "xyzzy",
+                       "host":            "localhost",
+                       "connect_timeout": "30",
+                       "sslmode":         "require",
+               },
+               LogLevel:         "info",
+               LogFormat:        "json",
+               PingTimeout:      arvados.Duration(time.Minute),
+               ClientEventQueue: 64,
+               ServerEventQueue: 4,
+       }
+}
diff --git a/services/ws/doc.go b/services/ws/doc.go
new file mode 100644 (file)
index 0000000..c2d3187
--- /dev/null
@@ -0,0 +1,49 @@
+// Arvados-ws exposes Arvados APIs (currently just one, the
+// cache-invalidation event feed at "ws://.../websocket") to
+// websocket clients.
+//
+// See https://doc.arvados.org/install/install-ws.html.
+//
+// Usage
+//
+//     arvados-ws [-config /etc/arvados/ws/ws.yml] [-dump-config]
+//
+// Minimal configuration
+//
+//     Client:
+//       APIHost: localhost:443
+//     Listen: ":1234"
+//     Postgres:
+//       dbname: arvados_production
+//       host: localhost
+//       password: xyzzy
+//       user: arvados
+//
+// Options
+//
+// -config path
+//
+// Load configuration from the given file instead of the default
+// /etc/arvados/ws/ws.yml
+//
+// -dump-config
+//
+// Print the loaded configuration to stdout and exit.
+//
+// Logs
+//
+// Logs are printed to stderr, formatted as JSON.
+//
+// A log is printed each time a client connects or disconnects.
+//
+// Enable additional logs by configuring:
+//
+//     LogLevel: debug
+//
+// Runtime status
+//
+// GET /debug.json responds with debug stats.
+//
+// GET /status.json responds with health check results and
+// activity/usage metrics.
+package main
diff --git a/services/ws/event.go b/services/ws/event.go
new file mode 100644 (file)
index 0000000..fa2a5df
--- /dev/null
@@ -0,0 +1,64 @@
+package main
+
+import (
+       "database/sql"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/ghodss/yaml"
+)
+
// eventSink is one subscription to an event stream: Channel() yields
// pointers to events, Stop() cancels the subscription.
type eventSink interface {
	Channel() <-chan *event
	Stop()
}

// eventSource produces events and provides access to the underlying
// database connection pool.
type eventSource interface {
	NewSink() eventSink
	DB() *sql.DB
}
+
// event is a reference to a row in the logs table. The row itself is
// fetched lazily -- and at most once -- by Detail().
type event struct {
	LogID    uint64    // primary key of the logs table row
	Received time.Time // time the postgres notification arrived
	Ready    time.Time // time the detail row became available for dispatch
	Serial   uint64    // sequence number, assigned in arrival order

	db     *sql.DB      // connection pool used to fetch the row
	logRow *arvados.Log // cached result of Detail(); nil until fetched
	err    error        // cached fetch error; once set, no retry is made
	mtx    sync.Mutex   // guards logRow and err
}
+
+// Detail returns the database row corresponding to the event. It can
+// be called safely from multiple goroutines. Only one attempt will be
+// made. If the database row cannot be retrieved, Detail returns nil.
+func (e *event) Detail() *arvados.Log {
+       e.mtx.Lock()
+       defer e.mtx.Unlock()
+       if e.logRow != nil || e.err != nil {
+               return e.logRow
+       }
+       var logRow arvados.Log
+       var propYAML []byte
+       e.err = e.db.QueryRow(`SELECT id, uuid, object_uuid, COALESCE(object_owner_uuid,''), COALESCE(event_type,''), created_at, properties FROM logs WHERE id = $1`, e.LogID).Scan(
+               &logRow.ID,
+               &logRow.UUID,
+               &logRow.ObjectUUID,
+               &logRow.ObjectOwnerUUID,
+               &logRow.EventType,
+               &logRow.CreatedAt,
+               &propYAML)
+       if e.err != nil {
+               logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("QueryRow failed")
+               return nil
+       }
+       e.err = yaml.Unmarshal(propYAML, &logRow.Properties)
+       if e.err != nil {
+               logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("yaml decode failed")
+               return nil
+       }
+       e.logRow = &logRow
+       return e.logRow
+}
diff --git a/services/ws/event_source.go b/services/ws/event_source.go
new file mode 100644 (file)
index 0000000..ea90ec7
--- /dev/null
@@ -0,0 +1,216 @@
+package main
+
+import (
+       "database/sql"
+       "strconv"
+       "strings"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/stats"
+       "github.com/lib/pq"
+)
+
// pgConfig is a set of postgres connection parameters, keyed by
// libpq parameter name (dbname, user, password, ...).
type pgConfig map[string]string

// ConnectionString renders the parameters as a libpq key/value
// connection string, quoting each value and escaping backslashes and
// single quotes inside it.
func (c pgConfig) ConnectionString() string {
	quote := strings.NewReplacer(`\`, `\\`, `'`, `\'`)
	var b strings.Builder
	for k, v := range c {
		b.WriteString(k)
		b.WriteString("='")
		b.WriteString(quote.Replace(v))
		b.WriteString("' ")
	}
	return b.String()
}
+
// pgEventSource listens on a postgres notification channel ("logs"),
// wraps each notification in an event, and fans events out to all
// attached sinks. It initializes itself lazily on first use.
type pgEventSource struct {
	DataSource string // postgres connection string
	QueueSize  int    // capacity of the server-side event queue

	db         *sql.DB
	pqListener *pq.Listener
	queue      chan *event
	sinks      map[*pgEventSink]bool // set of active subscribers
	setupOnce  sync.Once             // ensures setup() runs exactly once
	mtx        sync.Mutex            // guards sinks
	shutdown   chan error            // signals run() to stop

	// Statistics reported by DebugStatus().
	lastQDelay time.Duration // queueing delay of the most recent event
	eventsIn   uint64        // total events received (updated atomically)
	eventsOut  uint64        // total events delivered to sinks (updated atomically)
}

// Compile-time check: pgEventSource implements debugStatuser.
var _ debugStatuser = (*pgEventSource)(nil)
+
// setup opens the database connection pool and the postgres LISTEN
// connection, and starts the run() goroutine. It is invoked at most
// once, via setupOnce. Any failure here is fatal.
func (ps *pgEventSource) setup() {
	ps.shutdown = make(chan error, 1)
	ps.sinks = make(map[*pgEventSink]bool)

	db, err := sql.Open("postgres", ps.DataSource)
	if err != nil {
		logger(nil).WithError(err).Fatal("sql.Open failed")
	}
	if err = db.Ping(); err != nil {
		logger(nil).WithError(err).Fatal("db.Ping failed")
	}
	ps.db = db

	ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
		if err != nil {
			// Until we have a mechanism for catching up
			// on missed events, we cannot recover from a
			// dropped connection without breaking our
			// promises to clients.
			logger(nil).WithError(err).Error("listener problem")
			ps.shutdown <- err
		}
	})
	err = ps.pqListener.Listen("logs")
	if err != nil {
		logger(nil).WithError(err).Fatal("pq Listen failed")
	}
	logger(nil).Debug("pgEventSource listening")

	go ps.run()
}
+
+func (ps *pgEventSource) run() {
+       ps.queue = make(chan *event, ps.QueueSize)
+
+       go func() {
+               for e := range ps.queue {
+                       // Wait for the "select ... from logs" call to
+                       // finish. This limits max concurrent queries
+                       // to ps.QueueSize. Without this, max
+                       // concurrent queries would be bounded by
+                       // client_count X client_queue_size.
+                       e.Detail()
+
+                       logger(nil).
+                               WithField("serial", e.Serial).
+                               WithField("detail", e.Detail()).
+                               Debug("event ready")
+                       e.Ready = time.Now()
+                       ps.lastQDelay = e.Ready.Sub(e.Received)
+
+                       ps.mtx.Lock()
+                       atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
+                       for sink := range ps.sinks {
+                               sink.channel <- e
+                       }
+                       ps.mtx.Unlock()
+               }
+       }()
+
+       var serial uint64
+       ticker := time.NewTicker(time.Minute)
+       defer ticker.Stop()
+       for {
+               select {
+               case err, ok := <-ps.shutdown:
+                       if ok {
+                               logger(nil).WithError(err).Info("shutdown")
+                       }
+                       close(ps.queue)
+                       return
+
+               case <-ticker.C:
+                       logger(nil).Debug("listener ping")
+                       ps.pqListener.Ping()
+
+               case pqEvent, ok := <-ps.pqListener.Notify:
+                       if !ok {
+                               close(ps.queue)
+                               return
+                       }
+                       if pqEvent.Channel != "logs" {
+                               continue
+                       }
+                       logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
+                       if err != nil {
+                               logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
+                               continue
+                       }
+                       serial++
+                       e := &event{
+                               LogID:    logID,
+                               Received: time.Now(),
+                               Serial:   serial,
+                               db:       ps.db,
+                       }
+                       logger(nil).WithField("event", e).Debug("incoming")
+                       atomic.AddUint64(&ps.eventsIn, 1)
+                       ps.queue <- e
+                       go e.Detail()
+               }
+       }
+}
+
// NewSink subscribes to the event source. NewSink returns an
// eventSink, whose Channel() method returns a channel: a pointer to
// each subsequent event will be sent to that channel.
//
// The caller must ensure events are received from the sink channel as
// quickly as possible because when one sink stops being ready, all
// other sinks block.
func (ps *pgEventSource) NewSink() eventSink {
	ps.setupOnce.Do(ps.setup)
	sink := &pgEventSink{
		channel: make(chan *event, 1),
		source:  ps,
	}
	ps.mtx.Lock()
	ps.sinks[sink] = true
	ps.mtx.Unlock()
	return sink
}

// DB returns the event source's database handle, starting the event
// source first if it is not already running.
func (ps *pgEventSource) DB() *sql.DB {
	ps.setupOnce.Do(ps.setup)
	return ps.db
}
+
// DebugStatus returns event counters and queue statistics for the
// /debug.json endpoint.
func (ps *pgEventSource) DebugStatus() interface{} {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	blocked := 0
	for sink := range ps.sinks {
		// Sum of events still buffered in sink channels,
		// i.e., events delivered but not yet consumed.
		blocked += len(sink.channel)
	}
	return map[string]interface{}{
		"EventsIn":     atomic.LoadUint64(&ps.eventsIn),
		"EventsOut":    atomic.LoadUint64(&ps.eventsOut),
		"Queue":        len(ps.queue),
		"QueueLimit":   cap(ps.queue),
		"QueueDelay":   stats.Duration(ps.lastQDelay),
		"Sinks":        len(ps.sinks),
		"SinksBlocked": blocked,
	}
}
+
// pgEventSink is a single subscription to a pgEventSource.
type pgEventSink struct {
	channel chan *event
	source  *pgEventSource
}

// Channel returns the channel on which this sink's events are
// delivered.
func (sink *pgEventSink) Channel() <-chan *event {
	return sink.channel
}

// Stop unsubscribes the sink from its source and closes its channel.
func (sink *pgEventSink) Stop() {
	go func() {
		// Ensure this sink cannot fill up and block the
		// server-side queue (which otherwise could in turn
		// block our mtx.Lock() here)
		for _ = range sink.channel {
		}
	}()
	sink.source.mtx.Lock()
	delete(sink.source.sinks, sink)
	sink.source.mtx.Unlock()
	// The dispatcher only sends while holding the source's mtx,
	// and this sink has now been removed from the sink set, so
	// closing here cannot race with a send.
	close(sink.channel)
}
diff --git a/services/ws/handler.go b/services/ws/handler.go
new file mode 100644 (file)
index 0000000..b07b78c
--- /dev/null
@@ -0,0 +1,230 @@
+package main
+
+import (
+       "context"
+       "io"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/stats"
+)
+
// handler drives one websocket connection per Handle() call: reading
// client messages, filtering/encoding events, and writing outgoing
// frames.
type handler struct {
	Client      arvados.Client
	PingTimeout time.Duration // write deadline and keepalive interval
	QueueSize   int           // outgoing queue capacity per client

	mtx       sync.Mutex                          // guards lastDelay
	lastDelay map[chan interface{}]stats.Duration // per-client delay of the most recently sent event
	setupOnce sync.Once
}

// handlerStats accumulates per-connection traffic statistics,
// returned by Handle() when the connection ends.
type handlerStats struct {
	QueueDelayNs time.Duration // total time events waited between Ready and write start
	WriteDelayNs time.Duration // total time spent in ws.Write()
	EventBytes   uint64        // total bytes written
	EventCount   uint64        // total frames written
}
+
+func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
+       h.setupOnce.Do(h.setup)
+
+       ctx, cancel := context.WithCancel(ws.Request().Context())
+       defer cancel()
+       log := logger(ctx)
+
+       incoming := eventSource.NewSink()
+       defer incoming.Stop()
+
+       queue := make(chan interface{}, h.QueueSize)
+       h.mtx.Lock()
+       h.lastDelay[queue] = 0
+       h.mtx.Unlock()
+       defer func() {
+               h.mtx.Lock()
+               delete(h.lastDelay, queue)
+               h.mtx.Unlock()
+       }()
+
+       sess, err := newSession(ws, queue)
+       if err != nil {
+               log.WithError(err).Error("newSession failed")
+               return
+       }
+
+       go func() {
+               buf := make([]byte, 2<<20)
+               for {
+                       select {
+                       case <-ctx.Done():
+                               return
+                       default:
+                       }
+                       ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
+                       n, err := ws.Read(buf)
+                       buf := buf[:n]
+                       log.WithField("frame", string(buf[:n])).Debug("received frame")
+                       if err == nil && n == cap(buf) {
+                               err = errFrameTooBig
+                       }
+                       if err != nil {
+                               if err != io.EOF {
+                                       log.WithError(err).Info("read error")
+                               }
+                               cancel()
+                               return
+                       }
+                       err = sess.Receive(buf)
+                       if err != nil {
+                               log.WithError(err).Error("sess.Receive() failed")
+                               cancel()
+                               return
+                       }
+               }
+       }()
+
+       go func() {
+               for {
+                       var ok bool
+                       var data interface{}
+                       select {
+                       case <-ctx.Done():
+                               return
+                       case data, ok = <-queue:
+                               if !ok {
+                                       return
+                               }
+                       }
+                       var e *event
+                       var buf []byte
+                       var err error
+                       log := log
+
+                       switch data := data.(type) {
+                       case []byte:
+                               buf = data
+                       case *event:
+                               e = data
+                               log = log.WithField("serial", e.Serial)
+                               buf, err = sess.EventMessage(e)
+                               if err != nil {
+                                       log.WithError(err).Error("EventMessage failed")
+                                       cancel()
+                                       break
+                               } else if len(buf) == 0 {
+                                       log.Debug("skip")
+                                       continue
+                               }
+                       default:
+                               log.WithField("data", data).Error("bad object in client queue")
+                               continue
+                       }
+
+                       log.WithField("frame", string(buf)).Debug("send event")
+                       ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+                       t0 := time.Now()
+                       _, err = ws.Write(buf)
+                       if err != nil {
+                               log.WithError(err).Error("write failed")
+                               cancel()
+                               break
+                       }
+                       log.Debug("sent")
+
+                       if e != nil {
+                               hStats.QueueDelayNs += t0.Sub(e.Ready)
+                               h.mtx.Lock()
+                               h.lastDelay[queue] = stats.Duration(time.Since(e.Ready))
+                               h.mtx.Unlock()
+                       }
+                       hStats.WriteDelayNs += time.Since(t0)
+                       hStats.EventBytes += uint64(len(buf))
+                       hStats.EventCount++
+               }
+       }()
+
+       // Filter incoming events against the current subscription
+       // list, and forward matching events to the outgoing message
+       // queue. Close the queue and return when the request context
+       // is done/cancelled or the incoming event stream ends. Shut
+       // down the handler if the outgoing queue fills up.
+       go func() {
+               ticker := time.NewTicker(h.PingTimeout)
+               defer ticker.Stop()
+
+               for {
+                       select {
+                       case <-ctx.Done():
+                               return
+                       case <-ticker.C:
+                               // If the outgoing queue is empty,
+                               // send an empty message. This can
+                               // help detect a disconnected network
+                               // socket, and prevent an idle socket
+                               // from being closed.
+                               if len(queue) == 0 {
+                                       select {
+                                       case queue <- []byte(`{}`):
+                                       default:
+                                       }
+                               }
+                               continue
+                       case e, ok := <-incoming.Channel():
+                               if !ok {
+                                       cancel()
+                                       return
+                               }
+                               if !sess.Filter(e) {
+                                       continue
+                               }
+                               select {
+                               case queue <- e:
+                               default:
+                                       log.WithError(errQueueFull).Error("terminate")
+                                       cancel()
+                                       return
+                               }
+                       }
+               }
+       }()
+
+       <-ctx.Done()
+       return
+}
+
+func (h *handler) DebugStatus() interface{} {
+       h.mtx.Lock()
+       defer h.mtx.Unlock()
+
+       var s struct {
+               QueueCount    int
+               QueueMin      int
+               QueueMax      int
+               QueueTotal    uint64
+               QueueDelayMin stats.Duration
+               QueueDelayMax stats.Duration
+       }
+       for q, lastDelay := range h.lastDelay {
+               s.QueueCount++
+               n := len(q)
+               s.QueueTotal += uint64(n)
+               if s.QueueMax < n {
+                       s.QueueMax = n
+               }
+               if s.QueueMin > n || s.QueueCount == 1 {
+                       s.QueueMin = n
+               }
+               if (s.QueueDelayMin > lastDelay || s.QueueDelayMin == 0) && lastDelay > 0 {
+                       s.QueueDelayMin = lastDelay
+               }
+               if s.QueueDelayMax < lastDelay {
+                       s.QueueDelayMax = lastDelay
+               }
+       }
+       return &s
+}
+
// setup initializes the handler's shared state; invoked exactly once
// via setupOnce.
func (h *handler) setup() {
	h.lastDelay = make(map[chan interface{}]stats.Duration)
}
diff --git a/services/ws/log.go b/services/ws/log.go
new file mode 100644 (file)
index 0000000..1fbfbad
--- /dev/null
@@ -0,0 +1,54 @@
+package main
+
+import (
+       "context"
+
+       "github.com/Sirupsen/logrus"
+)
+
var (
	// loggerCtxKey is the context key under which a request's
	// logger is stored; a freshly allocated pointer, so it cannot
	// collide with keys used by other packages.
	loggerCtxKey = new(int)
	// rootLogger is the fallback logger used when a context has
	// no logger attached; loggerConfig() sets its level/format.
	rootLogger = logrus.New()
)

// rfc3339NanoFixed is like time.RFC3339Nano, but with fixed-width
// fractional seconds, so timestamps align and sort lexically.
const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
+
// contextWithLogger returns a new child context such that
// logger(child) returns the given logger.
func contextWithLogger(ctx context.Context, logger *logrus.Entry) context.Context {
	return context.WithValue(ctx, loggerCtxKey, logger)
}

// logger returns the logger suitable for the given context -- the one
// attached by contextWithLogger() if applicable, otherwise the
// top-level logger with no fields/values. A nil ctx is allowed and
// yields the top-level logger.
func logger(ctx context.Context) *logrus.Entry {
	if ctx != nil {
		if logger, ok := ctx.Value(loggerCtxKey).(*logrus.Entry); ok {
			return logger
		}
	}
	return rootLogger.WithFields(nil)
}
+
// loggerConfig sets up logging to behave as configured. An invalid
// LogLevel or LogFormat is a fatal error.
func loggerConfig(cfg wsConfig) {
	lvl, err := logrus.ParseLevel(cfg.LogLevel)
	if err != nil {
		logrus.Fatal(err)
	}
	rootLogger.Level = lvl
	switch cfg.LogFormat {
	case "text":
		rootLogger.Formatter = &logrus.TextFormatter{
			FullTimestamp:   true,
			TimestampFormat: rfc3339NanoFixed,
		}
	case "json":
		rootLogger.Formatter = &logrus.JSONFormatter{
			TimestampFormat: rfc3339NanoFixed,
		}
	default:
		logrus.WithField("LogFormat", cfg.LogFormat).Fatal("unknown log format")
	}
}
diff --git a/services/ws/main.go b/services/ws/main.go
new file mode 100644 (file)
index 0000000..77ebf9e
--- /dev/null
@@ -0,0 +1,63 @@
+package main
+
+import (
+       "flag"
+       "fmt"
+       "net/http"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/config"
+       "github.com/coreos/go-systemd/daemon"
+)
+
// main loads configuration, configures logging, bootstraps the
// postgres event source, and serves websocket and status endpoints
// until ListenAndServe fails.
func main() {
	log := logger(nil)

	configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file")
	dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit")
	cfg := defaultConfig()
	flag.Parse()

	err := config.LoadFile(&cfg, *configPath)
	if err != nil {
		log.Fatal(err)
	}

	loggerConfig(cfg)

	// With -dump-config, print the effective configuration
	// (defaults merged with the config file) and exit.
	if *dumpConfig {
		txt, err := config.Dump(&cfg)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Print(string(txt))
		return
	}

	log.Info("started")
	eventSource := &pgEventSource{
		DataSource: cfg.Postgres.ConnectionString(),
		QueueSize:  cfg.ServerEventQueue,
	}
	srv := &http.Server{
		Addr:           cfg.Listen,
		ReadTimeout:    time.Minute,
		WriteTimeout:   time.Minute,
		MaxHeaderBytes: 1 << 20,
		Handler: &router{
			Config:         &cfg,
			eventSource:    eventSource,
			newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
		},
	}
	// Bootstrap the eventSource by attaching a dummy subscriber
	// and hanging up. This connects to the database and starts
	// the listener before the first client arrives.
	eventSource.NewSink().Stop()

	// Tell systemd (if present) that we are ready to serve.
	if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
		log.WithError(err).Warn("error notifying init daemon")
	}

	log.WithField("Listen", srv.Addr).Info("listening")
	log.Fatal(srv.ListenAndServe())
}
diff --git a/services/ws/permission.go b/services/ws/permission.go
new file mode 100644 (file)
index 0000000..e467e06
--- /dev/null
@@ -0,0 +1,94 @@
+package main
+
+import (
+       "net/http"
+       "net/url"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
const (
	// maxPermCacheAge is how long a cached permission result
	// remains usable by Check().
	maxPermCacheAge = time.Hour
	// minPermCacheAge is the minimum age an entry must reach
	// before tidy() will evict it.
	minPermCacheAge = 5 * time.Minute
)

// permChecker tests whether the holder of a given token can read a
// given object.
type permChecker interface {
	SetToken(token string)
	Check(uuid string) (bool, error)
}
+
// newPermChecker returns a caching permChecker backed by the given
// API client. The client's own token is cleared; callers must supply
// a token via SetToken before calling Check.
func newPermChecker(ac arvados.Client) permChecker {
	ac.AuthToken = ""
	return &cachingPermChecker{
		Client:     &ac,
		cache:      make(map[string]cacheEnt),
		maxCurrent: 16,
	}
}

// cacheEnt is one cached permission result: whether access was
// allowed, and (embedded Time) when the check was performed.
type cacheEnt struct {
	time.Time
	allowed bool
}

// cachingPermChecker implements permChecker on top of an arvados API
// client, caching recent results.
type cachingPermChecker struct {
	*arvados.Client
	cache      map[string]cacheEnt
	maxCurrent int // cache-size watermark used by tidy()
}

// SetToken sets the API token used by subsequent Check calls.
func (pc *cachingPermChecker) SetToken(token string) {
	pc.Client.AuthToken = token
}
+
+func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
+       logger := logger(nil).
+               WithField("token", pc.Client.AuthToken).
+               WithField("uuid", uuid)
+       pc.tidy()
+       now := time.Now()
+       if perm, ok := pc.cache[uuid]; ok && now.Sub(perm.Time) < maxPermCacheAge {
+               logger.WithField("allowed", perm.allowed).Debug("cache hit")
+               return perm.allowed, nil
+       }
+       var buf map[string]interface{}
+       path, err := pc.PathForUUID("get", uuid)
+       if err != nil {
+               return false, err
+       }
+       err = pc.RequestAndDecode(&buf, "GET", path, nil, url.Values{
+               "select": {`["uuid"]`},
+       })
+
+       var allowed bool
+       if err == nil {
+               allowed = true
+       } else if txErr, ok := err.(*arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound {
+               allowed = false
+       } else if txErr.StatusCode == http.StatusForbidden {
+               // Some requests are expressly forbidden for reasons
+               // other than "you aren't allowed to know whether this
+               // UUID exists" (404).
+               allowed = false
+       } else {
+               logger.WithError(err).Error("lookup error")
+               return false, err
+       }
+       logger.WithField("allowed", allowed).Debug("cache miss")
+       pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed}
+       return allowed, nil
+}
+
+func (pc *cachingPermChecker) tidy() {
+       if len(pc.cache) <= pc.maxCurrent*2 {
+               return
+       }
+       tooOld := time.Now().Add(-minPermCacheAge)
+       for uuid, t := range pc.cache {
+               if t.Before(tooOld) {
+                       delete(pc.cache, uuid)
+               }
+       }
+       pc.maxCurrent = len(pc.cache)
+}
diff --git a/services/ws/router.go b/services/ws/router.go
new file mode 100644 (file)
index 0000000..0ad8d7c
--- /dev/null
@@ -0,0 +1,142 @@
+package main
+
+import (
+       "database/sql"
+       "encoding/json"
+       "io"
+       "net/http"
+       "strconv"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/Sirupsen/logrus"
+       "golang.org/x/net/websocket"
+)
+
// wsConn is the subset of *websocket.Conn used by the handler,
// abstracted as an interface so connections can be faked in tests.
type wsConn interface {
	io.ReadWriter
	Request() *http.Request
	SetReadDeadline(time.Time) error
	SetWriteDeadline(time.Time) error
}

// router dispatches HTTP requests to the v0/v1 websocket endpoints
// and the JSON debug/status endpoints.
type router struct {
	Config         *wsConfig
	eventSource    eventSource
	newPermChecker func() permChecker

	handler   *handler
	mux       *http.ServeMux
	setupOnce sync.Once

	// lastReqID is the most recently issued request ID; see
	// newReqID().
	lastReqID  int64
	lastReqMtx sync.Mutex

	status routerDebugStatus
}

// routerDebugStatus holds request counters, updated atomically.
type routerDebugStatus struct {
	ReqsReceived int64
	ReqsActive   int64
}

// debugStatuser is implemented by components that can report stats
// for /debug.json.
type debugStatuser interface {
	DebugStatus() interface{}
}

// sessionFactory creates the protocol session for one incoming
// websocket connection.
type sessionFactory func(wsConn, chan<- interface{}, *sql.DB, permChecker) (session, error)
+
// setup initializes the handler and URL routes; invoked exactly once
// via setupOnce.
func (rtr *router) setup() {
	rtr.handler = &handler{
		PingTimeout: rtr.Config.PingTimeout.Duration(),
		QueueSize:   rtr.Config.ClientEventQueue,
	}
	rtr.mux = http.NewServeMux()
	rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0))
	rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1))
	rtr.mux.HandleFunc("/debug.json", jsonHandler(rtr.DebugStatus))
	rtr.mux.HandleFunc("/status.json", jsonHandler(rtr.Status))
}
+
+func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
+       return &websocket.Server{
+               Handshake: func(c *websocket.Config, r *http.Request) error {
+                       return nil
+               },
+               Handler: websocket.Handler(func(ws *websocket.Conn) {
+                       t0 := time.Now()
+                       log := logger(ws.Request().Context())
+                       log.Info("connected")
+
+                       stats := rtr.handler.Handle(ws, rtr.eventSource,
+                               func(ws wsConn, sendq chan<- interface{}) (session, error) {
+                                       return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker())
+                               })
+
+                       log.WithFields(logrus.Fields{
+                               "elapsed": time.Now().Sub(t0).Seconds(),
+                               "stats":   stats,
+                       }).Info("disconnect")
+                       ws.Close()
+               }),
+       }
+}
+
+func (rtr *router) newReqID() string {
+       rtr.lastReqMtx.Lock()
+       defer rtr.lastReqMtx.Unlock()
+       id := time.Now().UnixNano()
+       if id <= rtr.lastReqID {
+               id = rtr.lastReqID + 1
+       }
+       return strconv.FormatInt(id, 36)
+}
+
// DebugStatus aggregates debug stats from the router, the handler,
// and (if it supports DebugStatus) the event source.
func (rtr *router) DebugStatus() interface{} {
	s := map[string]interface{}{
		"HTTP":     rtr.status,
		"Outgoing": rtr.handler.DebugStatus(),
	}
	if es, ok := rtr.eventSource.(debugStatuser); ok {
		s["EventSource"] = es.DebugStatus()
	}
	return s
}

// Status returns the metrics served at /status.json.
func (rtr *router) Status() interface{} {
	return map[string]interface{}{
		"Clients": atomic.LoadInt64(&rtr.status.ReqsActive),
	}
}
+
// ServeHTTP attaches a request-scoped logger (carrying a unique
// RequestID) to the request context, updates request counters, and
// delegates to the mux.
func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	rtr.setupOnce.Do(rtr.setup)
	atomic.AddInt64(&rtr.status.ReqsReceived, 1)
	atomic.AddInt64(&rtr.status.ReqsActive, 1)
	defer atomic.AddInt64(&rtr.status.ReqsActive, -1)

	logger := logger(req.Context()).
		WithField("RequestID", rtr.newReqID())
	ctx := contextWithLogger(req.Context(), logger)
	req = req.WithContext(ctx)
	logger.WithFields(logrus.Fields{
		"remoteAddr":      req.RemoteAddr,
		"reqForwardedFor": req.Header.Get("X-Forwarded-For"),
	}).Info("accept request")
	rtr.mux.ServeHTTP(resp, req)
}
+
+func jsonHandler(fn func() interface{}) http.HandlerFunc {
+       return func(resp http.ResponseWriter, req *http.Request) {
+               logger := logger(req.Context())
+               resp.Header().Set("Content-Type", "application/json")
+               enc := json.NewEncoder(resp)
+               err := enc.Encode(fn())
+               if err != nil {
+                       msg := "encode failed"
+                       logger.WithError(err).Error(msg)
+                       http.Error(resp, msg, http.StatusInternalServerError)
+               }
+       }
+}
diff --git a/services/ws/session.go b/services/ws/session.go
new file mode 100644 (file)
index 0000000..d469737
--- /dev/null
@@ -0,0 +1,25 @@
+package main
+
// session is the protocol-specific part of one websocket connection:
// it interprets client messages and decides which events get sent,
// and how they are encoded. Receive and EventMessage are invoked
// from different goroutines by the handler.
type session interface {
	// Receive processes a message received from the client. If a
	// non-nil error is returned, the connection will be
	// terminated.
	Receive([]byte) error

	// Filter returns true if the event should be queued for
	// sending to the client. It should return as fast as
	// possible, and must not block.
	Filter(*event) bool

	// EventMessage encodes the given event (from the front of the
	// queue) into a form suitable to send to the client. If a
	// non-nil error is returned, the connection is terminated. If
	// the returned buffer is empty, nothing is sent to the client
	// and the event is not counted in statistics.
	//
	// Unlike Filter, EventMessage can block without affecting
	// other connections. If EventMessage is slow, additional
	// incoming events will be queued. If the event queue fills
	// up, the connection will be dropped.
	EventMessage(*event) ([]byte, error)
}
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
new file mode 100644 (file)
index 0000000..1cb3021
--- /dev/null
@@ -0,0 +1,286 @@
+package main
+
+import (
+       "database/sql"
+       "encoding/json"
+       "errors"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/Sirupsen/logrus"
+)
+
+var (
+       // NOTE(review): these two errors are not referenced in this
+       // file; presumably the connection handler reports them when a
+       // client cannot keep up — confirm at the call sites.
+       errQueueFull   = errors.New("client queue full")
+       errFrameTooBig = errors.New("frame too big")
+
+       // Object attributes forwarded to clients inside
+       // old_attributes/new_attributes (see EventMessage); all other
+       // attributes are stripped from outgoing messages.
+       sendObjectAttributes = []string{"state", "name"}
+
+       // Canned responses sent back after each client request.
+       v0subscribeOK   = []byte(`{"status":200}`)
+       v0subscribeFail = []byte(`{"status":400}`)
+)
+
+// v0session implements the v0 websocket protocol (see session
+// interface).
+type v0session struct {
+       ws            wsConn             // underlying client connection
+       sendq         chan<- interface{} // outgoing messages/events, drained by the connection handler
+       db            *sql.DB            // used by sendOldEvents to query the logs table
+       permChecker   permChecker        // per-object read-permission checks for the client's token
+       subscriptions []v0subscribe      // active subscriptions; guarded by mtx
+       lastMsgID     uint64             // outgoing msgID sequence; updated atomically in EventMessage
+       log           *logrus.Entry      // request-scoped logger
+       mtx           sync.Mutex         // guards subscriptions
+       setupOnce     sync.Once          // not used in this file — TODO(review): confirm whether it is needed
+}
+
+func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker) (session, error) {
+       sess := &v0session{
+               sendq:       sendq,
+               ws:          ws,
+               db:          db,
+               permChecker: pc,
+               log:         logger(ws.Request().Context()),
+       }
+
+       err := ws.Request().ParseForm()
+       if err != nil {
+               sess.log.WithError(err).Error("ParseForm failed")
+               return nil, err
+       }
+       token := ws.Request().Form.Get("api_token")
+       sess.permChecker.SetToken(token)
+       sess.log.WithField("token", token).Debug("set token")
+
+       return sess, nil
+}
+
+// Receive handles one message from the client. v0 clients only send
+// "subscribe" requests; anything unparseable or unrecognized gets a
+// v0subscribeFail response. Receive never terminates the connection
+// (it always returns nil).
+func (sess *v0session) Receive(buf []byte) error {
+       var sub v0subscribe
+       err := json.Unmarshal(buf, &sub)
+       switch {
+       case err != nil:
+               sess.log.WithError(err).Info("invalid message from client")
+       case sub.Method == "subscribe":
+               sub.prepare(sess)
+               sess.log.WithField("sub", sub).Debug("sub prepared")
+               sess.sendq <- v0subscribeOK
+               sess.mtx.Lock()
+               sess.subscriptions = append(sess.subscriptions, sub)
+               sess.mtx.Unlock()
+               sub.sendOldEvents(sess)
+               return nil
+       default:
+               sess.log.WithField("Method", sub.Method).Info("unknown method")
+       }
+       sess.sendq <- v0subscribeFail
+       return nil
+}
+
+func (sess *v0session) EventMessage(e *event) ([]byte, error) {
+       detail := e.Detail()
+       if detail == nil {
+               return nil, nil
+       }
+
+       ok, err := sess.permChecker.Check(detail.ObjectUUID)
+       if err != nil || !ok {
+               return nil, err
+       }
+
+       msg := map[string]interface{}{
+               "msgID":             atomic.AddUint64(&sess.lastMsgID, 1),
+               "id":                detail.ID,
+               "uuid":              detail.UUID,
+               "object_uuid":       detail.ObjectUUID,
+               "object_owner_uuid": detail.ObjectOwnerUUID,
+               "event_type":        detail.EventType,
+       }
+       if detail.Properties != nil && detail.Properties["text"] != nil {
+               msg["properties"] = detail.Properties
+       } else {
+               msgProps := map[string]map[string]interface{}{}
+               for _, ak := range []string{"old_attributes", "new_attributes"} {
+                       eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
+                       if !ok {
+                               continue
+                       }
+                       msgAttrs := map[string]interface{}{}
+                       for _, k := range sendObjectAttributes {
+                               if v, ok := eventAttrs[k]; ok {
+                                       msgAttrs[k] = v
+                               }
+                       }
+                       msgProps[ak] = msgAttrs
+               }
+               msg["properties"] = msgProps
+       }
+       return json.Marshal(msg)
+}
+
+func (sess *v0session) Filter(e *event) bool {
+       sess.mtx.Lock()
+       defer sess.mtx.Unlock()
+       for _, sub := range sess.subscriptions {
+               if sub.match(sess, e) {
+                       return true
+               }
+       }
+       return false
+}
+
+// sendOldEvents queries the logs table for entries the client missed
+// (id > sub.LastLogID, created within the last 10 minutes) and
+// queues the matching ones on sess.sendq.
+func (sub *v0subscribe) sendOldEvents(sess *v0session) {
+       if sub.LastLogID == 0 {
+               return
+       }
+       sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
+       // Here we do a "select id" query and queue an event for every
+       // log since the given ID, then use (*event)Detail() to
+       // retrieve the whole row and decide whether to send it. This
+       // approach is very inefficient if the subscriber asks for
+       // last_log_id==1, even if the filters end up matching very
+       // few events.
+       //
+       // To mitigate this, filter on "created > 10 minutes ago" when
+       // retrieving the list of old event IDs to consider.
+       rows, err := sess.db.Query(
+               `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
+               sub.LastLogID,
+               time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
+       if err != nil {
+               sess.log.WithError(err).Error("db.Query failed")
+               return
+       }
+       // Release the result set even on an early return (e.g., the
+       // client disconnects below) — otherwise the database
+       // connection backing rows is leaked.
+       defer rows.Close()
+       for rows.Next() {
+               var id uint64
+               err := rows.Scan(&id)
+               if err != nil {
+                       sess.log.WithError(err).Error("row Scan failed")
+                       continue
+               }
+               for len(sess.sendq)*2 > cap(sess.sendq) {
+                       // Ugly... but if we fill up the whole client
+                       // queue with a backlog of old events, a
+                       // single new event will overflow it and
+                       // terminate the connection, and then the
+                       // client will probably reconnect and do the
+                       // same thing all over again.
+                       time.Sleep(100 * time.Millisecond)
+               }
+               now := time.Now()
+               e := &event{
+                       LogID:    id,
+                       Received: now,
+                       Ready:    now,
+                       db:       sess.db,
+               }
+               if sub.match(sess, e) {
+                       select {
+                       case sess.sendq <- e:
+                       case <-sess.ws.Request().Context().Done():
+                               return
+                       }
+               }
+       }
+       if err := rows.Err(); err != nil {
+               sess.log.WithError(err).Error("db.Query failed")
+       }
+}
+
+// v0subscribe is a decoded client subscription request plus its
+// compiled filter state.
+type v0subscribe struct {
+       Method    string      // only "subscribe" is recognized (see Receive)
+       Filters   []v0filter  // raw filters as sent by the client
+       LastLogID int64 `json:"last_log_id"` // if nonzero, replay logs with id > LastLogID
+
+       // compiled filters; an event must satisfy all of them to match
+       funcs []func(*event) bool
+}
+
+// v0filter is one [column, operator, operand] filter triple.
+type v0filter [3]interface{}
+
+func (sub *v0subscribe) match(sess *v0session, e *event) bool {
+       log := sess.log.WithField("LogID", e.LogID)
+       detail := e.Detail()
+       if detail == nil {
+               log.Error("match failed, no detail")
+               return false
+       }
+       log = log.WithField("funcs", len(sub.funcs))
+       for i, f := range sub.funcs {
+               if !f(e) {
+                       log.WithField("func", i).Debug("match failed")
+                       return false
+               }
+       }
+       log.Debug("match passed")
+       return true
+}
+
+// prepare compiles sub.Filters into sub.funcs. Malformed filters and
+// unsupported columns are silently skipped, matching the v0
+// protocol's permissive behavior.
+func (sub *v0subscribe) prepare(sess *v0session) {
+       for _, f := range sub.Filters {
+               if len(f) != 3 {
+                       continue
+               }
+               col, ok := f[0].(string)
+               if !ok {
+                       continue
+               }
+               switch col {
+               case "event_type":
+                       if fn := v0eventTypeFunc(f); fn != nil {
+                               sub.funcs = append(sub.funcs, fn)
+                       }
+               case "created_at":
+                       if fn := v0createdAtFunc(sess, f); fn != nil {
+                               sub.funcs = append(sub.funcs, fn)
+                       }
+               }
+       }
+}
+
+// v0eventTypeFunc compiles an ["event_type","in",[...]] filter into a
+// match func, or returns nil if the filter is malformed. Non-string
+// members of the operand list are ignored.
+func v0eventTypeFunc(f v0filter) func(*event) bool {
+       op, ok := f[1].(string)
+       if !ok || op != "in" {
+               return nil
+       }
+       arr, ok := f[2].([]interface{})
+       if !ok {
+               return nil
+       }
+       var strs []string
+       for _, s := range arr {
+               if s, ok := s.(string); ok {
+                       strs = append(strs, s)
+               }
+       }
+       return func(e *event) bool {
+               for _, s := range strs {
+                       if s == e.Detail().EventType {
+                               return true
+                       }
+               }
+               return false
+       }
+}
+
+// v0createdAtFunc compiles a ["created_at",op,timestamp] filter into
+// a match func, or returns nil (logging why) if the operand or
+// operator is unusable.
+func v0createdAtFunc(sess *v0session, f v0filter) func(*event) bool {
+       op, ok := f[1].(string)
+       if !ok {
+               return nil
+       }
+       tstr, ok := f[2].(string)
+       if !ok {
+               return nil
+       }
+       t, err := time.Parse(time.RFC3339Nano, tstr)
+       if err != nil {
+               sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
+               return nil
+       }
+       switch op {
+       case ">=":
+               return func(e *event) bool { return !e.Detail().CreatedAt.Before(t) }
+       case "<=":
+               return func(e *event) bool { return !e.Detail().CreatedAt.After(t) }
+       case ">":
+               return func(e *event) bool { return e.Detail().CreatedAt.After(t) }
+       case "<":
+               return func(e *event) bool { return e.Detail().CreatedAt.Before(t) }
+       case "=":
+               return func(e *event) bool { return e.Detail().CreatedAt.Equal(t) }
+       default:
+               sess.log.WithField("operator", op).Info("bogus operator")
+               return nil
+       }
+}
diff --git a/services/ws/session_v1.go b/services/ws/session_v1.go
new file mode 100644 (file)
index 0000000..701dce0
--- /dev/null
@@ -0,0 +1,10 @@
+package main
+
+import (
+       "database/sql"
+       "errors"
+)
+
+func newSessionV1(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker) (session, error) {
+       return nil, errors.New("Not implemented")
+}