From: Tom Clegg Date: Tue, 15 Nov 2016 18:25:03 +0000 (-0500) Subject: 8460: Merge branch 'master' into 8460-websocket-go X-Git-Tag: 1.1.0~538^2~49 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/5a420beeb6c64efc3ca0ef13d4ab9ac6c654c3ab?hp=3af6db5dc4e2f08b2ebb49a82109c4325ad7fcc4 8460: Merge branch 'master' into 8460-websocket-go --- diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh index 320f9d445c..0a4559f95b 100755 --- a/build/run-build-packages.sh +++ b/build/run-build-packages.sh @@ -427,6 +427,8 @@ package_go_binary services/keepstore keepstore \ "Keep storage daemon, accessible to clients on the LAN" package_go_binary services/keep-web keep-web \ "Static web hosting service for user data stored in Arvados Keep" +package_go_binary services/ws arvados-ws \ + "Arvados Websocket server" package_go_binary tools/keep-block-check keep-block-check \ "Verify that all data from one set of Keep servers to another was copied" package_go_binary tools/keep-rsync keep-rsync \ diff --git a/build/run-tests.sh b/build/run-tests.sh index 8959cfbe09..c771bdc0ad 100755 --- a/build/run-tests.sh +++ b/build/run-tests.sh @@ -79,6 +79,7 @@ services/nodemanager services/crunch-run services/crunch-dispatch-local services/crunch-dispatch-slurm +services/ws sdk/cli sdk/pam sdk/python @@ -268,10 +269,11 @@ start_api() { } start_nginx_proxy_services() { - echo 'Starting keepproxy, keep-web, arv-git-httpd, and nginx ssl proxy...' + echo 'Starting keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...' 
cd "$WORKSPACE" \ && python sdk/python/tests/run_test_server.py start_keep_proxy \ && python sdk/python/tests/run_test_server.py start_keep-web \ + && python sdk/python/tests/run_test_server.py start_ws \ && python sdk/python/tests/run_test_server.py start_arv-git-httpd \ && python sdk/python/tests/run_test_server.py start_nginx \ && export ARVADOS_TEST_PROXY_SERVICES=1 @@ -283,6 +285,7 @@ stop_services() { cd "$WORKSPACE" \ && python sdk/python/tests/run_test_server.py stop_nginx \ && python sdk/python/tests/run_test_server.py stop_arv-git-httpd \ + && python sdk/python/tests/run_test_server.py stop_ws \ && python sdk/python/tests/run_test_server.py stop_keep-web \ && python sdk/python/tests/run_test_server.py stop_keep_proxy fi @@ -765,6 +768,7 @@ gostuff=( services/crunch-dispatch-local services/crunch-dispatch-slurm services/crunch-run + services/ws tools/keep-block-check tools/keep-exercise tools/keep-rsync diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go index 36f4eb52ae..0c18d38974 100644 --- a/sdk/go/arvados/client.go +++ b/sdk/go/arvados/client.go @@ -41,6 +41,8 @@ type Client struct { // callers who use a Client to initialize an // arvadosclient.ArvadosClient.) KeepServiceURIs []string `json:",omitempty"` + + dd *DiscoveryDocument } // The default http.Client used by a Client with Insecure==true and @@ -198,14 +200,83 @@ func (c *Client) apiURL(path string) string { // DiscoveryDocument is the Arvados server's description of itself. 
type DiscoveryDocument struct { - DefaultCollectionReplication int `json:"defaultCollectionReplication"` - BlobSignatureTTL int64 `json:"blobSignatureTtl"` + BasePath string `json:"basePath"` + DefaultCollectionReplication int `json:"defaultCollectionReplication"` + BlobSignatureTTL int64 `json:"blobSignatureTtl"` + Schemas map[string]Schema `json:"schemas"` + Resources map[string]Resource `json:"resources"` +} + +type Resource struct { + Methods map[string]ResourceMethod `json:"methods"` +} + +type ResourceMethod struct { + HTTPMethod string `json:"httpMethod"` + Path string `json:"path"` + Response MethodResponse `json:"response"` +} + +type MethodResponse struct { + Ref string `json:"$ref"` +} + +type Schema struct { + UUIDPrefix string `json:"uuidPrefix"` } // DiscoveryDocument returns a *DiscoveryDocument. The returned object // should not be modified: the same object may be returned by // subsequent calls. func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) { + if c.dd != nil { + return c.dd, nil + } var dd DiscoveryDocument - return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil) + err := c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil) + if err != nil { + return nil, err + } + c.dd = &dd + return c.dd, nil +} + +func (c *Client) PathForUUID(method, uuid string) (string, error) { + if len(uuid) != 27 { + return "", fmt.Errorf("invalid UUID: %q", uuid) + } + dd, err := c.DiscoveryDocument() + if err != nil { + return "", err + } + infix := uuid[6:11] + var model string + for m, s := range dd.Schemas { + if s.UUIDPrefix == infix { + model = m + break + } + } + if model == "" { + return "", fmt.Errorf("unrecognized UUID infix: %q", infix) + } + var resource string + for r, rsc := range dd.Resources { + if rsc.Methods["get"].Response.Ref == model { + resource = r + break + } + } + if resource == "" { + return "", fmt.Errorf("no resource for model: %q", model) + } + m, ok := 
dd.Resources[resource].Methods[method] + if !ok { + return "", fmt.Errorf("no method %q for resource %q", method, resource) + } + path := dd.BasePath + strings.Replace(m.Path, "{uuid}", uuid, -1) + if path[0] == '/' { + path = path[1:] + } + return path, nil } diff --git a/sdk/go/arvados/log.go b/sdk/go/arvados/log.go new file mode 100644 index 0000000000..caea04c82a --- /dev/null +++ b/sdk/go/arvados/log.go @@ -0,0 +1,16 @@ +package arvados + +import ( + "time" +) + +// Log is an arvados#log record +type Log struct { + ID uint64 `json:"id"` + UUID string `json:"uuid"` + ObjectUUID string `json:"object_uuid"` + ObjectOwnerUUID string `json:"object_owner_uuid"` + EventType string `json:"event_type"` + Properties map[string]interface{} `json:"properties"` + CreatedAt *time.Time `json:"created_at,omitempty"` +} diff --git a/sdk/go/config/load.go b/sdk/go/config/load.go index 9c65d65e84..2bbb440fb3 100644 --- a/sdk/go/config/load.go +++ b/sdk/go/config/load.go @@ -22,3 +22,8 @@ func LoadFile(cfg interface{}, configPath string) error { } return nil } + +// Dump returns a YAML representation of cfg. 
+func Dump(cfg interface{}) ([]byte, error) { + return yaml.Marshal(cfg) +} diff --git a/sdk/python/tests/nginx.conf b/sdk/python/tests/nginx.conf index 2b8b6ca1c4..006604077d 100644 --- a/sdk/python/tests/nginx.conf +++ b/sdk/python/tests/nginx.conf @@ -54,4 +54,20 @@ http { proxy_redirect //download:{{KEEPWEBPORT}}/ https://$host:{{KEEPWEBDLSSLPORT}}/; } } + upstream ws { + server localhost:{{WSPORT}}; + } + server { + listen *:{{WSSPORT}} ssl default_server; + server_name ~^(?<request_host>.*)$; + ssl_certificate {{SSLCERT}}; + ssl_certificate_key {{SSLKEY}}; + location / { + proxy_pass http://ws; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_set_header Host $request_host:{{WSPORT}}; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + } + } } diff --git a/sdk/python/tests/run_test_server.py b/sdk/python/tests/run_test_server.py index 642b7ccbad..5ef5e2a9f5 100644 --- a/sdk/python/tests/run_test_server.py +++ b/sdk/python/tests/run_test_server.py @@ -44,6 +44,7 @@ if not os.path.exists(TEST_TMPDIR): my_api_host = None _cached_config = {} +_cached_db_config = {} def find_server_pid(PID_PATH, wait=10): now = time.time() @@ -284,10 +285,16 @@ def run(leave_running_atexit=False): os.makedirs(gitdir) subprocess.check_output(['tar', '-xC', gitdir, '-f', gittarball]) + # The nginx proxy isn't listening here yet, but we need to choose + # the wss:// port now so we can write the API server config file. 
+ wss_port = find_available_port() + _setport('wss', wss_port) + port = find_available_port() env = os.environ.copy() env['RAILS_ENV'] = 'test' - env['ARVADOS_WEBSOCKETS'] = 'yes' + env['ARVADOS_TEST_WSS_PORT'] = str(wss_port) + env.pop('ARVADOS_WEBSOCKETS', None) env.pop('ARVADOS_TEST_API_HOST', None) env.pop('ARVADOS_API_HOST', None) env.pop('ARVADOS_API_HOST_INSECURE', None) @@ -360,6 +367,45 @@ def stop(force=False): kill_server_pid(_pidfile('api')) my_api_host = None +def run_ws(): + if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ: + return + stop_ws() + port = find_available_port() + conf = os.path.join(TEST_TMPDIR, 'ws.yml') + with open(conf, 'w') as f: + f.write(""" +Client: + APIHost: {} + Insecure: true +Listen: :{} +Postgres: + host: {} + dbname: {} + user: {} + password: {} + sslmode: require + """.format(os.environ['ARVADOS_API_HOST'], + port, + _dbconfig('host'), + _dbconfig('database'), + _dbconfig('username'), + _dbconfig('password'))) + logf = open(_fifo2stderr('ws'), 'w') + ws = subprocess.Popen( + ["ws", "-config", conf], + stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True) + with open(_pidfile('ws'), 'w') as f: + f.write(str(ws.pid)) + _wait_until_port_listens(port) + _setport('ws', port) + return port + +def stop_ws(): + if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ: + return + kill_server_pid(_pidfile('ws')) + def _start_keep(n, keep_args): keep0 = tempfile.mkdtemp() port = find_available_port() @@ -545,6 +591,8 @@ def run_nginx(): nginxconf['KEEPPROXYSSLPORT'] = find_available_port() nginxconf['GITPORT'] = _getport('arv-git-httpd') nginxconf['GITSSLPORT'] = find_available_port() + nginxconf['WSPORT'] = _getport('ws') + nginxconf['WSSPORT'] = _getport('wss') nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem') nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key') nginxconf['ACCESSLOG'] = _fifo2stderr('nginx_access_log') @@ -593,7 +641,15 @@ def _getport(program): 
except IOError: return 9 +def _dbconfig(key): + global _cached_db_config + if not _cached_db_config: + _cached_db_config = yaml.load(open(os.path.join( + SERVICES_SRC_DIR, 'api', 'config', 'database.yml'))) + return _cached_db_config['test'][key] + def _apiconfig(key): + global _cached_config if _cached_config: return _cached_config[key] def _load(f, required=True): @@ -647,6 +703,7 @@ class TestCaseWithServers(unittest.TestCase): original environment. """ MAIN_SERVER = None + WS_SERVER = None KEEP_SERVER = None KEEP_PROXY_SERVER = None KEEP_WEB_SERVER = None @@ -667,6 +724,7 @@ class TestCaseWithServers(unittest.TestCase): os.environ.pop('ARVADOS_EXTERNAL_CLIENT', None) for server_kwargs, start_func, stop_func in ( (cls.MAIN_SERVER, run, reset), + (cls.WS_SERVER, run_ws, stop_ws), (cls.KEEP_SERVER, run_keep, stop_keep), (cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy), (cls.KEEP_WEB_SERVER, run_keep_web, stop_keep_web)): @@ -693,6 +751,7 @@ class TestCaseWithServers(unittest.TestCase): if __name__ == "__main__": actions = [ 'start', 'stop', + 'start_ws', 'stop_ws', 'start_keep', 'stop_keep', 'start_keep_proxy', 'stop_keep_proxy', 'start_keep-web', 'stop_keep-web', @@ -725,6 +784,10 @@ if __name__ == "__main__": print(host) elif args.action == 'stop': stop(force=('ARVADOS_TEST_API_HOST' not in os.environ)) + elif args.action == 'start_ws': + run_ws() + elif args.action == 'stop_ws': + stop_ws() elif args.action == 'start_keep': run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers) elif args.action == 'stop_keep': diff --git a/services/api/config/application.default.yml b/services/api/config/application.default.yml index a9aa953f9f..ab560d7f6b 100644 --- a/services/api/config/application.default.yml +++ b/services/api/config/application.default.yml @@ -444,3 +444,4 @@ test: workbench_address: https://localhost:3001/ git_repositories_dir: <%= Rails.root.join 'tmp', 'git', 'test' %> git_internal_dir: <%= Rails.root.join 
'tmp', 'internal.git' %> + websocket_address: "wss://0.0.0.0:<%= ENV['ARVADOS_TEST_WSS_PORT'] %>/websocket" diff --git a/services/ws/config.go b/services/ws/config.go new file mode 100644 index 0000000000..9c2e80a172 --- /dev/null +++ b/services/ws/config.go @@ -0,0 +1,37 @@ +package main + +import ( + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +type Config struct { + Client arvados.Client + Postgres pgConfig + Listen string + Debug bool + + PingTimeout arvados.Duration + ClientEventQueue int + ServerEventQueue int +} + +func DefaultConfig() Config { + return Config{ + Client: arvados.Client{ + APIHost: "localhost:443", + }, + Postgres: pgConfig{ + "dbname": "arvados_production", + "user": "arvados", + "password": "xyzzy", + "host": "localhost", + "connect_timeout": "30", + "sslmode": "require", + }, + PingTimeout: arvados.Duration(time.Minute), + ClientEventQueue: 64, + ServerEventQueue: 4, + } +} diff --git a/services/ws/event.go b/services/ws/event.go new file mode 100644 index 0000000000..09c9d0f0a4 --- /dev/null +++ b/services/ws/event.go @@ -0,0 +1,63 @@ +package main + +import ( + "database/sql" + "log" + "sync" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" + "github.com/ghodss/yaml" +) + +type eventSink interface { + Channel() <-chan *event + Stop() +} + +type eventSource interface { + NewSink() eventSink +} + +type event struct { + LogID uint64 + Received time.Time + Serial uint64 + + db *sql.DB + logRow *arvados.Log + err error + mtx sync.Mutex +} + +// Detail returns the database row corresponding to the event. It can +// be called safely from multiple goroutines. Only one attempt will be +// made. If the database row cannot be retrieved, Detail returns nil. 
+func (e *event) Detail() *arvados.Log { + e.mtx.Lock() + defer e.mtx.Unlock() + if e.logRow != nil || e.err != nil { + return e.logRow + } + var logRow arvados.Log + var propYAML []byte + e.err = e.db.QueryRow(`SELECT id, uuid, object_uuid, COALESCE(object_owner_uuid,''), COALESCE(event_type,''), created_at, properties FROM logs WHERE id = $1`, e.LogID).Scan( + &logRow.ID, + &logRow.UUID, + &logRow.ObjectUUID, + &logRow.ObjectOwnerUUID, + &logRow.EventType, + &logRow.CreatedAt, + &propYAML) + if e.err != nil { + log.Printf("retrieving log row %d: %s", e.LogID, e.err) + return nil + } + e.err = yaml.Unmarshal(propYAML, &logRow.Properties) + if e.err != nil { + log.Printf("decoding yaml for log row %d: %s", e.LogID, e.err) + return nil + } + e.logRow = &logRow + return e.logRow +} diff --git a/services/ws/handler.go b/services/ws/handler.go new file mode 100644 index 0000000000..1c9d5ba61d --- /dev/null +++ b/services/ws/handler.go @@ -0,0 +1,156 @@ +package main + +import ( + "encoding/json" + "io" + "log" + "net/http" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +type wsConn interface { + io.ReadWriter + Request() *http.Request + SetReadDeadline(time.Time) error + SetWriteDeadline(time.Time) error +} + +type handler struct { + Client arvados.Client + PingTimeout time.Duration + QueueSize int + NewSession func(wsConn, arvados.Client) (session, error) +} + +func (h *handler) Handle(ws wsConn, events <-chan *event) { + sess, err := h.NewSession(ws, h.Client) + if err != nil { + log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err) + return + } + + queue := make(chan *event, h.QueueSize) + + stopped := make(chan struct{}) + stop := make(chan error, 5) + + go func() { + buf := make([]byte, 2<<20) + for { + select { + case <-stopped: + return + default: + } + ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour)) + n, err := ws.Read(buf) + sess.debugLogf("received frame: %q", buf[:n]) + if err == nil && n == len(buf) { + err = 
errFrameTooBig + } + if err != nil { + if err != io.EOF { + sess.debugLogf("handler: read: %s", err) + } + stop <- err + return + } + msg := make(map[string]interface{}) + err = json.Unmarshal(buf[:n], &msg) + if err != nil { + sess.debugLogf("handler: unmarshal: %s", err) + stop <- err + return + } + sess.Receive(msg, buf[:n]) + } + }() + + go func() { + for e := range queue { + if e == nil { + ws.SetWriteDeadline(time.Now().Add(h.PingTimeout)) + _, err := ws.Write([]byte("{}")) + if err != nil { + sess.debugLogf("handler: write {}: %s", err) + stop <- err + break + } + continue + } + + buf, err := sess.EventMessage(e) + if err != nil { + sess.debugLogf("EventMessage %d: err %s", err) + stop <- err + break + } else if len(buf) == 0 { + sess.debugLogf("EventMessage %d: skip", e.Serial) + continue + } + + sess.debugLogf("handler: send event %d: %q", e.Serial, buf) + ws.SetWriteDeadline(time.Now().Add(h.PingTimeout)) + _, err = ws.Write(buf) + if err != nil { + sess.debugLogf("handler: write: %s", err) + stop <- err + break + } + sess.debugLogf("handler: sent event %d", e.Serial) + } + for _ = range queue { + } + }() + + // Filter incoming events against the current subscription + // list, and forward matching events to the outgoing message + // queue. Close the queue and return when the "stopped" + // channel closes or the incoming event stream ends. Shut down + // the handler if the outgoing queue fills up. + go func() { + send := func(e *event) { + select { + case queue <- e: + default: + stop <- errQueueFull + } + } + + ticker := time.NewTicker(h.PingTimeout) + defer ticker.Stop() + + for { + var e *event + var ok bool + select { + case <-stopped: + close(queue) + return + case <-ticker.C: + // If the outgoing queue is empty, + // send an empty message. This can + // help detect a disconnected network + // socket, and prevent an idle socket + // from being closed. 
+ if len(queue) == 0 { + queue <- nil + } + continue + case e, ok = <-events: + if !ok { + close(queue) + return + } + } + if sess.Filter(e) { + send(e) + } + } + }() + + <-stop + close(stopped) +} diff --git a/services/ws/main.go b/services/ws/main.go new file mode 100644 index 0000000000..a143ae935b --- /dev/null +++ b/services/ws/main.go @@ -0,0 +1,56 @@ +package main + +import ( + "flag" + "fmt" + "log" + "net/http" + "time" + + "git.curoverse.com/arvados.git/sdk/go/config" +) + +var debugLogf = func(string, ...interface{}) {} + +func main() { + configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file") + dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit") + cfg := DefaultConfig() + flag.Parse() + + err := config.LoadFile(&cfg, *configPath) + if err != nil { + log.Fatal(err) + } + if cfg.Debug { + debugLogf = log.Printf + } + + if *dumpConfig { + txt, err := config.Dump(&cfg) + if err != nil { + log.Fatal(err) + } + fmt.Print(string(txt)) + return + } + + eventSource := &pgEventSource{ + DataSource: cfg.Postgres.ConnectionString(), + QueueSize: cfg.ServerEventQueue, + } + srv := &http.Server{ + Addr: cfg.Listen, + ReadTimeout: time.Minute, + WriteTimeout: time.Minute, + MaxHeaderBytes: 1 << 20, + Handler: &router{ + Config: &cfg, + eventSource: eventSource, + }, + } + eventSource.NewSink().Stop() + + log.Printf("listening at %s", srv.Addr) + log.Fatal(srv.ListenAndServe()) +} diff --git a/services/ws/permission.go b/services/ws/permission.go new file mode 100644 index 0000000000..b2b962c7ce --- /dev/null +++ b/services/ws/permission.go @@ -0,0 +1,78 @@ +package main + +import ( + "net/http" + "net/url" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +const ( + maxPermCacheAge = time.Hour + minPermCacheAge = 5 * time.Minute +) + +type permChecker interface { + SetToken(token string) + Check(uuid string) (bool, error) +} + +func NewPermChecker(ac arvados.Client) permChecker { + 
ac.AuthToken = "" + return &cachingPermChecker{ + Client: &ac, + cache: make(map[string]time.Time), + maxCurrent: 16, + } +} + +type cachingPermChecker struct { + *arvados.Client + cache map[string]time.Time + maxCurrent int +} + +func (pc *cachingPermChecker) SetToken(token string) { + pc.Client.AuthToken = token +} + +func (pc *cachingPermChecker) Check(uuid string) (bool, error) { + pc.tidy() + if t, ok := pc.cache[uuid]; ok && time.Now().Sub(t) < maxPermCacheAge { + debugLogf("perm ok (cached): %+q %+q", pc.Client.AuthToken, uuid) + return true, nil + } + var buf map[string]interface{} + path, err := pc.PathForUUID("get", uuid) + if err != nil { + return false, err + } + err = pc.RequestAndDecode(&buf, "GET", path, nil, url.Values{ + "select": {`["uuid"]`}, + }) + if err, ok := err.(arvados.TransactionError); ok && err.StatusCode == http.StatusNotFound { + debugLogf("perm err: %+q %+q: %s", pc.Client.AuthToken, uuid, err) + return false, nil + } + if err != nil { + debugLogf("perm !ok: %+q %+q", pc.Client.AuthToken, uuid) + return false, err + } + debugLogf("perm ok: %+q %+q", pc.Client.AuthToken, uuid) + pc.cache[uuid] = time.Now() + return true, nil +} + +func (pc *cachingPermChecker) tidy() { + if len(pc.cache) <= pc.maxCurrent*2 { + return + } + tooOld := time.Now().Add(-minPermCacheAge) + for uuid, t := range pc.cache { + if t.Before(tooOld) { + delete(pc.cache, uuid) + } + } + pc.maxCurrent = len(pc.cache) +} diff --git a/services/ws/pg.go b/services/ws/pg.go new file mode 100644 index 0000000000..a5af9f765b --- /dev/null +++ b/services/ws/pg.go @@ -0,0 +1,176 @@ +package main + +import ( + "database/sql" + "fmt" + "log" + "strconv" + "strings" + "sync" + "time" + + "github.com/lib/pq" +) + +type pgConfig map[string]string + +func (c pgConfig) ConnectionString() string { + s := "" + for k, v := range c { + s += k + s += "='" + s += strings.Replace( + strings.Replace(v, `\`, `\\`, -1), + `'`, `\'`, -1) + s += "' " + } + return s +} + +type pgEventSource 
struct { + DataSource string + QueueSize int + + db *sql.DB + pqListener *pq.Listener + sinks map[*pgEventSink]bool + setupOnce sync.Once + mtx sync.Mutex + shutdown chan error +} + +func (ps *pgEventSource) setup() { + ps.shutdown = make(chan error, 1) + ps.sinks = make(map[*pgEventSink]bool) + + db, err := sql.Open("postgres", ps.DataSource) + if err != nil { + log.Fatalf("sql.Open: %s", err) + } + if err = db.Ping(); err != nil { + log.Fatalf("db.Ping: %s", err) + } + ps.db = db + + ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) { + if err != nil { + // Until we have a mechanism for catching up + // on missed events, we cannot recover from a + // dropped connection without breaking our + // promises to clients. + ps.shutdown <- fmt.Errorf("pgEventSource listener problem: %s", err) + } + }) + err = ps.pqListener.Listen("logs") + if err != nil { + log.Fatal(err) + } + debugLogf("pgEventSource listening") + + go ps.run() +} + +func (ps *pgEventSource) run() { + eventQueue := make(chan *event, ps.QueueSize) + + go func() { + for e := range eventQueue { + // Wait for the "select ... from logs" call to + // finish. This limits max concurrent queries + // to ps.QueueSize. Without this, max + // concurrent queries would be bounded by + // client_count X client_queue_size. 
+ e.Detail() + debugLogf("event %d detail %+v", e.Serial, e.Detail()) + ps.mtx.Lock() + for sink := range ps.sinks { + sink.channel <- e + } + ps.mtx.Unlock() + } + }() + + var serial uint64 + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for { + select { + case err, ok := <-ps.shutdown: + if ok { + debugLogf("shutdown on error: %s", err) + } + close(eventQueue) + return + + case <-ticker.C: + debugLogf("pgEventSource listener ping") + ps.pqListener.Ping() + + case pqEvent, ok := <-ps.pqListener.Notify: + if !ok { + close(eventQueue) + return + } + if pqEvent.Channel != "logs" { + continue + } + logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64) + if err != nil { + log.Printf("bad notify payload: %+v", pqEvent) + continue + } + serial++ + e := &event{ + LogID: logID, + Received: time.Now(), + Serial: serial, + db: ps.db, + } + debugLogf("event %d %+v", e.Serial, e) + eventQueue <- e + go e.Detail() + } + } +} + +// NewSink subscribes to the event source. NewSink returns an +// eventSink, whose Channel() method returns a channel: a pointer to +// each subsequent event will be sent to that channel. +// +// The caller must ensure events are received from the sink channel as +// quickly as possible because when one sink stops being ready, all +// other sinks block. 
+func (ps *pgEventSource) NewSink() eventSink { + ps.setupOnce.Do(ps.setup) + sink := &pgEventSink{ + channel: make(chan *event, 1), + source: ps, + } + ps.mtx.Lock() + ps.sinks[sink] = true + ps.mtx.Unlock() + return sink +} + +type pgEventSink struct { + channel chan *event + source *pgEventSource +} + +func (sink *pgEventSink) Channel() <-chan *event { + return sink.channel +} + +func (sink *pgEventSink) Stop() { + go func() { + // Ensure this sink cannot fill up and block the + // server-side queue (which otherwise could in turn + // block our mtx.Lock() here) + for _ = range sink.channel { + } + }() + sink.source.mtx.Lock() + delete(sink.source.sinks, sink) + sink.source.mtx.Unlock() + close(sink.channel) +} diff --git a/services/ws/router.go b/services/ws/router.go new file mode 100644 index 0000000000..69654cbe52 --- /dev/null +++ b/services/ws/router.go @@ -0,0 +1,74 @@ +package main + +import ( + "encoding/json" + "log" + "net/http" + "sync" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" + "golang.org/x/net/websocket" +) + +type router struct { + Config *Config + + eventSource eventSource + mux *http.ServeMux + setupOnce sync.Once +} + +func (rtr *router) setup() { + rtr.mux = http.NewServeMux() + rtr.mux.Handle("/websocket", rtr.makeServer(NewSessionV0)) + rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1)) +} + +func (rtr *router) makeServer(newSession func(wsConn, arvados.Client) (session, error)) *websocket.Server { + handler := &handler{ + Client: rtr.Config.Client, + PingTimeout: rtr.Config.PingTimeout.Duration(), + QueueSize: rtr.Config.ClientEventQueue, + NewSession: newSession, + } + return &websocket.Server{ + Handshake: func(c *websocket.Config, r *http.Request) error { + return nil + }, + Handler: websocket.Handler(func(ws *websocket.Conn) { + sink := rtr.eventSource.NewSink() + handler.Handle(ws, sink.Channel()) + sink.Stop() + ws.Close() + }), + } +} + +func (rtr *router) ServeHTTP(resp http.ResponseWriter, req 
*http.Request) { + rtr.setupOnce.Do(rtr.setup) + t0 := time.Now() + reqLog(map[string]interface{}{ + "Connect": req.RemoteAddr, + "RemoteAddr": req.RemoteAddr, + "X-Forwarded-For": req.Header.Get("X-Forwarded-For"), + "Time": t0.UTC(), + }) + rtr.mux.ServeHTTP(resp, req) + t1 := time.Now() + reqLog(map[string]interface{}{ + "Disconnect": req.RemoteAddr, + "RemoteAddr": req.RemoteAddr, + "X-Forwarded-For": req.Header.Get("X-Forwarded-For"), + "Time": t1.UTC(), + "Elapsed": time.Now().Sub(t0).Seconds(), + }) +} + +func reqLog(m map[string]interface{}) { + j, err := json.Marshal(m) + if err != nil { + log.Fatal(err) + } + log.Print(string(j)) +} diff --git a/services/ws/session.go b/services/ws/session.go new file mode 100644 index 0000000000..98164e3549 --- /dev/null +++ b/services/ws/session.go @@ -0,0 +1,8 @@ +package main + +type session interface { + Receive(map[string]interface{}, []byte) + EventMessage(*event) ([]byte, error) + Filter(*event) bool + debugLogf(string, ...interface{}) +} diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go new file mode 100644 index 0000000000..467d156ee5 --- /dev/null +++ b/services/ws/session_v0.go @@ -0,0 +1,163 @@ +package main + +import ( + "encoding/json" + "errors" + "log" + "sync" + + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +var ( + errQueueFull = errors.New("client queue full") + errFrameTooBig = errors.New("frame too big") +) + +type sessionV0 struct { + ws wsConn + permChecker permChecker + subscribed map[string]bool + eventTypes map[string]bool + mtx sync.Mutex + setupOnce sync.Once +} + +type v0subscribe struct { + Method string + Filters []v0filter +} + +type v0filter []interface{} + +func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) { + sess := &sessionV0{ + ws: ws, + permChecker: NewPermChecker(ac), + subscribed: make(map[string]bool), + eventTypes: make(map[string]bool), + } + + err := ws.Request().ParseForm() + if err != nil { + log.Printf("%s ParseForm: %s", 
ws.Request().RemoteAddr, err) + return nil, err + } + token := ws.Request().Form.Get("api_token") + sess.permChecker.SetToken(token) + sess.debugLogf("token = %+q", token) + + return sess, nil +} + +func (sess *sessionV0) debugLogf(s string, args ...interface{}) { + args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...) + debugLogf("%s "+s, args...) +} + +// If every client subscription message includes filters consisting +// only of [["event_type","in",...]] then send only the requested +// event types. Otherwise, clear sess.eventTypes and send all event +// types from now on. +func (sess *sessionV0) checkFilters(filters []v0filter) { + if sess.eventTypes == nil { + // Already received a subscription request without + // event_type filters. + return + } + eventTypes := sess.eventTypes + sess.eventTypes = nil + if len(filters) == 0 { + return + } + useFilters := false + for _, f := range filters { + col, ok := f[0].(string) + if !ok || col != "event_type" { + continue + } + op, ok := f[1].(string) + if !ok || op != "in" { + return + } + arr, ok := f[2].([]interface{}) + if !ok { + return + } + useFilters = true + for _, s := range arr { + if s, ok := s.(string); ok { + eventTypes[s] = true + } else { + return + } + } + } + if useFilters { + sess.debugLogf("eventTypes %+v", eventTypes) + sess.eventTypes = eventTypes + } +} + +func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) { + sess.debugLogf("received message: %+v", msg) + var sub v0subscribe + if err := json.Unmarshal(buf, &sub); err != nil { + sess.debugLogf("ignored unrecognized request: %s", err) + return + } + if sub.Method == "subscribe" { + sess.debugLogf("subscribing to *") + sess.mtx.Lock() + sess.checkFilters(sub.Filters) + sess.subscribed["*"] = true + sess.mtx.Unlock() + } +} + +func (sess *sessionV0) EventMessage(e *event) ([]byte, error) { + detail := e.Detail() + if detail == nil { + return nil, nil + } + + ok, err := sess.permChecker.Check(detail.ObjectUUID) + if 
err != nil || !ok { + return nil, err + } + + msg := map[string]interface{}{ + "msgID": e.Serial, + "id": detail.ID, + "uuid": detail.UUID, + "object_uuid": detail.ObjectUUID, + "object_owner_uuid": detail.ObjectOwnerUUID, + "event_type": detail.EventType, + } + if detail.Properties != nil && detail.Properties["text"] != nil { + msg["properties"] = detail.Properties + } + return json.Marshal(msg) +} + +func (sess *sessionV0) Filter(e *event) bool { + detail := e.Detail() + sess.mtx.Lock() + defer sess.mtx.Unlock() + switch { + case sess.eventTypes != nil && detail == nil: + return false + case sess.eventTypes != nil && !sess.eventTypes[detail.EventType]: + return false + case sess.subscribed["*"]: + return true + case detail == nil: + return false + case sess.subscribed[detail.ObjectUUID]: + return true + case sess.subscribed[detail.ObjectOwnerUUID]: + return true + default: + return false + } +} diff --git a/services/ws/session_v1.go b/services/ws/session_v1.go new file mode 100644 index 0000000000..bc09ed0db7 --- /dev/null +++ b/services/ws/session_v1.go @@ -0,0 +1,11 @@ +package main + +import ( + "errors" + + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +func NewSessionV1(ws wsConn, ac arvados.Client) (session, error) { + return nil, errors.New("Not implemented") +}