"Keep storage daemon, accessible to clients on the LAN"
package_go_binary services/keep-web keep-web \
"Static web hosting service for user data stored in Arvados Keep"
+package_go_binary services/ws arvados-ws \
+ "Arvados Websocket server"
package_go_binary tools/keep-block-check keep-block-check \
"Verify that all data from one set of Keep servers to another was copied"
package_go_binary tools/keep-rsync keep-rsync \
services/crunch-run
services/crunch-dispatch-local
services/crunch-dispatch-slurm
+services/ws
sdk/cli
sdk/pam
sdk/python
sdk/go/manifest
sdk/go/blockdigest
sdk/go/streamer
+sdk/go/stats
sdk/go/crunchrunner
sdk/cwl
tools/crunchstat-summary
&& eval $(python sdk/python/tests/run_test_server.py start --auth admin) \
&& export ARVADOS_TEST_API_HOST="$ARVADOS_API_HOST" \
&& export ARVADOS_TEST_API_INSTALLED="$$" \
+ && python sdk/python/tests/run_test_server.py start_ws \
+ && python sdk/python/tests/run_test_server.py start_nginx \
&& (env | egrep ^ARVADOS)
}
start_nginx_proxy_services() {
- echo 'Starting keepproxy, keep-web, arv-git-httpd, and nginx ssl proxy...'
+ echo 'Starting keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...'
cd "$WORKSPACE" \
&& python sdk/python/tests/run_test_server.py start_keep_proxy \
&& python sdk/python/tests/run_test_server.py start_keep-web \
&& python sdk/python/tests/run_test_server.py start_arv-git-httpd \
+ && python sdk/python/tests/run_test_server.py start_ws \
&& python sdk/python/tests/run_test_server.py start_nginx \
&& export ARVADOS_TEST_PROXY_SERVICES=1
}
cd "$WORKSPACE" \
&& python sdk/python/tests/run_test_server.py stop_nginx \
&& python sdk/python/tests/run_test_server.py stop_arv-git-httpd \
+ && python sdk/python/tests/run_test_server.py stop_ws \
&& python sdk/python/tests/run_test_server.py stop_keep-web \
&& python sdk/python/tests/run_test_server.py stop_keep_proxy
fi
if [[ -n "$ARVADOS_TEST_API_HOST" ]]; then
unset ARVADOS_TEST_API_HOST
cd "$WORKSPACE" \
+ && python sdk/python/tests/run_test_server.py stop_nginx \
+ && python sdk/python/tests/run_test_server.py stop_ws \
&& python sdk/python/tests/run_test_server.py stop
fi
}
sdk/go/manifest
sdk/go/streamer
sdk/go/crunchrunner
+ sdk/go/stats
lib/crunchstat
services/arv-git-httpd
services/crunchstat
services/crunch-dispatch-local
services/crunch-dispatch-slurm
services/crunch-run
+ services/ws
tools/keep-block-check
tools/keep-exercise
tools/keep-rsync
--- /dev/null
+---
+layout: default
+navsection: installguide
+title: Install the websocket server
+...
+
+{% include 'notebox_begin_warning' %}
+
+This websocket server is an alternative to the puma server that comes with the API server. It is available as an *experimental pre-release* and is not recommended for production sites.
+
+{% include 'notebox_end' %}
+
+The arvados-ws server provides event notifications to websocket clients. It can be installed anywhere with access to the Postgres database and the Arvados API server, typically behind a web proxy that provides SSL support. See the "godoc page":http://godoc.org/github.com/curoverse/arvados/services/ws for additional information.
+
+By convention, we use the following hostname for the websocket service.
+
+<notextile>
+<pre><code>ws.<span class="userinput">uuid_prefix.your.domain</span></code></pre>
+</notextile>
+
+The above hostname should resolve from anywhere on the internet.
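+
+The event feed can be consumed with any websocket client library. As a minimal sketch, the following Go program (the hostname and @YOUR_API_TOKEN@ are placeholders) subscribes to all events and prints each message as it arrives:
+
+<notextile>
+<pre><code>package main
+
+import (
+	"fmt"
+
+	"golang.org/x/net/websocket"
+)
+
+func main() {
+	// Placeholders: substitute your hostname and an API token.
+	url := "wss://ws.uuid_prefix.your.domain/websocket?api_token=YOUR_API_TOKEN"
+	ws, err := websocket.Dial(url, "", "https://ws.uuid_prefix.your.domain/")
+	if err != nil {
+		panic(err)
+	}
+	defer ws.Close()
+	// Subscribe to all events. The server acknowledges with {"status":200}.
+	if _, err := ws.Write([]byte(`{"method":"subscribe"}`)); err != nil {
+		panic(err)
+	}
+	buf := make([]byte, 1024*1024)
+	for {
+		n, err := ws.Read(buf)
+		if err != nil {
+			panic(err)
+		}
+		fmt.Printf("%s\n", buf[:n])
+	}
+}
+</code></pre>
+</notextile>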
+
+h2. Install arvados-ws
+
+Typically arvados-ws runs on the same host as the API server.
+
+On Debian-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install arvados-ws</span>
+</code></pre>
+</notextile>
+
+On Red Hat-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install arvados-ws</span>
+</code></pre>
+</notextile>
+
+Verify that @arvados-ws@ is functional:
+
+<notextile>
+<pre><code>~$ <span class="userinput">arvados-ws -h</span>
+Usage of arvados-ws:
+ -config path
+ path to config file (default "/etc/arvados/ws/ws.yml")
+ -dump-config
+ show current configuration and exit
+</code></pre>
+</notextile>
+
+h3. Create a configuration file
+
+Create @/etc/arvados/ws/ws.yml@ using the following template. Replace @xxxxxxxx@ with the "password you generated during database setup":install-postgresql.html#api.
+
+<notextile>
+<pre><code>Client:
+ APIHost: <span class="userinput">uuid_prefix.your.domain</span>:443
+Listen: ":<span class="userinput">9003</span>"
+Postgres:
+ dbname: arvados_production
+ host: localhost
+ password: <span class="userinput">xxxxxxxx</span>
+ user: arvados
+</code></pre>
+</notextile>
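+
+After editing the file, you can check the effective configuration, including defaults for any values you omitted, with @arvados-ws -dump-config@.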
+
+h3. Start the service (option 1: systemd)
+
+If your system does not use systemd, skip this section and follow the "runit instructions":#runit instead.
+
+If your system uses systemd, the arvados-ws service should already be set up. Start it and check its status:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo systemctl restart arvados-ws</span>
+~$ <span class="userinput">sudo systemctl status arvados-ws</span>
+● arvados-ws.service - Arvados websocket server
+ Loaded: loaded (/lib/systemd/system/arvados-ws.service; enabled)
+ Active: active (running) since Tue 2016-12-06 11:20:48 EST; 10s ago
+ Docs: https://doc.arvados.org/
+ Main PID: 9421 (arvados-ws)
+ CGroup: /system.slice/arvados-ws.service
+ └─9421 /usr/bin/arvados-ws
+
+Dec 06 11:20:48 zzzzz arvados-ws[9421]: {"level":"info","msg":"started","time":"2016-12-06T11:20:48.207617188-05:00"}
+Dec 06 11:20:48 zzzzz arvados-ws[9421]: {"Listen":":9003","level":"info","msg":"listening","time":"2016-12-06T11:20:48.244956506-05:00"}
+Dec 06 11:20:48 zzzzz systemd[1]: Started Arvados websocket server.
+</code></pre>
+</notextile>
+
+If it is not running, use @journalctl@ to check logs for errors:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo journalctl -n10 -u arvados-ws</span>
+...
+Dec 06 11:12:48 zzzzz systemd[1]: Starting Arvados websocket server...
+Dec 06 11:12:48 zzzzz arvados-ws[8918]: {"level":"info","msg":"started","time":"2016-12-06T11:12:48.030496636-05:00"}
+Dec 06 11:12:48 zzzzz arvados-ws[8918]: {"error":"pq: password authentication failed for user \"arvados\"","level":"fatal","msg":"db.Ping failed","time":"2016-12-06T11:12:48.058206400-05:00"}
+</code></pre>
+</notextile>
+
+Skip ahead to "confirm the service is working":#confirm.
+
+h3(#runit). Start the service (option 2: runit)
+
+Install runit to supervise the arvados-ws daemon. {% include 'install_runit' %}
+
+Create a supervised service.
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo mkdir /etc/service/arvados-ws</span>
+~$ <span class="userinput">cd /etc/service/arvados-ws</span>
+~$ <span class="userinput">sudo mkdir log log/main</span>
+~$ <span class="userinput">printf '#!/bin/sh\nexec arvados-ws 2>&1\n' | sudo tee run</span>
+~$ <span class="userinput">printf '#!/bin/sh\nexec svlogd main\n' | sudo tee log/run</span>
+~$ <span class="userinput">sudo chmod +x run log/run</span>
+~$ <span class="userinput">sudo sv exit .</span>
+~$ <span class="userinput">cd -</span>
+</code></pre>
+</notextile>
+
+Use @sv stat@ and check the log file to verify the service is running.
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo sv stat /etc/service/arvados-ws</span>
+run: /etc/service/arvados-ws: (pid 12520) 2s; run: log: (pid 12519) 2s
+~$ <span class="userinput">tail /etc/service/arvados-ws/log/main/current</span>
+{"level":"info","msg":"started","time":"2016-12-06T11:56:20.669171449-05:00"}
+{"Listen":":9003","level":"info","msg":"listening","time":"2016-12-06T11:56:20.708847627-05:00"}
+</code></pre>
+</notextile>
+
+h3(#confirm). Confirm the service is working
+
+Confirm the service is listening on its assigned port and responding to requests.
+
+<notextile>
+<pre><code>~$ <span class="userinput">curl http://0.0.0.0:<b>9003</b>/status.json</span>
+{"Clients":1}
+</code></pre>
+</notextile>
+
+h3. Set up a reverse proxy with SSL support
+
+The arvados-ws service will be accessible from anywhere on the internet, so we recommend using SSL for transport encryption.
+
+This is best achieved by putting a reverse proxy with SSL support in front of arvados-ws, running on port 443 and passing requests to arvados-ws on port 9003 (or whatever port you chose in your configuration file).
+
+For example, using Nginx:
+
+<notextile><pre>
+upstream arvados-ws {
+ server 127.0.0.1:<span class="userinput">9003</span>;
+}
+
+server {
+ listen <span class="userinput">[your public IP address]</span>:443 ssl;
+ server_name ws.<span class="userinput">uuid_prefix.your.domain</span>;
+
+ proxy_connect_timeout 90s;
+ proxy_read_timeout 300s;
+
+ ssl on;
+  ssl_certificate <span class="userinput">/YOUR/PATH/TO/cert.pem</span>;
+  ssl_certificate_key <span class="userinput">/YOUR/PATH/TO/cert.key</span>;
+
+ location / {
+ proxy_pass http://arvados-ws;
+ proxy_set_header Upgrade $http_upgrade;
+ proxy_set_header Connection "upgrade";
+ proxy_set_header Host $host;
+ proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+ }
+}
+</pre></notextile>
+
+If Nginx is already configured to proxy @ws@ requests to puma, move that configuration out of the way or change its @server_name@ so it doesn't conflict.
+
+h3. Update API server configuration
+
+Ensure the websocket server address is correct in the API server configuration file @/etc/arvados/api/application.yml@.
+
+<notextile>
+<pre><code>websocket_address: wss://ws.<span class="userinput">uuid_prefix.your.domain</span>/websocket
+</code></pre>
+</notextile>
+
+Restart Nginx to reload the API server configuration.
+
+<notextile>
+<pre><code>$ <span class="userinput">sudo nginx -s reload</span>
+</code></pre>
+</notextile>
+
+h3. Verify DNS and proxy setup
+
+Use a host elsewhere on the Internet to confirm that your DNS, proxy, and SSL are configured correctly.
+
+<notextile>
+<pre><code>$ <span class="userinput">curl https://ws.<b>uuid_prefix.your.domain</b>/status.json</span>
+{"Clients":1}
+</code></pre>
+</notextile>
// callers who use a Client to initialize an
// arvadosclient.ArvadosClient.)
KeepServiceURIs []string `json:",omitempty"`
+
+ dd *DiscoveryDocument
}
// The default http.Client used by a Client with Insecure==true and
// DiscoveryDocument is the Arvados server's description of itself.
type DiscoveryDocument struct {
- DefaultCollectionReplication int `json:"defaultCollectionReplication"`
- BlobSignatureTTL int64 `json:"blobSignatureTtl"`
+ BasePath string `json:"basePath"`
+ DefaultCollectionReplication int `json:"defaultCollectionReplication"`
+ BlobSignatureTTL int64 `json:"blobSignatureTtl"`
+ Schemas map[string]Schema `json:"schemas"`
+ Resources map[string]Resource `json:"resources"`
+}
+
+type Resource struct {
+ Methods map[string]ResourceMethod `json:"methods"`
+}
+
+type ResourceMethod struct {
+ HTTPMethod string `json:"httpMethod"`
+ Path string `json:"path"`
+ Response MethodResponse `json:"response"`
+}
+
+type MethodResponse struct {
+ Ref string `json:"$ref"`
+}
+
+type Schema struct {
+ UUIDPrefix string `json:"uuidPrefix"`
}
// DiscoveryDocument returns a *DiscoveryDocument. The returned object
// should not be modified: the same object may be returned by
// subsequent calls.
func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
+ if c.dd != nil {
+ return c.dd, nil
+ }
var dd DiscoveryDocument
- return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+ err := c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+ if err != nil {
+ return nil, err
+ }
+ c.dd = &dd
+ return c.dd, nil
+}
+
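+// PathForUUID returns the appropriate API request path for the given
+// method ("get", "update", etc.) and object UUID: it matches the
+// UUID's infix (characters 7-11) against the uuidPrefix of each
+// schema in the discovery document, then finds the resource whose
+// "get" method returns that schema. For example, a UUID with infix
+// "tpzed" resolves to "arvados/v1/users/{uuid}".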
+func (c *Client) PathForUUID(method, uuid string) (string, error) {
+ if len(uuid) != 27 {
+ return "", fmt.Errorf("invalid UUID: %q", uuid)
+ }
+ dd, err := c.DiscoveryDocument()
+ if err != nil {
+ return "", err
+ }
+ infix := uuid[6:11]
+ var model string
+ for m, s := range dd.Schemas {
+ if s.UUIDPrefix == infix {
+ model = m
+ break
+ }
+ }
+ if model == "" {
+ return "", fmt.Errorf("unrecognized UUID infix: %q", infix)
+ }
+ var resource string
+ for r, rsc := range dd.Resources {
+ if rsc.Methods["get"].Response.Ref == model {
+ resource = r
+ break
+ }
+ }
+ if resource == "" {
+ return "", fmt.Errorf("no resource for model: %q", model)
+ }
+ m, ok := dd.Resources[resource].Methods[method]
+ if !ok {
+ return "", fmt.Errorf("no method %q for resource %q", method, resource)
+ }
+ path := dd.BasePath + strings.Replace(m.Path, "{uuid}", uuid, -1)
+ if path[0] == '/' {
+ path = path[1:]
+ }
+ return path, nil
}
--- /dev/null
+package arvados
+
+import (
+ "time"
+)
+
+// Log is an arvados#log record
+type Log struct {
+ ID uint64 `json:"id"`
+ UUID string `json:"uuid"`
+ ObjectUUID string `json:"object_uuid"`
+ ObjectOwnerUUID string `json:"object_owner_uuid"`
+ EventType string `json:"event_type"`
+ Properties map[string]interface{} `json:"properties"`
+ CreatedAt *time.Time `json:"created_at,omitempty"`
+}
+
+// LogList is an arvados#logList resource.
+type LogList struct {
+ Items []Log `json:"items"`
+ ItemsAvailable int `json:"items_available"`
+ Offset int `json:"offset"`
+ Limit int `json:"limit"`
+}
}
return nil
}
+
+// Dump returns a YAML representation of cfg.
+func Dump(cfg interface{}) ([]byte, error) {
+ return yaml.Marshal(cfg)
+}
--- /dev/null
+package stats
+
+import (
+ "fmt"
+ "strconv"
+ "time"
+)
+
+// Duration is a duration that is displayed as a number of seconds in
+// fixed-point notation.
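+//
+// For example, Duration(123 * time.Millisecond).String() == "0.123000".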
+type Duration time.Duration
+
+// MarshalJSON implements json.Marshaler.
+func (d Duration) MarshalJSON() ([]byte, error) {
+ return []byte(d.String()), nil
+}
+
+// String implements fmt.Stringer.
+func (d Duration) String() string {
+ return fmt.Sprintf("%.6f", time.Duration(d).Seconds())
+}
+
+// UnmarshalJSON implements json.Unmarshaler.
+func (d *Duration) UnmarshalJSON(data []byte) error {
+ return d.Set(string(data))
+}
+
+// Set implements flag.Value.
+func (d *Duration) Set(s string) error {
+ sec, err := strconv.ParseFloat(s, 64)
+ if err == nil {
+ *d = Duration(sec * float64(time.Second))
+ }
+ return err
+}
--- /dev/null
+package stats
+
+import (
+ "testing"
+ "time"
+)
+
+func TestString(t *testing.T) {
+ d := Duration(123123123123 * time.Nanosecond)
+ if s, expect := d.String(), "123.123123"; s != expect {
+ t.Errorf("got %s, expect %s", s, expect)
+ }
+}
+
+func TestSet(t *testing.T) {
+ var d Duration
+ if err := d.Set("123.456"); err != nil {
+ t.Fatal(err)
+ }
+ if got, expect := time.Duration(d).Nanoseconds(), int64(123456000000); got != expect {
+ t.Errorf("got %d, expect %d", got, expect)
+ }
+}
proxy_redirect //download:{{KEEPWEBPORT}}/ https://$host:{{KEEPWEBDLSSLPORT}}/;
}
}
+ upstream ws {
+ server localhost:{{WSPORT}};
+ }
+ server {
+ listen *:{{WSSPORT}} ssl default_server;
+ server_name ~^(?<request_host>.*)$;
+ ssl_certificate {{SSLCERT}};
+ ssl_certificate_key {{SSLKEY}};
+ location / {
+ proxy_pass http://ws;
+ proxy_set_header Upgrade $http_upgrade;
+ proxy_set_header Connection "upgrade";
+ proxy_set_header Host $request_host:{{WSPORT}};
+ proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+ }
+ }
}
my_api_host = None
_cached_config = {}
+_cached_db_config = {}
def find_server_pid(PID_PATH, wait=10):
now = time.time()
os.makedirs(gitdir)
subprocess.check_output(['tar', '-xC', gitdir, '-f', gittarball])
+ # The nginx proxy isn't listening here yet, but we need to choose
+ # the wss:// port now so we can write the API server config file.
+ wss_port = find_available_port()
+ _setport('wss', wss_port)
+
port = find_available_port()
env = os.environ.copy()
env['RAILS_ENV'] = 'test'
- env['ARVADOS_WEBSOCKETS'] = 'yes'
+ env['ARVADOS_TEST_WSS_PORT'] = str(wss_port)
+ env.pop('ARVADOS_WEBSOCKETS', None)
env.pop('ARVADOS_TEST_API_HOST', None)
env.pop('ARVADOS_API_HOST', None)
env.pop('ARVADOS_API_HOST_INSECURE', None)
kill_server_pid(_pidfile('api'))
my_api_host = None
+def run_ws():
+ if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+ return
+ stop_ws()
+ port = find_available_port()
+ conf = os.path.join(TEST_TMPDIR, 'ws.yml')
+ with open(conf, 'w') as f:
+ f.write("""
+Client:
+ APIHost: {}
+ Insecure: true
+Listen: :{}
+LogLevel: {}
+Postgres:
+ host: {}
+ dbname: {}
+ user: {}
+ password: {}
+ sslmode: require
+ """.format(os.environ['ARVADOS_API_HOST'],
+ port,
+ ('info' if os.environ.get('ARVADOS_DEBUG', '') in ['','0'] else 'debug'),
+ _dbconfig('host'),
+ _dbconfig('database'),
+ _dbconfig('username'),
+ _dbconfig('password')))
+ logf = open(_fifo2stderr('ws'), 'w')
+ ws = subprocess.Popen(
+ ["ws", "-config", conf],
+ stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
+ with open(_pidfile('ws'), 'w') as f:
+ f.write(str(ws.pid))
+ _wait_until_port_listens(port)
+ _setport('ws', port)
+ return port
+
+def stop_ws():
+ if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+ return
+ kill_server_pid(_pidfile('ws'))
+
def _start_keep(n, keep_args):
keep0 = tempfile.mkdtemp()
port = find_available_port()
def run_nginx():
if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
return
+ stop_nginx()
nginxconf = {}
nginxconf['KEEPWEBPORT'] = _getport('keep-web')
nginxconf['KEEPWEBDLSSLPORT'] = find_available_port()
nginxconf['KEEPPROXYSSLPORT'] = find_available_port()
nginxconf['GITPORT'] = _getport('arv-git-httpd')
nginxconf['GITSSLPORT'] = find_available_port()
+ nginxconf['WSPORT'] = _getport('ws')
+ nginxconf['WSSPORT'] = _getport('wss')
nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem')
nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key')
nginxconf['ACCESSLOG'] = _fifo2stderr('nginx_access_log')
except IOError:
return 9
+def _dbconfig(key):
+ global _cached_db_config
+ if not _cached_db_config:
+ _cached_db_config = yaml.load(open(os.path.join(
+ SERVICES_SRC_DIR, 'api', 'config', 'database.yml')))
+ return _cached_db_config['test'][key]
+
def _apiconfig(key):
+ global _cached_config
if _cached_config:
return _cached_config[key]
def _load(f, required=True):
original environment.
"""
MAIN_SERVER = None
+ WS_SERVER = None
KEEP_SERVER = None
KEEP_PROXY_SERVER = None
KEEP_WEB_SERVER = None
os.environ.pop('ARVADOS_EXTERNAL_CLIENT', None)
for server_kwargs, start_func, stop_func in (
(cls.MAIN_SERVER, run, reset),
+ (cls.WS_SERVER, run_ws, stop_ws),
(cls.KEEP_SERVER, run_keep, stop_keep),
(cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy),
(cls.KEEP_WEB_SERVER, run_keep_web, stop_keep_web)):
if __name__ == "__main__":
actions = [
'start', 'stop',
+ 'start_ws', 'stop_ws',
'start_keep', 'stop_keep',
'start_keep_proxy', 'stop_keep_proxy',
'start_keep-web', 'stop_keep-web',
print(host)
elif args.action == 'stop':
stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
+ elif args.action == 'start_ws':
+ run_ws()
+ elif args.action == 'stop_ws':
+ stop_ws()
elif args.action == 'start_keep':
run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers)
elif args.action == 'stop_keep':
self.assertEqual(200, events.get(True, 5)['status'])
human = arvados.api('v1').humans().create(body={}).execute()
- log_object_uuids = []
- for i in range(0, expected):
- log_object_uuids.append(events.get(True, 5)['object_uuid'])
-
+ want_uuids = []
if expected > 0:
- self.assertIn(human['uuid'], log_object_uuids)
-
+ want_uuids.append(human['uuid'])
if expected > 1:
- self.assertIn(ancestor['uuid'], log_object_uuids)
+ want_uuids.append(ancestor['uuid'])
+ log_object_uuids = []
+ while set(want_uuids) - set(log_object_uuids):
+ log_object_uuids.append(events.get(True, 5)['object_uuid'])
- with self.assertRaises(Queue.Empty):
- # assertEqual just serves to show us what unexpected thing
- # comes out of the queue when the assertRaises fails; when
- # the test passes, this assertEqual doesn't get called.
- self.assertEqual(events.get(True, 2), None)
+ if expected < 2:
+ with self.assertRaises(Queue.Empty):
+ # assertEqual just serves to show us what unexpected
+ # thing comes out of the queue when the assertRaises
+ # fails; when the test passes, this assertEqual
+ # doesn't get called.
+ self.assertEqual(events.get(True, 2), None)
def test_subscribe_websocket(self):
self._test_subscribe(
return time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(t)) + self.isotz(-time.timezone/60)
def isotz(self, offset):
- """Convert minutes-east-of-UTC to ISO8601 time zone designator"""
- return '{:+03d}{:02d}'.format(offset/60, offset%60)
+ """Convert minutes-east-of-UTC to RFC3339- and ISO-compatible time zone designator"""
+ return '{:+03d}:{:02d}'.format(offset/60, offset%60)
# Test websocket reconnection on (un)expected close
def _test_websocket_reconnect(self, close_unexpected):
workbench_address: https://localhost:3001/
git_repositories_dir: <%= Rails.root.join 'tmp', 'git', 'test' %>
git_internal_dir: <%= Rails.root.join 'tmp', 'internal.git' %>
+ websocket_address: "wss://0.0.0.0:<%= ENV['ARVADOS_TEST_WSS_PORT'] %>/websocket"
import (
"context"
- "fmt"
"net/http"
"strings"
"time"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "git.curoverse.com/arvados.git/sdk/go/stats"
log "github.com/Sirupsen/logrus"
)
}
lgr.WithFields(log.Fields{
- "timeTotal": loggedDuration(tDone.Sub(tStart)),
- "timeToStatus": loggedDuration(resp.sentHdr.Sub(tStart)),
- "timeWriteBody": loggedDuration(tDone.Sub(resp.sentHdr)),
+ "timeTotal": stats.Duration(tDone.Sub(tStart)),
+ "timeToStatus": stats.Duration(resp.sentHdr.Sub(tStart)),
+ "timeWriteBody": stats.Duration(tDone.Sub(resp.sentHdr)),
"respStatusCode": resp.Status,
"respStatus": statusText,
"respBytes": resp.Length,
}).Info("response")
}
-
-type loggedDuration time.Duration
-
-// MarshalJSON formats a duration as a number of seconds, using
-// fixed-point notation with no more than 6 decimal places.
-func (d loggedDuration) MarshalJSON() ([]byte, error) {
- return []byte(d.String()), nil
-}
-
-// String formats a duration as a number of seconds, using
-// fixed-point notation with no more than 6 decimal places.
-func (d loggedDuration) String() string {
- return fmt.Sprintf("%.6f", time.Duration(d).Seconds())
-}
--- /dev/null
+[Unit]
+Description=Arvados websocket server
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/ws/ws.yml
+
+[Service]
+Type=notify
+ExecStart=/usr/bin/arvados-ws
+Restart=always
+
+[Install]
+WantedBy=multi-user.target
--- /dev/null
+package main
+
+import (
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type wsConfig struct {
+ Client arvados.Client
+ Postgres pgConfig
+ Listen string
+ LogLevel string
+ LogFormat string
+
+ PingTimeout arvados.Duration
+ ClientEventQueue int
+ ServerEventQueue int
+}
+
+func defaultConfig() wsConfig {
+ return wsConfig{
+ Client: arvados.Client{
+ APIHost: "localhost:443",
+ },
+ Postgres: pgConfig{
+ "dbname": "arvados_production",
+ "user": "arvados",
+ "password": "xyzzy",
+ "host": "localhost",
+ "connect_timeout": "30",
+ "sslmode": "require",
+ },
+ LogLevel: "info",
+ LogFormat: "json",
+ PingTimeout: arvados.Duration(time.Minute),
+ ClientEventQueue: 64,
+ ServerEventQueue: 4,
+ }
+}
--- /dev/null
+// Arvados-ws exposes Arvados APIs (currently just one, the
+// cache-invalidation event feed at "ws://.../websocket") to
+// websocket clients.
+//
+// See https://doc.arvados.org/install/install-ws.html.
+//
+// Usage
+//
+// arvados-ws [-config /etc/arvados/ws/ws.yml] [-dump-config]
+//
+// Minimal configuration
+//
+// Client:
+// APIHost: localhost:443
+// Listen: ":1234"
+// Postgres:
+// dbname: arvados_production
+// host: localhost
+// password: xyzzy
+// user: arvados
+//
+// Options
+//
+// -config path
+//
+// Load configuration from the given file instead of the default
+// /etc/arvados/ws/ws.yml
+//
+// -dump-config
+//
+// Print the loaded configuration to stdout and exit.
+//
+// Logs
+//
+// Logs are printed to stderr, formatted as JSON.
+//
+// A log is printed each time a client connects or disconnects.
+//
+// Enable additional logs by configuring:
+//
+// LogLevel: debug
+//
+// Runtime status
+//
+// GET /debug.json responds with debug stats.
+//
+// GET /status.json responds with health check results and
+// activity/usage metrics.
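+//
+// Example, assuming the server is listening at the address shown in
+// the minimal configuration above:
+//
+//	$ curl http://localhost:1234/status.json
+//	{"Clients":1}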
+package main
--- /dev/null
+package main
+
+import (
+ "database/sql"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/ghodss/yaml"
+)
+
+type eventSink interface {
+ Channel() <-chan *event
+ Stop()
+}
+
+type eventSource interface {
+ NewSink() eventSink
+ DB() *sql.DB
+}
+
+type event struct {
+ LogID uint64
+ Received time.Time
+ Ready time.Time
+ Serial uint64
+
+ db *sql.DB
+ logRow *arvados.Log
+ err error
+ mtx sync.Mutex
+}
+
+// Detail returns the database row corresponding to the event. It can
+// be called safely from multiple goroutines. Only one attempt will be
+// made. If the database row cannot be retrieved, Detail returns nil.
+func (e *event) Detail() *arvados.Log {
+ e.mtx.Lock()
+ defer e.mtx.Unlock()
+ if e.logRow != nil || e.err != nil {
+ return e.logRow
+ }
+ var logRow arvados.Log
+ var propYAML []byte
+ e.err = e.db.QueryRow(`SELECT id, uuid, object_uuid, COALESCE(object_owner_uuid,''), COALESCE(event_type,''), created_at, properties FROM logs WHERE id = $1`, e.LogID).Scan(
+ &logRow.ID,
+ &logRow.UUID,
+ &logRow.ObjectUUID,
+ &logRow.ObjectOwnerUUID,
+ &logRow.EventType,
+ &logRow.CreatedAt,
+ &propYAML)
+ if e.err != nil {
+ logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("QueryRow failed")
+ return nil
+ }
+ e.err = yaml.Unmarshal(propYAML, &logRow.Properties)
+ if e.err != nil {
+ logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("yaml decode failed")
+ return nil
+ }
+ e.logRow = &logRow
+ return e.logRow
+}
--- /dev/null
+package main
+
+import (
+ "database/sql"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/stats"
+ "github.com/lib/pq"
+)
+
+type pgConfig map[string]string
+
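+// ConnectionString returns the configuration as a libpq-style
+// connection string, e.g., "dbname='arvados_production'
+// user='arvados' ...", escaping backslashes and single quotes in
+// each value.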
+func (c pgConfig) ConnectionString() string {
+ s := ""
+ for k, v := range c {
+ s += k
+ s += "='"
+ s += strings.Replace(
+ strings.Replace(v, `\`, `\\`, -1),
+ `'`, `\'`, -1)
+ s += "' "
+ }
+ return s
+}
+
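+// pgEventSource listens for Postgres NOTIFY events on the "logs"
+// channel, converts each notification into an *event, and fans it
+// out to every subscribed sink.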
+type pgEventSource struct {
+ DataSource string
+ QueueSize int
+
+ db *sql.DB
+ pqListener *pq.Listener
+ queue chan *event
+ sinks map[*pgEventSink]bool
+ setupOnce sync.Once
+ mtx sync.Mutex
+ shutdown chan error
+
+ lastQDelay time.Duration
+ eventsIn uint64
+ eventsOut uint64
+}
+
+var _ debugStatuser = (*pgEventSource)(nil)
+
+func (ps *pgEventSource) setup() {
+ ps.shutdown = make(chan error, 1)
+ ps.sinks = make(map[*pgEventSink]bool)
+
+ db, err := sql.Open("postgres", ps.DataSource)
+ if err != nil {
+ logger(nil).WithError(err).Fatal("sql.Open failed")
+ }
+ if err = db.Ping(); err != nil {
+ logger(nil).WithError(err).Fatal("db.Ping failed")
+ }
+ ps.db = db
+
+ ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
+ if err != nil {
+ // Until we have a mechanism for catching up
+ // on missed events, we cannot recover from a
+ // dropped connection without breaking our
+ // promises to clients.
+ logger(nil).WithError(err).Error("listener problem")
+ ps.shutdown <- err
+ }
+ })
+ err = ps.pqListener.Listen("logs")
+ if err != nil {
+ logger(nil).WithError(err).Fatal("pq Listen failed")
+ }
+ logger(nil).Debug("pgEventSource listening")
+
+ go ps.run()
+}
+
+func (ps *pgEventSource) run() {
+ ps.queue = make(chan *event, ps.QueueSize)
+
+ go func() {
+ for e := range ps.queue {
+ // Wait for the "select ... from logs" call to
+ // finish. This limits max concurrent queries
+ // to ps.QueueSize. Without this, max
+ // concurrent queries would be bounded by
+ // client_count X client_queue_size.
+ e.Detail()
+
+ logger(nil).
+ WithField("serial", e.Serial).
+ WithField("detail", e.Detail()).
+ Debug("event ready")
+ e.Ready = time.Now()
+ ps.lastQDelay = e.Ready.Sub(e.Received)
+
+ ps.mtx.Lock()
+ atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
+ for sink := range ps.sinks {
+ sink.channel <- e
+ }
+ ps.mtx.Unlock()
+ }
+ }()
+
+ var serial uint64
+ ticker := time.NewTicker(time.Minute)
+ defer ticker.Stop()
+ for {
+ select {
+ case err, ok := <-ps.shutdown:
+ if ok {
+ logger(nil).WithError(err).Info("shutdown")
+ }
+ close(ps.queue)
+ return
+
+ case <-ticker.C:
+ logger(nil).Debug("listener ping")
+ ps.pqListener.Ping()
+
+ case pqEvent, ok := <-ps.pqListener.Notify:
+ if !ok {
+ close(ps.queue)
+ return
+ }
+ if pqEvent.Channel != "logs" {
+ continue
+ }
+ logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
+ if err != nil {
+ logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
+ continue
+ }
+ serial++
+ e := &event{
+ LogID: logID,
+ Received: time.Now(),
+ Serial: serial,
+ db: ps.db,
+ }
+ logger(nil).WithField("event", e).Debug("incoming")
+ atomic.AddUint64(&ps.eventsIn, 1)
+ ps.queue <- e
+ go e.Detail()
+ }
+ }
+}
+
+// NewSink subscribes to the event source. NewSink returns an
+// eventSink, whose Channel() method returns a channel: a pointer to
+// each subsequent event will be sent to that channel.
+//
+// The caller must ensure events are received from the sink channel as
+// quickly as possible because when one sink stops being ready, all
+// other sinks block.
+func (ps *pgEventSource) NewSink() eventSink {
+ ps.setupOnce.Do(ps.setup)
+ sink := &pgEventSink{
+ channel: make(chan *event, 1),
+ source: ps,
+ }
+ ps.mtx.Lock()
+ ps.sinks[sink] = true
+ ps.mtx.Unlock()
+ return sink
+}
+
+func (ps *pgEventSource) DB() *sql.DB {
+ ps.setupOnce.Do(ps.setup)
+ return ps.db
+}
+
+func (ps *pgEventSource) DebugStatus() interface{} {
+ ps.mtx.Lock()
+ defer ps.mtx.Unlock()
+ blocked := 0
+ for sink := range ps.sinks {
+ blocked += len(sink.channel)
+ }
+ return map[string]interface{}{
+ "EventsIn": atomic.LoadUint64(&ps.eventsIn),
+ "EventsOut": atomic.LoadUint64(&ps.eventsOut),
+ "Queue": len(ps.queue),
+ "QueueLimit": cap(ps.queue),
+ "QueueDelay": stats.Duration(ps.lastQDelay),
+ "Sinks": len(ps.sinks),
+ "SinksBlocked": blocked,
+ }
+}
+
+type pgEventSink struct {
+ channel chan *event
+ source *pgEventSource
+}
+
+func (sink *pgEventSink) Channel() <-chan *event {
+ return sink.channel
+}
+
+func (sink *pgEventSink) Stop() {
+ go func() {
+ // Ensure this sink cannot fill up and block the
+ // server-side queue (which otherwise could in turn
+ // block our mtx.Lock() here)
+		for range sink.channel {
+ }
+ }()
+ sink.source.mtx.Lock()
+ delete(sink.source.sinks, sink)
+ sink.source.mtx.Unlock()
+ close(sink.channel)
+}
--- /dev/null
+package main
+
+import (
+ "context"
+ "io"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/stats"
+)
+
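+// handler implements the core websocket logic. Handle runs a single
+// client connection: it forwards matching events from the
+// eventSource to the client, passes incoming client frames to the
+// session, and sends periodic empty messages to keep idle
+// connections alive.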
+type handler struct {
+ Client arvados.Client
+ PingTimeout time.Duration
+ QueueSize int
+
+ mtx sync.Mutex
+ lastDelay map[chan interface{}]stats.Duration
+ setupOnce sync.Once
+}
+
+type handlerStats struct {
+ QueueDelayNs time.Duration
+ WriteDelayNs time.Duration
+ EventBytes uint64
+ EventCount uint64
+}
+
+func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
+ h.setupOnce.Do(h.setup)
+
+ ctx, cancel := context.WithCancel(ws.Request().Context())
+ defer cancel()
+ log := logger(ctx)
+
+ incoming := eventSource.NewSink()
+ defer incoming.Stop()
+
+ queue := make(chan interface{}, h.QueueSize)
+ h.mtx.Lock()
+ h.lastDelay[queue] = 0
+ h.mtx.Unlock()
+ defer func() {
+ h.mtx.Lock()
+ delete(h.lastDelay, queue)
+ h.mtx.Unlock()
+ }()
+
+ sess, err := newSession(ws, queue)
+ if err != nil {
+ log.WithError(err).Error("newSession failed")
+ return
+ }
+
+ go func() {
+ buf := make([]byte, 2<<20)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+ ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
+ n, err := ws.Read(buf)
+ buf := buf[:n]
+			log.WithField("frame", string(buf)).Debug("received frame")
+ if err == nil && n == cap(buf) {
+ err = errFrameTooBig
+ }
+ if err != nil {
+ if err != io.EOF {
+ log.WithError(err).Info("read error")
+ }
+ cancel()
+ return
+ }
+ err = sess.Receive(buf)
+ if err != nil {
+ log.WithError(err).Error("sess.Receive() failed")
+ cancel()
+ return
+ }
+ }
+ }()
+
+ go func() {
+ for {
+ var ok bool
+ var data interface{}
+ select {
+ case <-ctx.Done():
+ return
+ case data, ok = <-queue:
+ if !ok {
+ return
+ }
+ }
+ var e *event
+ var buf []byte
+ var err error
+ log := log
+
+ switch data := data.(type) {
+ case []byte:
+ buf = data
+ case *event:
+ e = data
+ log = log.WithField("serial", e.Serial)
+ buf, err = sess.EventMessage(e)
+ if err != nil {
+ log.WithError(err).Error("EventMessage failed")
+ cancel()
+ break
+ } else if len(buf) == 0 {
+ log.Debug("skip")
+ continue
+ }
+ default:
+ log.WithField("data", data).Error("bad object in client queue")
+ continue
+ }
+
+ log.WithField("frame", string(buf)).Debug("send event")
+ ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+ t0 := time.Now()
+ _, err = ws.Write(buf)
+ if err != nil {
+ log.WithError(err).Error("write failed")
+ cancel()
+ break
+ }
+ log.Debug("sent")
+
+ if e != nil {
+ hStats.QueueDelayNs += t0.Sub(e.Ready)
+ h.mtx.Lock()
+ h.lastDelay[queue] = stats.Duration(time.Since(e.Ready))
+ h.mtx.Unlock()
+ }
+ hStats.WriteDelayNs += time.Since(t0)
+ hStats.EventBytes += uint64(len(buf))
+ hStats.EventCount++
+ }
+ }()
+
+ // Filter incoming events against the current subscription
+ // list, and forward matching events to the outgoing message
+ // queue. Close the queue and return when the request context
+ // is done/cancelled or the incoming event stream ends. Shut
+ // down the handler if the outgoing queue fills up.
+ go func() {
+ ticker := time.NewTicker(h.PingTimeout)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ // If the outgoing queue is empty,
+ // send an empty message. This can
+ // help detect a disconnected network
+ // socket, and prevent an idle socket
+ // from being closed.
+ if len(queue) == 0 {
+ select {
+ case queue <- []byte(`{}`):
+ default:
+ }
+ }
+ continue
+ case e, ok := <-incoming.Channel():
+ if !ok {
+ cancel()
+ return
+ }
+ if !sess.Filter(e) {
+ continue
+ }
+ select {
+ case queue <- e:
+ default:
+ log.WithError(errQueueFull).Error("terminate")
+ cancel()
+ return
+ }
+ }
+ }
+ }()
+
+ <-ctx.Done()
+ return
+}
+
+func (h *handler) DebugStatus() interface{} {
+ h.mtx.Lock()
+ defer h.mtx.Unlock()
+
+ var s struct {
+ QueueCount int
+ QueueMin int
+ QueueMax int
+ QueueTotal uint64
+ QueueDelayMin stats.Duration
+ QueueDelayMax stats.Duration
+ }
+ for q, lastDelay := range h.lastDelay {
+ s.QueueCount++
+ n := len(q)
+ s.QueueTotal += uint64(n)
+ if s.QueueMax < n {
+ s.QueueMax = n
+ }
+ if s.QueueMin > n || s.QueueCount == 1 {
+ s.QueueMin = n
+ }
+ if (s.QueueDelayMin > lastDelay || s.QueueDelayMin == 0) && lastDelay > 0 {
+ s.QueueDelayMin = lastDelay
+ }
+ if s.QueueDelayMax < lastDelay {
+ s.QueueDelayMax = lastDelay
+ }
+ }
+ return &s
+}
+
+func (h *handler) setup() {
+ h.lastDelay = make(map[chan interface{}]stats.Duration)
+}
--- /dev/null
+package main
+
+import (
+ "context"
+
+ "github.com/Sirupsen/logrus"
+)
+
+var (
+ loggerCtxKey = new(int)
+ rootLogger = logrus.New()
+)
+
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
+
+// contextWithLogger returns a new child context such that
+// logger(child) returns the given logger.
+func contextWithLogger(ctx context.Context, logger *logrus.Entry) context.Context {
+ return context.WithValue(ctx, loggerCtxKey, logger)
+}
+
+// logger returns the logger suitable for the given context -- the one
+// attached by contextWithLogger() if applicable, otherwise the
+// top-level logger with no fields/values.
+func logger(ctx context.Context) *logrus.Entry {
+ if ctx != nil {
+ if logger, ok := ctx.Value(loggerCtxKey).(*logrus.Entry); ok {
+ return logger
+ }
+ }
+ return rootLogger.WithFields(nil)
+}
+
+// loggerConfig sets up logging to behave as configured.
+func loggerConfig(cfg wsConfig) {
+ lvl, err := logrus.ParseLevel(cfg.LogLevel)
+ if err != nil {
+ logrus.Fatal(err)
+ }
+ rootLogger.Level = lvl
+ switch cfg.LogFormat {
+ case "text":
+ rootLogger.Formatter = &logrus.TextFormatter{
+ FullTimestamp: true,
+ TimestampFormat: rfc3339NanoFixed,
+ }
+ case "json":
+ rootLogger.Formatter = &logrus.JSONFormatter{
+ TimestampFormat: rfc3339NanoFixed,
+ }
+ default:
+ logrus.WithField("LogFormat", cfg.LogFormat).Fatal("unknown log format")
+ }
+}
--- /dev/null
+package main
+
+import (
+ "flag"
+ "fmt"
+ "net/http"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/config"
+ "github.com/coreos/go-systemd/daemon"
+)
+
+func main() {
+ log := logger(nil)
+
+ configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file")
+ dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit")
+ cfg := defaultConfig()
+ flag.Parse()
+
+ err := config.LoadFile(&cfg, *configPath)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ loggerConfig(cfg)
+
+ if *dumpConfig {
+ txt, err := config.Dump(&cfg)
+ if err != nil {
+ log.Fatal(err)
+ }
+ fmt.Print(string(txt))
+ return
+ }
+
+ log.Info("started")
+ eventSource := &pgEventSource{
+ DataSource: cfg.Postgres.ConnectionString(),
+ QueueSize: cfg.ServerEventQueue,
+ }
+ srv := &http.Server{
+ Addr: cfg.Listen,
+ ReadTimeout: time.Minute,
+ WriteTimeout: time.Minute,
+ MaxHeaderBytes: 1 << 20,
+ Handler: &router{
+ Config: &cfg,
+ eventSource: eventSource,
+ newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
+ },
+ }
+ // Bootstrap the eventSource by attaching a dummy subscriber
+ // and hanging up.
+ eventSource.NewSink().Stop()
+
+ if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
+ log.WithError(err).Warn("error notifying init daemon")
+ }
+
+ log.WithField("Listen", srv.Addr).Info("listening")
+ log.Fatal(srv.ListenAndServe())
+}
--- /dev/null
+package main
+
+import (
+ "net/http"
+ "net/url"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+const (
+ maxPermCacheAge = time.Hour
+ minPermCacheAge = 5 * time.Minute
+)
+
+type permChecker interface {
+ SetToken(token string)
+ Check(uuid string) (bool, error)
+}
+
+func newPermChecker(ac arvados.Client) permChecker {
+ ac.AuthToken = ""
+ return &cachingPermChecker{
+ Client: &ac,
+ cache: make(map[string]cacheEnt),
+ maxCurrent: 16,
+ }
+}
+
+type cacheEnt struct {
+ time.Time
+ allowed bool
+}
+
+type cachingPermChecker struct {
+ *arvados.Client
+ cache map[string]cacheEnt
+ maxCurrent int
+}
+
+func (pc *cachingPermChecker) SetToken(token string) {
+ pc.Client.AuthToken = token
+}
+
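+// Check returns true if the token set with SetToken is allowed to
+// read the object with the given UUID. Results are cached for up to
+// maxPermCacheAge.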
+func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
+ logger := logger(nil).
+ WithField("token", pc.Client.AuthToken).
+ WithField("uuid", uuid)
+ pc.tidy()
+ now := time.Now()
+ if perm, ok := pc.cache[uuid]; ok && now.Sub(perm.Time) < maxPermCacheAge {
+ logger.WithField("allowed", perm.allowed).Debug("cache hit")
+ return perm.allowed, nil
+ }
+ var buf map[string]interface{}
+ path, err := pc.PathForUUID("get", uuid)
+ if err != nil {
+ return false, err
+ }
+ err = pc.RequestAndDecode(&buf, "GET", path, nil, url.Values{
+ "select": {`["uuid"]`},
+ })
+
+ var allowed bool
+ if err == nil {
+ allowed = true
+	} else if txErr, ok := err.(*arvados.TransactionError); ok && (txErr.StatusCode == http.StatusNotFound || txErr.StatusCode == http.StatusForbidden) {
+		// 404 means the object doesn't exist or the token
+		// can't read it. Some requests are expressly forbidden
+		// (403) for reasons other than "you aren't allowed to
+		// know whether this UUID exists".
+		allowed = false
+ } else {
+ logger.WithError(err).Error("lookup error")
+ return false, err
+ }
+ logger.WithField("allowed", allowed).Debug("cache miss")
+ pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed}
+ return allowed, nil
+}
+
+func (pc *cachingPermChecker) tidy() {
+ if len(pc.cache) <= pc.maxCurrent*2 {
+ return
+ }
+ tooOld := time.Now().Add(-minPermCacheAge)
+ for uuid, t := range pc.cache {
+ if t.Before(tooOld) {
+ delete(pc.cache, uuid)
+ }
+ }
+ pc.maxCurrent = len(pc.cache)
+}
--- /dev/null
+package main
+
+import (
+ "database/sql"
+ "encoding/json"
+ "io"
+ "net/http"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/Sirupsen/logrus"
+ "golang.org/x/net/websocket"
+)
+
+type wsConn interface {
+ io.ReadWriter
+ Request() *http.Request
+ SetReadDeadline(time.Time) error
+ SetWriteDeadline(time.Time) error
+}
+
+type router struct {
+ Config *wsConfig
+ eventSource eventSource
+ newPermChecker func() permChecker
+
+ handler *handler
+ mux *http.ServeMux
+ setupOnce sync.Once
+
+ lastReqID int64
+ lastReqMtx sync.Mutex
+
+ status routerDebugStatus
+}
+
+type routerDebugStatus struct {
+ ReqsReceived int64
+ ReqsActive int64
+}
+
+type debugStatuser interface {
+ DebugStatus() interface{}
+}
+
+type sessionFactory func(wsConn, chan<- interface{}, *sql.DB, permChecker) (session, error)
+
+func (rtr *router) setup() {
+ rtr.handler = &handler{
+ PingTimeout: rtr.Config.PingTimeout.Duration(),
+ QueueSize: rtr.Config.ClientEventQueue,
+ }
+ rtr.mux = http.NewServeMux()
+ rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0))
+ rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1))
+ rtr.mux.HandleFunc("/debug.json", jsonHandler(rtr.DebugStatus))
+ rtr.mux.HandleFunc("/status.json", jsonHandler(rtr.Status))
+}
+
+func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
+ return &websocket.Server{
+ Handshake: func(c *websocket.Config, r *http.Request) error {
+ return nil
+ },
+ Handler: websocket.Handler(func(ws *websocket.Conn) {
+ t0 := time.Now()
+ log := logger(ws.Request().Context())
+ log.Info("connected")
+
+ stats := rtr.handler.Handle(ws, rtr.eventSource,
+ func(ws wsConn, sendq chan<- interface{}) (session, error) {
+ return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker())
+ })
+
+ log.WithFields(logrus.Fields{
+				"elapsed": time.Since(t0).Seconds(),
+ "stats": stats,
+ }).Info("disconnect")
+ ws.Close()
+ }),
+ }
+}
+
+func (rtr *router) newReqID() string {
+ rtr.lastReqMtx.Lock()
+ defer rtr.lastReqMtx.Unlock()
+ id := time.Now().UnixNano()
+ if id <= rtr.lastReqID {
+ id = rtr.lastReqID + 1
+ }
+ return strconv.FormatInt(id, 36)
+}
+
+func (rtr *router) DebugStatus() interface{} {
+ s := map[string]interface{}{
+ "HTTP": rtr.status,
+ "Outgoing": rtr.handler.DebugStatus(),
+ }
+ if es, ok := rtr.eventSource.(debugStatuser); ok {
+ s["EventSource"] = es.DebugStatus()
+ }
+ return s
+}
+
+func (rtr *router) Status() interface{} {
+ return map[string]interface{}{
+ "Clients": atomic.LoadInt64(&rtr.status.ReqsActive),
+ }
+}
+
+func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ rtr.setupOnce.Do(rtr.setup)
+ atomic.AddInt64(&rtr.status.ReqsReceived, 1)
+ atomic.AddInt64(&rtr.status.ReqsActive, 1)
+ defer atomic.AddInt64(&rtr.status.ReqsActive, -1)
+
+ logger := logger(req.Context()).
+ WithField("RequestID", rtr.newReqID())
+ ctx := contextWithLogger(req.Context(), logger)
+ req = req.WithContext(ctx)
+ logger.WithFields(logrus.Fields{
+ "remoteAddr": req.RemoteAddr,
+ "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
+ }).Info("accept request")
+ rtr.mux.ServeHTTP(resp, req)
+}
+
+func jsonHandler(fn func() interface{}) http.HandlerFunc {
+ return func(resp http.ResponseWriter, req *http.Request) {
+ logger := logger(req.Context())
+ resp.Header().Set("Content-Type", "application/json")
+ enc := json.NewEncoder(resp)
+ err := enc.Encode(fn())
+ if err != nil {
+ msg := "encode failed"
+ logger.WithError(err).Error(msg)
+ http.Error(resp, msg, http.StatusInternalServerError)
+ }
+ }
+}
--- /dev/null
+package main
+
+type session interface {
+ // Receive processes a message received from the client. If a
+ // non-nil error is returned, the connection will be
+ // terminated.
+ Receive([]byte) error
+
+ // Filter returns true if the event should be queued for
+ // sending to the client. It should return as fast as
+ // possible, and must not block.
+ Filter(*event) bool
+
+ // EventMessage encodes the given event (from the front of the
+ // queue) into a form suitable to send to the client. If a
+ // non-nil error is returned, the connection is terminated. If
+ // the returned buffer is empty, nothing is sent to the client
+ // and the event is not counted in statistics.
+ //
+ // Unlike Filter, EventMessage can block without affecting
+ // other connections. If EventMessage is slow, additional
+ // incoming events will be queued. If the event queue fills
+ // up, the connection will be dropped.
+ EventMessage(*event) ([]byte, error)
+}
--- /dev/null
+package main
+
+import (
+ "database/sql"
+ "encoding/json"
+ "errors"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/Sirupsen/logrus"
+)
+
+var (
+ errQueueFull = errors.New("client queue full")
+ errFrameTooBig = errors.New("frame too big")
+
+ sendObjectAttributes = []string{"state", "name"}
+
+ v0subscribeOK = []byte(`{"status":200}`)
+ v0subscribeFail = []byte(`{"status":400}`)
+)
+
+type v0session struct {
+ ws wsConn
+ sendq chan<- interface{}
+ db *sql.DB
+ permChecker permChecker
+ subscriptions []v0subscribe
+ lastMsgID uint64
+ log *logrus.Entry
+ mtx sync.Mutex
+ setupOnce sync.Once
+}
+
+func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker) (session, error) {
+ sess := &v0session{
+ sendq: sendq,
+ ws: ws,
+ db: db,
+ permChecker: pc,
+ log: logger(ws.Request().Context()),
+ }
+
+ err := ws.Request().ParseForm()
+ if err != nil {
+ sess.log.WithError(err).Error("ParseForm failed")
+ return nil, err
+ }
+ token := ws.Request().Form.Get("api_token")
+ sess.permChecker.SetToken(token)
+ sess.log.WithField("token", token).Debug("set token")
+
+ return sess, nil
+}
+
+func (sess *v0session) Receive(buf []byte) error {
+ var sub v0subscribe
+ if err := json.Unmarshal(buf, &sub); err != nil {
+ sess.log.WithError(err).Info("invalid message from client")
+ } else if sub.Method == "subscribe" {
+ sub.prepare(sess)
+ sess.log.WithField("sub", sub).Debug("sub prepared")
+ sess.sendq <- v0subscribeOK
+ sess.mtx.Lock()
+ sess.subscriptions = append(sess.subscriptions, sub)
+ sess.mtx.Unlock()
+ sub.sendOldEvents(sess)
+ return nil
+ } else {
+ sess.log.WithField("Method", sub.Method).Info("unknown method")
+ }
+ sess.sendq <- v0subscribeFail
+ return nil
+}
+
+func (sess *v0session) EventMessage(e *event) ([]byte, error) {
+ detail := e.Detail()
+ if detail == nil {
+ return nil, nil
+ }
+
+ ok, err := sess.permChecker.Check(detail.ObjectUUID)
+ if err != nil || !ok {
+ return nil, err
+ }
+
+ msg := map[string]interface{}{
+ "msgID": atomic.AddUint64(&sess.lastMsgID, 1),
+ "id": detail.ID,
+ "uuid": detail.UUID,
+ "object_uuid": detail.ObjectUUID,
+ "object_owner_uuid": detail.ObjectOwnerUUID,
+ "event_type": detail.EventType,
+ }
+ if detail.Properties != nil && detail.Properties["text"] != nil {
+ msg["properties"] = detail.Properties
+ } else {
+ msgProps := map[string]map[string]interface{}{}
+ for _, ak := range []string{"old_attributes", "new_attributes"} {
+ eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
+ if !ok {
+ continue
+ }
+ msgAttrs := map[string]interface{}{}
+ for _, k := range sendObjectAttributes {
+ if v, ok := eventAttrs[k]; ok {
+ msgAttrs[k] = v
+ }
+ }
+ msgProps[ak] = msgAttrs
+ }
+ msg["properties"] = msgProps
+ }
+ return json.Marshal(msg)
+}
+
+func (sess *v0session) Filter(e *event) bool {
+ sess.mtx.Lock()
+ defer sess.mtx.Unlock()
+ for _, sub := range sess.subscriptions {
+ if sub.match(sess, e) {
+ return true
+ }
+ }
+ return false
+}
+
+func (sub *v0subscribe) sendOldEvents(sess *v0session) {
+ if sub.LastLogID == 0 {
+ return
+ }
+ sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
+ // Here we do a "select id" query and queue an event for every
+ // log since the given ID, then use (*event)Detail() to
+ // retrieve the whole row and decide whether to send it. This
+ // approach is very inefficient if the subscriber asks for
+ // last_log_id==1, even if the filters end up matching very
+ // few events.
+ //
+ // To mitigate this, filter on "created > 10 minutes ago" when
+ // retrieving the list of old event IDs to consider.
+ rows, err := sess.db.Query(
+ `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
+ sub.LastLogID,
+ time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
+ if err != nil {
+ sess.log.WithError(err).Error("db.Query failed")
+ return
+ }
+ for rows.Next() {
+ var id uint64
+ err := rows.Scan(&id)
+ if err != nil {
+ sess.log.WithError(err).Error("row Scan failed")
+ continue
+ }
+ for len(sess.sendq)*2 > cap(sess.sendq) {
+ // Ugly... but if we fill up the whole client
+ // queue with a backlog of old events, a
+ // single new event will overflow it and
+ // terminate the connection, and then the
+ // client will probably reconnect and do the
+ // same thing all over again.
+ time.Sleep(100 * time.Millisecond)
+ }
+ now := time.Now()
+ e := &event{
+ LogID: id,
+ Received: now,
+ Ready: now,
+ db: sess.db,
+ }
+ if sub.match(sess, e) {
+ select {
+ case sess.sendq <- e:
+ case <-sess.ws.Request().Context().Done():
+ return
+ }
+ }
+ }
+ if err := rows.Err(); err != nil {
+ sess.log.WithError(err).Error("db.Query failed")
+ }
+}
+
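+// A v0subscribe message from the client looks like:
+//
+//	{"method":"subscribe",
+//	 "filters":[["event_type","in",["create","update"]],
+//	            ["created_at",">=","2016-12-06T00:00:00Z"]],
+//	 "last_log_id":123}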
+type v0subscribe struct {
+ Method string
+ Filters []v0filter
+ LastLogID int64 `json:"last_log_id"`
+
+ funcs []func(*event) bool
+}
+
+type v0filter [3]interface{}
+
+func (sub *v0subscribe) match(sess *v0session, e *event) bool {
+ log := sess.log.WithField("LogID", e.LogID)
+ detail := e.Detail()
+ if detail == nil {
+ log.Error("match failed, no detail")
+ return false
+ }
+ log = log.WithField("funcs", len(sub.funcs))
+ for i, f := range sub.funcs {
+ if !f(e) {
+ log.WithField("func", i).Debug("match failed")
+ return false
+ }
+ }
+ log.Debug("match passed")
+ return true
+}
+
+func (sub *v0subscribe) prepare(sess *v0session) {
+ for _, f := range sub.Filters {
+ if len(f) != 3 {
+ continue
+ }
+ if col, ok := f[0].(string); ok && col == "event_type" {
+ op, ok := f[1].(string)
+ if !ok || op != "in" {
+ continue
+ }
+ arr, ok := f[2].([]interface{})
+ if !ok {
+ continue
+ }
+ var strs []string
+ for _, s := range arr {
+ if s, ok := s.(string); ok {
+ strs = append(strs, s)
+ }
+ }
+ sub.funcs = append(sub.funcs, func(e *event) bool {
+ for _, s := range strs {
+ if s == e.Detail().EventType {
+ return true
+ }
+ }
+ return false
+ })
+ } else if ok && col == "created_at" {
+ op, ok := f[1].(string)
+ if !ok {
+ continue
+ }
+ tstr, ok := f[2].(string)
+ if !ok {
+ continue
+ }
+ t, err := time.Parse(time.RFC3339Nano, tstr)
+ if err != nil {
+ sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
+ continue
+ }
+ var fn func(*event) bool
+ switch op {
+ case ">=":
+ fn = func(e *event) bool {
+ return !e.Detail().CreatedAt.Before(t)
+ }
+ case "<=":
+ fn = func(e *event) bool {
+ return !e.Detail().CreatedAt.After(t)
+ }
+ case ">":
+ fn = func(e *event) bool {
+ return e.Detail().CreatedAt.After(t)
+ }
+ case "<":
+ fn = func(e *event) bool {
+ return e.Detail().CreatedAt.Before(t)
+ }
+ case "=":
+ fn = func(e *event) bool {
+ return e.Detail().CreatedAt.Equal(t)
+ }
+ default:
+ sess.log.WithField("operator", op).Info("bogus operator")
+ continue
+ }
+ sub.funcs = append(sub.funcs, fn)
+ }
+ }
+}
--- /dev/null
+package main
+
+import (
+ "database/sql"
+ "errors"
+)
+
+func newSessionV1(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker) (session, error) {
+	return nil, errors.New("not implemented")
+}