"Keep storage daemon, accessible to clients on the LAN"
package_go_binary services/keep-web keep-web \
"Static web hosting service for user data stored in Arvados Keep"
+package_go_binary services/ws arvados-ws \
+ "Arvados Websocket server"
package_go_binary tools/keep-block-check keep-block-check \
"Verify that all data from one set of Keep servers to another was copied"
package_go_binary tools/keep-rsync keep-rsync \
services/crunch-run
services/crunch-dispatch-local
services/crunch-dispatch-slurm
+services/ws
sdk/cli
sdk/pam
sdk/python
}
start_nginx_proxy_services() {
- echo 'Starting keepproxy, keep-web, arv-git-httpd, and nginx ssl proxy...'
+ echo 'Starting keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...'
cd "$WORKSPACE" \
&& python sdk/python/tests/run_test_server.py start_keep_proxy \
&& python sdk/python/tests/run_test_server.py start_keep-web \
+ && python sdk/python/tests/run_test_server.py start_ws \
&& python sdk/python/tests/run_test_server.py start_arv-git-httpd \
&& python sdk/python/tests/run_test_server.py start_nginx \
&& export ARVADOS_TEST_PROXY_SERVICES=1
cd "$WORKSPACE" \
&& python sdk/python/tests/run_test_server.py stop_nginx \
&& python sdk/python/tests/run_test_server.py stop_arv-git-httpd \
+ && python sdk/python/tests/run_test_server.py stop_ws \
&& python sdk/python/tests/run_test_server.py stop_keep-web \
&& python sdk/python/tests/run_test_server.py stop_keep_proxy
fi
services/crunch-dispatch-local
services/crunch-dispatch-slurm
services/crunch-run
+ services/ws
tools/keep-block-check
tools/keep-exercise
tools/keep-rsync
// callers who use a Client to initialize an
// arvadosclient.ArvadosClient.)
KeepServiceURIs []string `json:",omitempty"`
+
+ dd *DiscoveryDocument
}
// The default http.Client used by a Client with Insecure==true and
// DiscoveryDocument is the Arvados server's description of itself.
type DiscoveryDocument struct {
- DefaultCollectionReplication int `json:"defaultCollectionReplication"`
- BlobSignatureTTL int64 `json:"blobSignatureTtl"`
+ BasePath string `json:"basePath"`
+ DefaultCollectionReplication int `json:"defaultCollectionReplication"`
+ BlobSignatureTTL int64 `json:"blobSignatureTtl"`
+ Schemas map[string]Schema `json:"schemas"`
+ Resources map[string]Resource `json:"resources"`
+}
+
+// Resource describes one API resource (e.g., "collections") as listed
+// in the discovery document.
+type Resource struct {
+	Methods map[string]ResourceMethod `json:"methods"`
+}
+
+// ResourceMethod describes one method (e.g., "get") of an API resource.
+type ResourceMethod struct {
+	HTTPMethod string         `json:"httpMethod"`
+	Path       string         `json:"path"`
+	Response   MethodResponse `json:"response"`
+}
+
+// MethodResponse names the schema of a method's response body.
+type MethodResponse struct {
+	Ref string `json:"$ref"`
+}
+
+// Schema describes one object schema listed in the discovery document.
+// Only the UUID infix is needed here (see Client.PathForUUID).
+type Schema struct {
+	UUIDPrefix string `json:"uuidPrefix"`
}
// DiscoveryDocument returns a *DiscoveryDocument. The returned object
// should not be modified: the same object may be returned by
// subsequent calls.
func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
+ if c.dd != nil {
+ return c.dd, nil
+ }
var dd DiscoveryDocument
- return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+ err := c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+ if err != nil {
+ return nil, err
+ }
+ c.dd = &dd
+ return c.dd, nil
+}
+
+// PathForUUID returns the relative API path (without a leading "/")
+// for calling the given method ("get", "update", ...) on the object
+// with the given UUID. It uses the discovery document to map the
+// UUID's type infix to a resource, so the first call may make an API
+// request.
+func (c *Client) PathForUUID(method, uuid string) (string, error) {
+	// Arvados UUIDs are always 27 characters: 5-char cluster ID,
+	// "-", 5-char type infix, "-", 15-char node ID.
+	if len(uuid) != 27 {
+		return "", fmt.Errorf("invalid UUID: %q", uuid)
+	}
+	dd, err := c.DiscoveryDocument()
+	if err != nil {
+		return "", err
+	}
+	infix := uuid[6:11]
+	// Find the schema (model) whose UUID prefix matches the infix.
+	var model string
+	for m, s := range dd.Schemas {
+		if s.UUIDPrefix == infix {
+			model = m
+			break
+		}
+	}
+	if model == "" {
+		return "", fmt.Errorf("unrecognized UUID infix: %q", infix)
+	}
+	// Find the resource whose "get" method returns that model.
+	var resource string
+	for r, rsc := range dd.Resources {
+		if rsc.Methods["get"].Response.Ref == model {
+			resource = r
+			break
+		}
+	}
+	if resource == "" {
+		return "", fmt.Errorf("no resource for model: %q", model)
+	}
+	m, ok := dd.Resources[resource].Methods[method]
+	if !ok {
+		return "", fmt.Errorf("no method %q for resource %q", method, resource)
+	}
+	path := dd.BasePath + strings.Replace(m.Path, "{uuid}", uuid, -1)
+	// Return a relative path. TrimPrefix (unlike indexing path[0])
+	// is safe even if the discovery document yields an empty path.
+	path = strings.TrimPrefix(path, "/")
+	return path, nil
+}
--- /dev/null
+package arvados
+
+import (
+ "time"
+)
+
+// Log is an arvados#log record, i.e., one row of the API server's
+// logs table. Properties is decoded from the row's serialized
+// properties column.
+type Log struct {
+	ID              uint64                 `json:"id"`
+	UUID            string                 `json:"uuid"`
+	ObjectUUID      string                 `json:"object_uuid"`
+	ObjectOwnerUUID string                 `json:"object_owner_uuid"`
+	EventType       string                 `json:"event_type"`
+	Properties      map[string]interface{} `json:"properties"`
+	CreatedAt       *time.Time             `json:"created_at,omitempty"`
+}
}
return nil
}
+
+// Dump returns a YAML representation of cfg. It is the inverse of
+// loading a config file: the output is suitable for feeding back to
+// LoadFile (used, e.g., by -dump-config flags).
+func Dump(cfg interface{}) ([]byte, error) {
+	return yaml.Marshal(cfg)
+}
proxy_redirect //download:{{KEEPWEBPORT}}/ https://$host:{{KEEPWEBDLSSLPORT}}/;
}
}
+ upstream ws {
+ server localhost:{{WSPORT}};
+ }
+ server {
+ listen *:{{WSSPORT}} ssl default_server;
+ server_name ~^(?<request_host>.*)$;
+ ssl_certificate {{SSLCERT}};
+ ssl_certificate_key {{SSLKEY}};
+ location / {
+ proxy_pass http://ws;
+ proxy_set_header Upgrade $http_upgrade;
+ proxy_set_header Connection "upgrade";
+ proxy_set_header Host $request_host:{{WSPORT}};
+ proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+ }
+ }
}
my_api_host = None
_cached_config = {}
+_cached_db_config = {}
def find_server_pid(PID_PATH, wait=10):
now = time.time()
os.makedirs(gitdir)
subprocess.check_output(['tar', '-xC', gitdir, '-f', gittarball])
+ # The nginx proxy isn't listening here yet, but we need to choose
+ # the wss:// port now so we can write the API server config file.
+ wss_port = find_available_port()
+ _setport('wss', wss_port)
+
port = find_available_port()
env = os.environ.copy()
env['RAILS_ENV'] = 'test'
- env['ARVADOS_WEBSOCKETS'] = 'yes'
+ env['ARVADOS_TEST_WSS_PORT'] = str(wss_port)
+ env.pop('ARVADOS_WEBSOCKETS', None)
env.pop('ARVADOS_TEST_API_HOST', None)
env.pop('ARVADOS_API_HOST', None)
env.pop('ARVADOS_API_HOST_INSECURE', None)
kill_server_pid(_pidfile('api'))
my_api_host = None
+def run_ws():
+    """Start the arvados-ws websocket server as a subprocess.
+
+    Writes a ws.yml config file pointing at the test API server and
+    the test database, launches the "ws" daemon with its output piped
+    to stderr via a fifo, records its pid, waits for it to listen,
+    and registers its port with _setport('ws', ...).
+
+    No-op when ARVADOS_TEST_PROXY_SERVICES is set (an externally
+    managed proxy stack is already running these services).
+    Returns the chosen port, or None when skipped.
+    """
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    stop_ws()
+    port = find_available_port()
+    conf = os.path.join(TEST_TMPDIR, 'ws.yml')
+    with open(conf, 'w') as f:
+        f.write("""
+Client:
+  APIHost: {}
+  Insecure: true
+Listen: :{}
+Postgres:
+  host: {}
+  dbname: {}
+  user: {}
+  password: {}
+  sslmode: require
+        """.format(os.environ['ARVADOS_API_HOST'],
+                   port,
+                   _dbconfig('host'),
+                   _dbconfig('database'),
+                   _dbconfig('username'),
+                   _dbconfig('password')))
+    logf = open(_fifo2stderr('ws'), 'w')
+    ws = subprocess.Popen(
+        ["ws", "-config", conf],
+        stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
+    with open(_pidfile('ws'), 'w') as f:
+        f.write(str(ws.pid))
+    _wait_until_port_listens(port)
+    _setport('ws', port)
+    return port
+
+def stop_ws():
+    """Stop the websocket server started by run_ws (via its pidfile).
+
+    No-op when ARVADOS_TEST_PROXY_SERVICES is set.
+    """
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    kill_server_pid(_pidfile('ws'))
+
def _start_keep(n, keep_args):
keep0 = tempfile.mkdtemp()
port = find_available_port()
nginxconf['KEEPPROXYSSLPORT'] = find_available_port()
nginxconf['GITPORT'] = _getport('arv-git-httpd')
nginxconf['GITSSLPORT'] = find_available_port()
+ nginxconf['WSPORT'] = _getport('ws')
+ nginxconf['WSSPORT'] = _getport('wss')
nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem')
nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key')
nginxconf['ACCESSLOG'] = _fifo2stderr('nginx_access_log')
except IOError:
return 9
+def _dbconfig(key):
+ global _cached_db_config
+ if not _cached_db_config:
+ _cached_db_config = yaml.load(open(os.path.join(
+ SERVICES_SRC_DIR, 'api', 'config', 'database.yml')))
+ return _cached_db_config['test'][key]
+
def _apiconfig(key):
+ global _cached_config
if _cached_config:
return _cached_config[key]
def _load(f, required=True):
original environment.
"""
MAIN_SERVER = None
+ WS_SERVER = None
KEEP_SERVER = None
KEEP_PROXY_SERVER = None
KEEP_WEB_SERVER = None
os.environ.pop('ARVADOS_EXTERNAL_CLIENT', None)
for server_kwargs, start_func, stop_func in (
(cls.MAIN_SERVER, run, reset),
+ (cls.WS_SERVER, run_ws, stop_ws),
(cls.KEEP_SERVER, run_keep, stop_keep),
(cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy),
(cls.KEEP_WEB_SERVER, run_keep_web, stop_keep_web)):
if __name__ == "__main__":
actions = [
'start', 'stop',
+ 'start_ws', 'stop_ws',
'start_keep', 'stop_keep',
'start_keep_proxy', 'stop_keep_proxy',
'start_keep-web', 'stop_keep-web',
print(host)
elif args.action == 'stop':
stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
+ elif args.action == 'start_ws':
+ run_ws()
+ elif args.action == 'stop_ws':
+ stop_ws()
elif args.action == 'start_keep':
run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers)
elif args.action == 'stop_keep':
workbench_address: https://localhost:3001/
git_repositories_dir: <%= Rails.root.join 'tmp', 'git', 'test' %>
git_internal_dir: <%= Rails.root.join 'tmp', 'internal.git' %>
+ websocket_address: "wss://0.0.0.0:<%= ENV['ARVADOS_TEST_WSS_PORT'] %>/websocket"
--- /dev/null
+package main
+
+import (
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Config is the arvados-ws server configuration, loaded from a YAML
+// file (see DefaultConfig for defaults).
+type Config struct {
+	Client   arvados.Client // how to reach the Arvados API server
+	Postgres pgConfig       // connection parameters for the logs database
+	Listen   string         // address:port to listen on
+	Debug    bool           // enable debug logging
+
+	PingTimeout      arvados.Duration // ping interval; also the per-write deadline
+	ClientEventQueue int              // outgoing event buffer size per client
+	ServerEventQueue int              // server-wide pending event queue size
+}
+
+// DefaultConfig returns the default configuration; values loaded from
+// the config file override these.
+func DefaultConfig() Config {
+	return Config{
+		Client: arvados.Client{
+			APIHost: "localhost:443",
+		},
+		Postgres: pgConfig{
+			"dbname":          "arvados_production",
+			"user":            "arvados",
+			"password":        "xyzzy",
+			"host":            "localhost",
+			"connect_timeout": "30",
+			"sslmode":         "require",
+		},
+		PingTimeout:      arvados.Duration(time.Minute),
+		ClientEventQueue: 64,
+		ServerEventQueue: 4,
+	}
+}
--- /dev/null
+package main
+
+import (
+ "database/sql"
+ "log"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/ghodss/yaml"
+)
+
+// eventSink is one subscriber's view of an event stream. Stop must be
+// called when the subscriber is done.
+type eventSink interface {
+	Channel() <-chan *event
+	Stop()
+}
+
+// eventSource produces events and fans them out to sinks.
+type eventSource interface {
+	NewSink() eventSink
+}
+
+// event is a lightweight reference to one row of the logs table. The
+// full row is fetched lazily (and cached) by Detail.
+type event struct {
+	LogID    uint64    // primary key of the logs row
+	Received time.Time // when the database notification arrived
+	Serial   uint64    // sequence number assigned by the event source
+
+	db     *sql.DB
+	logRow *arvados.Log // cached row, set by the first Detail call
+	err    error        // cached error from the first Detail attempt
+	mtx    sync.Mutex   // protects logRow and err
+}
+
+// Detail returns the database row corresponding to the event. It can
+// be called safely from multiple goroutines. Only one attempt will be
+// made. If the database row cannot be retrieved, Detail returns nil.
+func (e *event) Detail() *arvados.Log {
+	e.mtx.Lock()
+	defer e.mtx.Unlock()
+	// A cached result (success or failure) short-circuits: the
+	// query is attempted at most once per event.
+	if e.logRow != nil || e.err != nil {
+		return e.logRow
+	}
+	var logRow arvados.Log
+	var propYAML []byte
+	e.err = e.db.QueryRow(`SELECT id, uuid, object_uuid, COALESCE(object_owner_uuid,''), COALESCE(event_type,''), created_at, properties FROM logs WHERE id = $1`, e.LogID).Scan(
+		&logRow.ID,
+		&logRow.UUID,
+		&logRow.ObjectUUID,
+		&logRow.ObjectOwnerUUID,
+		&logRow.EventType,
+		&logRow.CreatedAt,
+		&propYAML)
+	if e.err != nil {
+		log.Printf("retrieving log row %d: %s", e.LogID, e.err)
+		return nil
+	}
+	// The properties column is stored serialized; decode it with the
+	// YAML package (which also accepts JSON).
+	e.err = yaml.Unmarshal(propYAML, &logRow.Properties)
+	if e.err != nil {
+		log.Printf("decoding yaml for log row %d: %s", e.LogID, e.err)
+		return nil
+	}
+	e.logRow = &logRow
+	return e.logRow
+}
--- /dev/null
+package main
+
+import (
+ "encoding/json"
+ "io"
+ "log"
+ "net/http"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type wsConn interface {
+ io.ReadWriter
+ Request() *http.Request
+ SetReadDeadline(time.Time) error
+ SetWriteDeadline(time.Time) error
+}
+
+type handler struct {
+ Client arvados.Client
+ PingTimeout time.Duration
+ QueueSize int
+ NewSession func(wsConn, arvados.Client) (session, error)
+}
+
+// Handle serves one websocket client: it starts a reader goroutine, a
+// writer goroutine, and a filter goroutine, then blocks until one of
+// them reports an error (or EOF) on the stop channel.
+func (h *handler) Handle(ws wsConn, events <-chan *event) {
+	sess, err := h.NewSession(ws, h.Client)
+	if err != nil {
+		log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
+		return
+	}
+
+	queue := make(chan *event, h.QueueSize)
+
+	stopped := make(chan struct{})
+	stop := make(chan error, 5)
+
+	// Reader: decode incoming JSON frames and pass them to the
+	// session until an error occurs or "stopped" closes.
+	go func() {
+		buf := make([]byte, 2<<20)
+		for {
+			select {
+			case <-stopped:
+				return
+			default:
+			}
+			// Effectively no read deadline; dead connections are
+			// detected via write deadlines on pings (below).
+			ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
+			n, err := ws.Read(buf)
+			sess.debugLogf("received frame: %q", buf[:n])
+			if err == nil && n == len(buf) {
+				// A frame that fills the buffer was probably
+				// truncated; reject it.
+				err = errFrameTooBig
+			}
+			if err != nil {
+				if err != io.EOF {
+					sess.debugLogf("handler: read: %s", err)
+				}
+				stop <- err
+				return
+			}
+			msg := make(map[string]interface{})
+			err = json.Unmarshal(buf[:n], &msg)
+			if err != nil {
+				sess.debugLogf("handler: unmarshal: %s", err)
+				stop <- err
+				return
+			}
+			sess.Receive(msg, buf[:n])
+		}
+	}()
+
+	// Writer: send queued events. A nil event means "ping": send an
+	// empty JSON message.
+	go func() {
+		for e := range queue {
+			if e == nil {
+				ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+				_, err := ws.Write([]byte("{}"))
+				if err != nil {
+					sess.debugLogf("handler: write {}: %s", err)
+					stop <- err
+					break
+				}
+				continue
+			}
+
+			buf, err := sess.EventMessage(e)
+			if err != nil {
+				sess.debugLogf("EventMessage %d: err %s", e.Serial, err)
+				stop <- err
+				break
+			} else if len(buf) == 0 {
+				sess.debugLogf("EventMessage %d: skip", e.Serial)
+				continue
+			}
+
+			sess.debugLogf("handler: send event %d: %q", e.Serial, buf)
+			ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+			_, err = ws.Write(buf)
+			if err != nil {
+				sess.debugLogf("handler: write: %s", err)
+				stop <- err
+				break
+			}
+			sess.debugLogf("handler: sent event %d", e.Serial)
+		}
+		// Drain any remaining events so the filter goroutine cannot
+		// block on a full queue after we stop writing.
+		for range queue {
+		}
+	}()
+
+	// Filter incoming events against the current subscription
+	// list, and forward matching events to the outgoing message
+	// queue. Close the queue and return when the "stopped"
+	// channel closes or the incoming event stream ends. Shut down
+	// the handler if the outgoing queue fills up.
+	go func() {
+		send := func(e *event) {
+			select {
+			case queue <- e:
+			default:
+				stop <- errQueueFull
+			}
+		}
+
+		ticker := time.NewTicker(h.PingTimeout)
+		defer ticker.Stop()
+
+		for {
+			var e *event
+			var ok bool
+			select {
+			case <-stopped:
+				close(queue)
+				return
+			case <-ticker.C:
+				// If the outgoing queue is empty,
+				// send an empty message. This can
+				// help detect a disconnected network
+				// socket, and prevent an idle socket
+				// from being closed.
+				if len(queue) == 0 {
+					queue <- nil
+				}
+				continue
+			case e, ok = <-events:
+				if !ok {
+					close(queue)
+					return
+				}
+			}
+			if sess.Filter(e) {
+				send(e)
+			}
+		}
+	}()
+
+	<-stop
+	close(stopped)
+}
--- /dev/null
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+ "net/http"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/config"
+)
+
+var debugLogf = func(string, ...interface{}) {}
+
+// main loads the configuration, optionally dumps it, then starts the
+// HTTP server with a websocket router backed by a Postgres event
+// source.
+func main() {
+	configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file")
+	dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit")
+	cfg := DefaultConfig()
+	flag.Parse()
+
+	err := config.LoadFile(&cfg, *configPath)
+	if err != nil {
+		log.Fatal(err)
+	}
+	if cfg.Debug {
+		debugLogf = log.Printf
+	}
+
+	if *dumpConfig {
+		txt, err := config.Dump(&cfg)
+		if err != nil {
+			log.Fatal(err)
+		}
+		fmt.Print(string(txt))
+		return
+	}
+
+	eventSource := &pgEventSource{
+		DataSource: cfg.Postgres.ConnectionString(),
+		QueueSize:  cfg.ServerEventQueue,
+	}
+	srv := &http.Server{
+		Addr:           cfg.Listen,
+		ReadTimeout:    time.Minute,
+		WriteTimeout:   time.Minute,
+		MaxHeaderBytes: 1 << 20,
+		Handler: &router{
+			Config:      &cfg,
+			eventSource: eventSource,
+		},
+	}
+	// Force the event source's one-time setup now, before accepting
+	// clients: it connects to the database and exits fatally on a bad
+	// Postgres config instead of failing on the first client.
+	eventSource.NewSink().Stop()
+
+	log.Printf("listening at %s", srv.Addr)
+	log.Fatal(srv.ListenAndServe())
+}
--- /dev/null
+package main
+
+import (
+ "net/http"
+ "net/url"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+const (
+ maxPermCacheAge = time.Hour
+ minPermCacheAge = 5 * time.Minute
+)
+
+// permChecker answers "can the current token read this object?"
+type permChecker interface {
+	SetToken(token string)
+	Check(uuid string) (bool, error)
+}
+
+// NewPermChecker returns a permChecker that checks permissions by
+// fetching objects from the API server, caching positive results.
+// The client's token is cleared here; callers supply the per-session
+// token via SetToken.
+func NewPermChecker(ac arvados.Client) permChecker {
+	ac.AuthToken = ""
+	return &cachingPermChecker{
+		Client:     &ac,
+		cache:      make(map[string]time.Time),
+		maxCurrent: 16,
+	}
+}
+
+// cachingPermChecker caches successful permission checks by object
+// UUID. (Each session gets its own checker, so the cache is
+// effectively per-token.)
+type cachingPermChecker struct {
+	*arvados.Client
+	cache      map[string]time.Time
+	maxCurrent int
+}
+
+// SetToken sets the API token used for subsequent Check calls.
+func (pc *cachingPermChecker) SetToken(token string) {
+	pc.Client.AuthToken = token
+}
+
+// Check reports whether the current token can retrieve the object
+// with the given UUID. Only positive results are cached. A 404 from
+// the API server means "no permission" and returns (false, nil); any
+// other error is returned to the caller.
+func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
+	pc.tidy()
+	if t, ok := pc.cache[uuid]; ok && time.Now().Sub(t) < maxPermCacheAge {
+		debugLogf("perm ok (cached): %+q %+q", pc.Client.AuthToken, uuid)
+		return true, nil
+	}
+	var buf map[string]interface{}
+	path, err := pc.PathForUUID("get", uuid)
+	if err != nil {
+		return false, err
+	}
+	// Select only the uuid field: we care about reachability, not
+	// content.
+	err = pc.RequestAndDecode(&buf, "GET", path, nil, url.Values{
+		"select": {`["uuid"]`},
+	})
+	if err, ok := err.(arvados.TransactionError); ok && err.StatusCode == http.StatusNotFound {
+		debugLogf("perm err: %+q %+q: %s", pc.Client.AuthToken, uuid, err)
+		return false, nil
+	}
+	if err != nil {
+		debugLogf("perm !ok: %+q %+q", pc.Client.AuthToken, uuid)
+		return false, err
+	}
+	debugLogf("perm ok: %+q %+q", pc.Client.AuthToken, uuid)
+	pc.cache[uuid] = time.Now()
+	return true, nil
+}
+
+// tidy evicts cache entries older than minPermCacheAge once the cache
+// grows past twice its high-water mark, then records the new size as
+// the mark.
+func (pc *cachingPermChecker) tidy() {
+	if len(pc.cache) <= pc.maxCurrent*2 {
+		return
+	}
+	tooOld := time.Now().Add(-minPermCacheAge)
+	for uuid, t := range pc.cache {
+		if t.Before(tooOld) {
+			delete(pc.cache, uuid)
+		}
+	}
+	pc.maxCurrent = len(pc.cache)
+}
--- /dev/null
+package main
+
+import (
+ "database/sql"
+ "fmt"
+ "log"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/lib/pq"
+)
+
+// pgConfig holds Postgres connection parameters as key/value pairs
+// (dbname, user, password, host, ...).
+type pgConfig map[string]string
+
+// ConnectionString renders the parameters in libpq keyword/value
+// form, quoting each value and escaping backslashes and single
+// quotes. Key order follows map iteration and is therefore not
+// deterministic (harmless to libpq).
+func (c pgConfig) ConnectionString() string {
+	s := ""
+	for k, v := range c {
+		s += k
+		s += "='"
+		s += strings.Replace(
+			strings.Replace(v, `\`, `\\`, -1),
+			`'`, `\'`, -1)
+		s += "' "
+	}
+	return s
+}
+
+type pgEventSource struct {
+ DataSource string
+ QueueSize int
+
+ db *sql.DB
+ pqListener *pq.Listener
+ sinks map[*pgEventSink]bool
+ setupOnce sync.Once
+ mtx sync.Mutex
+ shutdown chan error
+}
+
+// setup opens the database connection and the LISTEN "logs"
+// subscription, then starts the run loop. It is called once (via
+// setupOnce in NewSink) and exits the process on any startup error.
+func (ps *pgEventSource) setup() {
+	ps.shutdown = make(chan error, 1)
+	ps.sinks = make(map[*pgEventSink]bool)
+
+	db, err := sql.Open("postgres", ps.DataSource)
+	if err != nil {
+		log.Fatalf("sql.Open: %s", err)
+	}
+	if err = db.Ping(); err != nil {
+		log.Fatalf("db.Ping: %s", err)
+	}
+	ps.db = db
+
+	ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
+		if err != nil {
+			// Until we have a mechanism for catching up
+			// on missed events, we cannot recover from a
+			// dropped connection without breaking our
+			// promises to clients.
+			ps.shutdown <- fmt.Errorf("pgEventSource listener problem: %s", err)
+		}
+	})
+	err = ps.pqListener.Listen("logs")
+	if err != nil {
+		log.Fatal(err)
+	}
+	debugLogf("pgEventSource listening")
+
+	go ps.run()
+}
+
+// run receives NOTIFY payloads from Postgres, wraps each in an event,
+// and fans the events out to all registered sinks (via a bounded
+// intermediate queue), until shutdown or the listener channel closes.
+func (ps *pgEventSource) run() {
+	eventQueue := make(chan *event, ps.QueueSize)
+
+	go func() {
+		for e := range eventQueue {
+			// Wait for the "select ... from logs" call to
+			// finish. This limits max concurrent queries
+			// to ps.QueueSize. Without this, max
+			// concurrent queries would be bounded by
+			// client_count X client_queue_size.
+			e.Detail()
+			debugLogf("event %d detail %+v", e.Serial, e.Detail())
+			ps.mtx.Lock()
+			for sink := range ps.sinks {
+				sink.channel <- e
+			}
+			ps.mtx.Unlock()
+		}
+	}()
+
+	var serial uint64
+	// Ping the listener periodically so a dead connection is noticed
+	// (and reported via the error callback in setup).
+	ticker := time.NewTicker(time.Minute)
+	defer ticker.Stop()
+	for {
+		select {
+		case err, ok := <-ps.shutdown:
+			if ok {
+				debugLogf("shutdown on error: %s", err)
+			}
+			close(eventQueue)
+			return
+
+		case <-ticker.C:
+			debugLogf("pgEventSource listener ping")
+			ps.pqListener.Ping()
+
+		case pqEvent, ok := <-ps.pqListener.Notify:
+			if !ok {
+				close(eventQueue)
+				return
+			}
+			if pqEvent.Channel != "logs" {
+				continue
+			}
+			// The NOTIFY payload is the log row's id.
+			logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
+			if err != nil {
+				log.Printf("bad notify payload: %+v", pqEvent)
+				continue
+			}
+			serial++
+			e := &event{
+				LogID:    logID,
+				Received: time.Now(),
+				Serial:   serial,
+				db:       ps.db,
+			}
+			debugLogf("event %d %+v", e.Serial, e)
+			eventQueue <- e
+			// Start fetching the row now, concurrently with
+			// queueing; the fan-out goroutine will wait on it.
+			go e.Detail()
+		}
+	}
+}
+
+// NewSink subscribes to the event source. NewSink returns an
+// eventSink, whose Channel() method returns a channel: a pointer to
+// each subsequent event will be sent to that channel.
+//
+// The caller must ensure events are received from the sink channel as
+// quickly as possible because when one sink stops being ready, all
+// other sinks block.
+func (ps *pgEventSource) NewSink() eventSink {
+ ps.setupOnce.Do(ps.setup)
+ sink := &pgEventSink{
+ channel: make(chan *event, 1),
+ source: ps,
+ }
+ ps.mtx.Lock()
+ ps.sinks[sink] = true
+ ps.mtx.Unlock()
+ return sink
+}
+
+type pgEventSink struct {
+ channel chan *event
+ source *pgEventSource
+}
+
+func (sink *pgEventSink) Channel() <-chan *event {
+ return sink.channel
+}
+
+// Stop unsubscribes the sink from its event source and closes its
+// channel. It is safe to call while events are still arriving.
+func (sink *pgEventSink) Stop() {
+	go func() {
+		// Ensure this sink cannot fill up and block the
+		// server-side queue (which otherwise could in turn
+		// block our mtx.Lock() here)
+		for _ = range sink.channel {
+		}
+	}()
+	sink.source.mtx.Lock()
+	delete(sink.source.sinks, sink)
+	sink.source.mtx.Unlock()
+	close(sink.channel)
+}
--- /dev/null
+package main
+
+import (
+ "encoding/json"
+ "log"
+ "net/http"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "golang.org/x/net/websocket"
+)
+
+type router struct {
+ Config *Config
+
+ eventSource eventSource
+ mux *http.ServeMux
+ setupOnce sync.Once
+}
+
+// setup registers the websocket endpoints: /websocket (v0 protocol)
+// and /arvados/v1/events.ws (v1, not yet implemented).
+func (rtr *router) setup() {
+	rtr.mux = http.NewServeMux()
+	rtr.mux.Handle("/websocket", rtr.makeServer(NewSessionV0))
+	rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1))
+}
+
+// makeServer returns a websocket server whose connections are served
+// by a handler using the given session constructor. Each connection
+// gets its own event sink, which is stopped when the handler returns.
+func (rtr *router) makeServer(newSession func(wsConn, arvados.Client) (session, error)) *websocket.Server {
+	handler := &handler{
+		Client:      rtr.Config.Client,
+		PingTimeout: rtr.Config.PingTimeout.Duration(),
+		QueueSize:   rtr.Config.ClientEventQueue,
+		NewSession:  newSession,
+	}
+	return &websocket.Server{
+		Handshake: func(c *websocket.Config, r *http.Request) error {
+			// Accept all origins.
+			return nil
+		},
+		Handler: websocket.Handler(func(ws *websocket.Conn) {
+			sink := rtr.eventSource.NewSink()
+			handler.Handle(ws, sink.Channel())
+			sink.Stop()
+			ws.Close()
+		}),
+	}
+}
+
+// ServeHTTP logs connect/disconnect records around the (long-lived)
+// websocket request.
+func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	rtr.setupOnce.Do(rtr.setup)
+	t0 := time.Now()
+	reqLog(map[string]interface{}{
+		"Connect":         req.RemoteAddr,
+		"RemoteAddr":      req.RemoteAddr,
+		"X-Forwarded-For": req.Header.Get("X-Forwarded-For"),
+		"Time":            t0.UTC(),
+	})
+	rtr.mux.ServeHTTP(resp, req)
+	t1 := time.Now()
+	reqLog(map[string]interface{}{
+		"Disconnect":      req.RemoteAddr,
+		"RemoteAddr":      req.RemoteAddr,
+		"X-Forwarded-For": req.Header.Get("X-Forwarded-For"),
+		"Time":            t1.UTC(),
+		// Use t1 (already captured) so Elapsed agrees exactly with
+		// the logged disconnect Time.
+		"Elapsed": t1.Sub(t0).Seconds(),
+	})
+}
+
+// reqLog writes m to the log as a single JSON object. A marshal
+// failure is treated as a programming error and aborts the process.
+func reqLog(m map[string]interface{}) {
+	j, err := json.Marshal(m)
+	if err != nil {
+		log.Fatal(err)
+	}
+	log.Print(string(j))
+}
--- /dev/null
+package main
+
+type session interface {
+ Receive(map[string]interface{}, []byte)
+ EventMessage(*event) ([]byte, error)
+ Filter(*event) bool
+ debugLogf(string, ...interface{})
+}
--- /dev/null
+package main
+
+import (
+ "encoding/json"
+ "errors"
+ "log"
+ "sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var (
+	errQueueFull   = errors.New("client queue full")
+	errFrameTooBig = errors.New("frame too big")
+)
+
+// sessionV0 implements the original ("v0") websocket event protocol.
+type sessionV0 struct {
+	ws          wsConn
+	permChecker permChecker
+	subscribed  map[string]bool // subscription targets; "*" means everything
+	eventTypes  map[string]bool // event_type filter; nil means no filtering
+	mtx         sync.Mutex      // protects subscribed and eventTypes
+	setupOnce   sync.Once
+}
+
+// v0subscribe is a decoded client request message.
+type v0subscribe struct {
+	Method  string
+	Filters []v0filter
+}
+
+// v0filter is one [column, operator, operand] triple.
+type v0filter []interface{}
+
+// NewSessionV0 creates a v0 session for the given connection, taking
+// the API token from the request's api_token form parameter.
+func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
+	sess := &sessionV0{
+		ws:          ws,
+		permChecker: NewPermChecker(ac),
+		subscribed:  make(map[string]bool),
+		eventTypes:  make(map[string]bool),
+	}
+
+	err := ws.Request().ParseForm()
+	if err != nil {
+		log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
+		return nil, err
+	}
+	token := ws.Request().Form.Get("api_token")
+	sess.permChecker.SetToken(token)
+	sess.debugLogf("token = %+q", token)
+
+	return sess, nil
+}
+
+func (sess *sessionV0) debugLogf(s string, args ...interface{}) {
+ args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
+ debugLogf("%s "+s, args...)
+}
+
+// If every client subscription message includes filters consisting
+// only of [["event_type","in",...]] then send only the requested
+// event types. Otherwise, clear sess.eventTypes and send all event
+// types from now on. Caller must hold sess.mtx (see Receive).
+func (sess *sessionV0) checkFilters(filters []v0filter) {
+	if sess.eventTypes == nil {
+		// Already received a subscription request without
+		// event_type filters.
+		return
+	}
+	eventTypes := sess.eventTypes
+	sess.eventTypes = nil
+	if len(filters) == 0 {
+		return
+	}
+	useFilters := false
+	for _, f := range filters {
+		// Filters come straight from the client; guard every
+		// index access so a short/empty filter cannot panic.
+		if len(f) == 0 {
+			continue
+		}
+		col, ok := f[0].(string)
+		if !ok || col != "event_type" {
+			continue
+		}
+		if len(f) < 3 {
+			// Malformed event_type filter: give up on
+			// filtering (send all event types).
+			return
+		}
+		op, ok := f[1].(string)
+		if !ok || op != "in" {
+			return
+		}
+		arr, ok := f[2].([]interface{})
+		if !ok {
+			return
+		}
+		useFilters = true
+		for _, s := range arr {
+			if s, ok := s.(string); ok {
+				eventTypes[s] = true
+			} else {
+				return
+			}
+		}
+	}
+	if useFilters {
+		sess.debugLogf("eventTypes %+v", eventTypes)
+		sess.eventTypes = eventTypes
+	}
+}
+
+// Receive handles one decoded client message. Only "subscribe"
+// requests are recognized; anything else is logged and ignored.
+func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) {
+	sess.debugLogf("received message: %+v", msg)
+	var sub v0subscribe
+	if err := json.Unmarshal(buf, &sub); err != nil {
+		sess.debugLogf("ignored unrecognized request: %s", err)
+		return
+	}
+	if sub.Method == "subscribe" {
+		sess.debugLogf("subscribing to *")
+		sess.mtx.Lock()
+		sess.checkFilters(sub.Filters)
+		sess.subscribed["*"] = true
+		sess.mtx.Unlock()
+	}
+}
+
+// EventMessage renders an event as a v0 JSON message. It returns
+// (nil, nil) -- meaning "skip this event" -- when the log row cannot
+// be retrieved or the client's token lacks permission on the object.
+func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
+	detail := e.Detail()
+	if detail == nil {
+		return nil, nil
+	}
+
+	ok, err := sess.permChecker.Check(detail.ObjectUUID)
+	if err != nil || !ok {
+		return nil, err
+	}
+
+	msg := map[string]interface{}{
+		"msgID":             e.Serial,
+		"id":                detail.ID,
+		"uuid":              detail.UUID,
+		"object_uuid":       detail.ObjectUUID,
+		"object_owner_uuid": detail.ObjectOwnerUUID,
+		"event_type":        detail.EventType,
+	}
+	// Properties are forwarded only when they carry a "text" entry.
+	if detail.Properties != nil && detail.Properties["text"] != nil {
+		msg["properties"] = detail.Properties
+	}
+	return json.Marshal(msg)
+}
+
+// Filter reports whether the event should be sent to this client,
+// based on the event_type filter (if any) and the subscription list
+// ("*", object UUID, or object owner UUID).
+func (sess *sessionV0) Filter(e *event) bool {
+	detail := e.Detail()
+	sess.mtx.Lock()
+	defer sess.mtx.Unlock()
+	switch {
+	case sess.eventTypes != nil && detail == nil:
+		// Can't check the event type of an unfetchable row.
+		return false
+	case sess.eventTypes != nil && !sess.eventTypes[detail.EventType]:
+		return false
+	case sess.subscribed["*"]:
+		return true
+	case detail == nil:
+		return false
+	case sess.subscribed[detail.ObjectUUID]:
+		return true
+	case sess.subscribed[detail.ObjectOwnerUUID]:
+		return true
+	default:
+		return false
+	}
+}
--- /dev/null
+package main
+
+import (
+ "errors"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// NewSessionV1 is a placeholder for the v1 (events.ws) protocol,
+// which is not implemented yet; it always returns an error.
+func NewSessionV1(ws wsConn, ac arvados.Client) (session, error) {
+	return nil, errors.New("Not implemented")
+}