"Keep storage daemon, accessible to clients on the LAN"
package_go_binary services/keep-web keep-web \
"Static web hosting service for user data stored in Arvados Keep"
+package_go_binary services/ws arvados-ws \
+ "Arvados Websocket server"
package_go_binary tools/keep-block-check keep-block-check \
"Verify that all data from one set of Keep servers to another was copied"
package_go_binary tools/keep-rsync keep-rsync \
services/crunch-run
services/crunch-dispatch-local
services/crunch-dispatch-slurm
+services/ws
sdk/cli
sdk/pam
sdk/python
&& eval $(python sdk/python/tests/run_test_server.py start --auth admin) \
&& export ARVADOS_TEST_API_HOST="$ARVADOS_API_HOST" \
&& export ARVADOS_TEST_API_INSTALLED="$$" \
+ && python sdk/python/tests/run_test_server.py start_ws \
+ && python sdk/python/tests/run_test_server.py start_nginx \
&& (env | egrep ^ARVADOS)
}
start_nginx_proxy_services() {
- echo 'Starting keepproxy, keep-web, arv-git-httpd, and nginx ssl proxy...'
+ echo 'Starting keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...'
cd "$WORKSPACE" \
&& python sdk/python/tests/run_test_server.py start_keep_proxy \
&& python sdk/python/tests/run_test_server.py start_keep-web \
&& python sdk/python/tests/run_test_server.py start_arv-git-httpd \
+ && python sdk/python/tests/run_test_server.py start_ws \
&& python sdk/python/tests/run_test_server.py start_nginx \
&& export ARVADOS_TEST_PROXY_SERVICES=1
}
cd "$WORKSPACE" \
&& python sdk/python/tests/run_test_server.py stop_nginx \
&& python sdk/python/tests/run_test_server.py stop_arv-git-httpd \
+ && python sdk/python/tests/run_test_server.py stop_ws \
&& python sdk/python/tests/run_test_server.py stop_keep-web \
&& python sdk/python/tests/run_test_server.py stop_keep_proxy
fi
if [[ -n "$ARVADOS_TEST_API_HOST" ]]; then
unset ARVADOS_TEST_API_HOST
cd "$WORKSPACE" \
+ && python sdk/python/tests/run_test_server.py stop_nginx \
+ && python sdk/python/tests/run_test_server.py stop_ws \
&& python sdk/python/tests/run_test_server.py stop
fi
}
services/crunch-dispatch-local
services/crunch-dispatch-slurm
services/crunch-run
+ services/ws
tools/keep-block-check
tools/keep-exercise
tools/keep-rsync
// callers who use a Client to initialize an
// arvadosclient.ArvadosClient.)
KeepServiceURIs []string `json:",omitempty"`
+
+ dd *DiscoveryDocument
}
// The default http.Client used by a Client with Insecure==true and
// DiscoveryDocument is the Arvados server's description of itself.
type DiscoveryDocument struct {
- DefaultCollectionReplication int `json:"defaultCollectionReplication"`
- BlobSignatureTTL int64 `json:"blobSignatureTtl"`
+ // BasePath is prepended to a method's Path by PathForUUID.
+ BasePath string `json:"basePath"`
+ DefaultCollectionReplication int `json:"defaultCollectionReplication"`
+ BlobSignatureTTL int64 `json:"blobSignatureTtl"`
+ // Schemas is keyed by model name; Resources is keyed by
+ // resource name.
+ Schemas map[string]Schema `json:"schemas"`
+ Resources map[string]Resource `json:"resources"`
+}
+
+// Resource is one entry of a DiscoveryDocument's "resources" map: the
+// methods offered by that resource, keyed by method name (PathForUUID
+// looks up "get" and the caller-supplied method name here).
+type Resource struct {
+ Methods map[string]ResourceMethod `json:"methods"`
+}
+
+// ResourceMethod describes one method of a Resource: the HTTP verb,
+// the request path (in which PathForUUID substitutes "{uuid}"), and
+// the response type.
+type ResourceMethod struct {
+ HTTPMethod string `json:"httpMethod"`
+ Path string `json:"path"`
+ Response MethodResponse `json:"response"`
+}
+
+// MethodResponse names, via its "$ref" key, the schema of a method's
+// response. PathForUUID compares Ref against keys of
+// DiscoveryDocument.Schemas.
+type MethodResponse struct {
+ Ref string `json:"$ref"`
+}
+
+// Schema is one entry of a DiscoveryDocument's "schemas" map.
+// UUIDPrefix is the five-character string PathForUUID matches against
+// characters 6 through 10 of a UUID.
+type Schema struct {
+ UUIDPrefix string `json:"uuidPrefix"`
}
// DiscoveryDocument returns a *DiscoveryDocument. The returned object
// should not be modified: the same object may be returned by
// subsequent calls.
+//
+// The first successfully decoded document is stored in c.dd and
+// returned by every later call without another request. A failed
+// request is not cached, so the next call tries again.
+//
+// NOTE(review): c.dd is read and written here without locking --
+// confirm that a Client is not shared between goroutines that may
+// race on the first call.
func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
+ if c.dd != nil {
+ return c.dd, nil
+ }
var dd DiscoveryDocument
- return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+ err := c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+ if err != nil {
+ return nil, err
+ }
+ c.dd = &dd
+ return c.dd, nil
+}
+
+// PathForUUID returns the request path, without a leading slash, for
+// calling the given method (e.g., "get") on the object identified by
+// uuid.
+//
+// The object's model is found by matching characters 6-10 of uuid
+// against the UUIDPrefix of each schema in the discovery document.
+// The resource is the one whose "get" method responds with that
+// model. The path is the discovery document's BasePath followed by
+// the method's Path with every "{uuid}" replaced by uuid.
+//
+// An error is returned if uuid is not 27 bytes long, if the discovery
+// document cannot be retrieved, or if no matching schema, resource,
+// or method is found.
+func (c *Client) PathForUUID(method, uuid string) (string, error) {
+	if len(uuid) != 27 {
+		return "", fmt.Errorf("invalid UUID: %q", uuid)
+	}
+	dd, err := c.DiscoveryDocument()
+	if err != nil {
+		return "", err
+	}
+	infix := uuid[6:11]
+	var model string
+	for m, s := range dd.Schemas {
+		if s.UUIDPrefix == infix {
+			model = m
+			break
+		}
+	}
+	if model == "" {
+		return "", fmt.Errorf("unrecognized UUID infix: %q", infix)
+	}
+	var resource string
+	for r, rsc := range dd.Resources {
+		if rsc.Methods["get"].Response.Ref == model {
+			resource = r
+			break
+		}
+	}
+	if resource == "" {
+		return "", fmt.Errorf("no resource for model: %q", model)
+	}
+	m, ok := dd.Resources[resource].Methods[method]
+	if !ok {
+		return "", fmt.Errorf("no method %q for resource %q", method, resource)
+	}
+	path := dd.BasePath + strings.Replace(m.Path, "{uuid}", uuid, -1)
+	// TrimPrefix drops at most one leading slash, like the previous
+	// path[0] check, but does not panic when the discovery document
+	// yields an empty path.
+	return strings.TrimPrefix(path, "/"), nil
}
--- /dev/null
+package arvados
+
+import (
+ "time"
+)
+
+// Log is an arvados#log record
+type Log struct {
+ ID uint64 `json:"id"`
+ UUID string `json:"uuid"`
+ ObjectUUID string `json:"object_uuid"`
+ ObjectOwnerUUID string `json:"object_owner_uuid"`
+ EventType string `json:"event_type"`
+ // Properties holds the decoded "properties" value with no fixed
+ // schema.
+ Properties map[string]interface{} `json:"properties"`
+ // CreatedAt is a pointer with omitempty, so a nil value is left
+ // out of the encoded JSON entirely.
+ CreatedAt *time.Time `json:"created_at,omitempty"`
+}
+
+// LogList is an arvados#logList resource.
+//
+// NOTE(review): the fields mirror the JSON keys "items",
+// "items_available", "offset" and "limit"; presumably this is the
+// envelope returned by a list request -- confirm against the API.
+type LogList struct {
+ Items []Log `json:"items"`
+ ItemsAvailable int `json:"items_available"`
+ Offset int `json:"offset"`
+ Limit int `json:"limit"`
+}
}
return nil
}
+
+// Dump returns a YAML representation of cfg. Both the output and the
+// error are exactly what yaml.Marshal returns for cfg.
+func Dump(cfg interface{}) ([]byte, error) {
+ return yaml.Marshal(cfg)
+}
proxy_redirect //download:{{KEEPWEBPORT}}/ https://$host:{{KEEPWEBDLSSLPORT}}/;
}
}
+ # Plain-HTTP websocket server started by the test harness on
+ # {{WSPORT}}.
+ upstream ws {
+ server localhost:{{WSPORT}};
+ }
+ # SSL front end for the websocket server: accepts connections on
+ # {{WSSPORT}} and proxies them to the "ws" upstream.
+ server {
+ listen *:{{WSSPORT}} ssl default_server;
+ # Capture the requested host name so it can be passed upstream.
+ server_name ~^(?<request_host>.*)$;
+ ssl_certificate {{SSLCERT}};
+ ssl_certificate_key {{SSLKEY}};
+ location / {
+ proxy_pass http://ws;
+ # Forward the Upgrade/Connection headers so the websocket
+ # handshake reaches the upstream server.
+ proxy_set_header Upgrade $http_upgrade;
+ proxy_set_header Connection "upgrade";
+ proxy_set_header Host $request_host:{{WSPORT}};
+ proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+ }
+ }
}
my_api_host = None
_cached_config = {}
+_cached_db_config = {}
def find_server_pid(PID_PATH, wait=10):
now = time.time()
os.makedirs(gitdir)
subprocess.check_output(['tar', '-xC', gitdir, '-f', gittarball])
+ # The nginx proxy isn't listening here yet, but we need to choose
+ # the wss:// port now so we can write the API server config file.
+ wss_port = find_available_port()
+ _setport('wss', wss_port)
+
port = find_available_port()
env = os.environ.copy()
env['RAILS_ENV'] = 'test'
- env['ARVADOS_WEBSOCKETS'] = 'yes'
+ env['ARVADOS_TEST_WSS_PORT'] = str(wss_port)
+ env.pop('ARVADOS_WEBSOCKETS', None)
env.pop('ARVADOS_TEST_API_HOST', None)
env.pop('ARVADOS_API_HOST', None)
env.pop('ARVADOS_API_HOST_INSECURE', None)
kill_server_pid(_pidfile('api'))
my_api_host = None
+def run_ws():
+ # Start a "ws" server process for tests and return the port it
+ # listens on.
+ #
+ # Does nothing (returns None) when ARVADOS_TEST_PROXY_SERVICES is
+ # set in the environment. Otherwise: stops any previously started
+ # ws server, picks a free port, writes a ws.yml config in
+ # TEST_TMPDIR (API host from ARVADOS_API_HOST, database settings
+ # from _dbconfig), launches "ws -config <conf>" with stdout and
+ # stderr sent to the 'ws' fifo, writes the pid file, waits until
+ # the port accepts connections, and records the port with _setport.
+ if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+ return
+ stop_ws()
+ port = find_available_port()
+ conf = os.path.join(TEST_TMPDIR, 'ws.yml')
+ with open(conf, 'w') as f:
+ f.write("""
+Client:
+ APIHost: {}
+ Insecure: true
+Listen: :{}
+Postgres:
+ host: {}
+ dbname: {}
+ user: {}
+ password: {}
+ sslmode: require
+ """.format(os.environ['ARVADOS_API_HOST'],
+ port,
+ _dbconfig('host'),
+ _dbconfig('database'),
+ _dbconfig('username'),
+ _dbconfig('password')))
+ logf = open(_fifo2stderr('ws'), 'w')
+ ws = subprocess.Popen(
+ ["ws", "-config", conf],
+ stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
+ with open(_pidfile('ws'), 'w') as f:
+ f.write(str(ws.pid))
+ _wait_until_port_listens(port)
+ _setport('ws', port)
+ return port
+
+def stop_ws():
+ # Kill the ws server whose pid was recorded by run_ws. Like run_ws,
+ # this does nothing when ARVADOS_TEST_PROXY_SERVICES is set in the
+ # environment.
+ if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+ return
+ kill_server_pid(_pidfile('ws'))
+
def _start_keep(n, keep_args):
keep0 = tempfile.mkdtemp()
port = find_available_port()
nginxconf['KEEPPROXYSSLPORT'] = find_available_port()
nginxconf['GITPORT'] = _getport('arv-git-httpd')
nginxconf['GITSSLPORT'] = find_available_port()
+ nginxconf['WSPORT'] = _getport('ws')
+ nginxconf['WSSPORT'] = _getport('wss')
nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem')
nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key')
nginxconf['ACCESSLOG'] = _fifo2stderr('nginx_access_log')
except IOError:
return 9
+def _dbconfig(key):
+ global _cached_db_config
+ if not _cached_db_config:
+ _cached_db_config = yaml.load(open(os.path.join(
+ SERVICES_SRC_DIR, 'api', 'config', 'database.yml')))
+ return _cached_db_config['test'][key]
+
def _apiconfig(key):
+ global _cached_config
if _cached_config:
return _cached_config[key]
def _load(f, required=True):
original environment.
"""
MAIN_SERVER = None
+ WS_SERVER = None
KEEP_SERVER = None
KEEP_PROXY_SERVER = None
KEEP_WEB_SERVER = None
os.environ.pop('ARVADOS_EXTERNAL_CLIENT', None)
for server_kwargs, start_func, stop_func in (
(cls.MAIN_SERVER, run, reset),
+ (cls.WS_SERVER, run_ws, stop_ws),
(cls.KEEP_SERVER, run_keep, stop_keep),
(cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy),
(cls.KEEP_WEB_SERVER, run_keep_web, stop_keep_web)):
if __name__ == "__main__":
actions = [
'start', 'stop',
+ 'start_ws', 'stop_ws',
'start_keep', 'stop_keep',
'start_keep_proxy', 'stop_keep_proxy',
'start_keep-web', 'stop_keep-web',
print(host)
elif args.action == 'stop':
stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
+ elif args.action == 'start_ws':
+ run_ws()
+ elif args.action == 'stop_ws':
+ stop_ws()
elif args.action == 'start_keep':
run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers)
elif args.action == 'stop_keep':
self.assertEqual(200, events.get(True, 5)['status'])
human = arvados.api('v1').humans().create(body={}).execute()
- log_object_uuids = []
- for i in range(0, expected):
- log_object_uuids.append(events.get(True, 5)['object_uuid'])
-
+ want_uuids = []
if expected > 0:
- self.assertIn(human['uuid'], log_object_uuids)
-
+ want_uuids.append(human['uuid'])
if expected > 1:
- self.assertIn(ancestor['uuid'], log_object_uuids)
+ want_uuids.append(ancestor['uuid'])
+ log_object_uuids = []
+ while set(want_uuids) - set(log_object_uuids):
+ log_object_uuids.append(events.get(True, 5)['object_uuid'])
- with self.assertRaises(Queue.Empty):
- # assertEqual just serves to show us what unexpected thing
- # comes out of the queue when the assertRaises fails; when
- # the test passes, this assertEqual doesn't get called.
- self.assertEqual(events.get(True, 2), None)
+ if expected < 2:
+ with self.assertRaises(Queue.Empty):
+ # assertEqual just serves to show us what unexpected
+ # thing comes out of the queue when the assertRaises
+ # fails; when the test passes, this assertEqual
+ # doesn't get called.
+ self.assertEqual(events.get(True, 2), None)
def test_subscribe_websocket(self):
self._test_subscribe(
return time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(t)) + self.isotz(-time.timezone/60)
def isotz(self, offset):
- """Convert minutes-east-of-UTC to ISO8601 time zone designator"""
- return '{:+03d}{:02d}'.format(offset/60, offset%60)
+ """Convert minutes-east-of-UTC to RFC3339- and ISO-compatible time zone designator"""
+ return '{:+03d}:{:02d}'.format(offset/60, offset%60)
# Test websocket reconnection on (un)execpted close
def _test_websocket_reconnect(self, close_unexpected):
workbench_address: https://localhost:3001/
git_repositories_dir: <%= Rails.root.join 'tmp', 'git', 'test' %>
git_internal_dir: <%= Rails.root.join 'tmp', 'internal.git' %>
+ websocket_address: "wss://0.0.0.0:<%= ENV['ARVADOS_TEST_WSS_PORT'] %>/websocket"
--- /dev/null
+package main
+
+import (
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Config is the websocket server configuration loaded from the
+// config file by main.
+type Config struct {
+ // Client is passed to each new session (see makeServer).
+ Client arvados.Client
+ // Postgres holds connection parameters; main turns them into a
+ // connection string for the event source.
+ Postgres pgConfig
+ // Listen is used as the http.Server address.
+ Listen string
+ // Debug: when false, main replaces debugLogf with a no-op.
+ Debug bool
+
+ // PingTimeout is used by the handler as both the write deadline
+ // and the idle-ping interval.
+ PingTimeout arvados.Duration
+ // ClientEventQueue is the size of each connection's outgoing
+ // queue.
+ ClientEventQueue int
+ // ServerEventQueue is the size of the event source's queue.
+ ServerEventQueue int
+}
+
+func DefaultConfig() Config {
+ return Config{
+ Client: arvados.Client{
+ APIHost: "localhost:443",
+ },
+ Postgres: pgConfig{
+ "dbname": "arvados_production",
+ "user": "arvados",
+ "password": "xyzzy",
+ "host": "localhost",
+ "connect_timeout": "30",
+ "sslmode": "require",
+ },
+ PingTimeout: arvados.Duration(time.Minute),
+ ClientEventQueue: 64,
+ ServerEventQueue: 4,
+ }
+}
--- /dev/null
+package main
+
+import (
+ "database/sql"
+ "log"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/ghodss/yaml"
+)
+
+// eventSink is one subscription to an eventSource. Channel returns
+// the channel on which events are delivered; Stop ends the
+// subscription.
+type eventSink interface {
+ Channel() <-chan *event
+ Stop()
+}
+
+// eventSource produces events for any number of sinks and exposes
+// the database handle sessions use for their own queries.
+type eventSource interface {
+ NewSink() eventSink
+ DB() *sql.DB
+}
+
+// event is one notification about a row in the logs table.
+type event struct {
+ // LogID is the id of the logs row; Detail uses it to fetch the
+ // row.
+ LogID uint64
+ // Received is when the notification was received (or, for old
+ // events, when the event object was built).
+ Received time.Time
+ // Serial is a counter assigned by the event source.
+ Serial uint64
+
+ db *sql.DB
+ // logRow and err cache the outcome of the single Detail
+ // attempt; mtx guards them.
+ logRow *arvados.Log
+ err error
+ mtx sync.Mutex
+}
+
+// Detail fetches (at most once) the logs row this event refers to and
+// returns it. Concurrent callers are serialized by the event's mutex
+// and all see the same result. If the row cannot be loaded or its
+// properties cannot be decoded, the failure is remembered and nil is
+// returned from then on.
+func (e *event) Detail() *arvados.Log {
+	e.mtx.Lock()
+	defer e.mtx.Unlock()
+	alreadyTried := e.logRow != nil || e.err != nil
+	if alreadyTried {
+		return e.logRow
+	}
+	var (
+		row   arvados.Log
+		props []byte
+	)
+	result := e.db.QueryRow(`SELECT id, uuid, object_uuid, COALESCE(object_owner_uuid,''), COALESCE(event_type,''), created_at, properties FROM logs WHERE id = $1`, e.LogID)
+	if e.err = result.Scan(
+		&row.ID,
+		&row.UUID,
+		&row.ObjectUUID,
+		&row.ObjectOwnerUUID,
+		&row.EventType,
+		&row.CreatedAt,
+		&props,
+	); e.err != nil {
+		log.Printf("retrieving log row %d: %s", e.LogID, e.err)
+		return nil
+	}
+	// The properties column holds YAML; decode it into the row.
+	if e.err = yaml.Unmarshal(props, &row.Properties); e.err != nil {
+		log.Printf("decoding yaml for log row %d: %s", e.LogID, e.err)
+		return nil
+	}
+	e.logRow = &row
+	return e.logRow
+}
--- /dev/null
+package main
+
+import (
+ "encoding/json"
+ "io"
+ "log"
+ "net/http"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// wsConn is the subset of a websocket connection the handler and
+// sessions rely on: reading and writing frames, access to the
+// originating HTTP request, and read/write deadlines.
+type wsConn interface {
+ io.ReadWriter
+ Request() *http.Request
+ SetReadDeadline(time.Time) error
+ SetWriteDeadline(time.Time) error
+}
+
+// handler runs the per-connection message loop (see Handle).
+type handler struct {
+ // NOTE(review): Client is not read by Handle and is not set by
+ // makeServer -- confirm whether it is still needed.
+ Client arvados.Client
+ // PingTimeout is the write deadline for each outgoing message
+ // and the interval at which an idle connection is sent "{}".
+ PingTimeout time.Duration
+ // QueueSize is the capacity of the outgoing message queue.
+ QueueSize int
+ // NewSession builds the session for a new connection.
+ NewSession func(wsConn) (session, error)
+}
+
+// handlerStats accumulates per-connection totals for events written
+// to the client by Handle.
+type handlerStats struct {
+ // QueueDelay is the total time between each event's Received
+ // time and the start of its write.
+ QueueDelay time.Duration
+ // WriteDelay is the total time spent writing events.
+ WriteDelay time.Duration
+ // EventBytes and EventCount total the bytes and number of event
+ // messages written.
+ EventBytes uint64
+ EventCount uint64
+}
+
+// Handle serves one websocket connection until the first error and
+// returns statistics about the events written to the client.
+//
+// It creates a session with h.NewSession and then runs three
+// goroutines:
+//
+//   - a reader, which decodes each incoming frame as JSON, passes it
+//     to sess.Receive, and queues any replies;
+//   - a writer, which sends queued replies as-is and encodes queued
+//     events with sess.EventMessage before sending them;
+//   - a filter, which forwards incoming events accepted by
+//     sess.Filter to the queue and sends "{}" when the connection has
+//     been idle for h.PingTimeout.
+//
+// The first error reported by any goroutine makes Handle close the
+// "stopped" channel and return. If NewSession fails, Handle logs the
+// error and returns zero stats.
+//
+// NOTE(review): the writer goroutine may still be updating stats
+// when Handle returns, and the reader may try to queue a reply after
+// the filter goroutine has closed the queue -- confirm whether either
+// can happen in practice.
+func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
+	sess, err := h.NewSession(ws)
+	if err != nil {
+		log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
+		return
+	}
+
+	queue := make(chan interface{}, h.QueueSize)
+
+	stopped := make(chan struct{})
+	// Buffered so that each goroutine can report an error without
+	// blocking after Handle has stopped listening.
+	stop := make(chan error, 5)
+
+	// Reader: receive frames from the client until an error occurs
+	// or "stopped" is closed.
+	go func() {
+		buf := make([]byte, 2<<20)
+		for {
+			select {
+			case <-stopped:
+				return
+			default:
+			}
+			// Effectively no read timeout: one year.
+			ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
+			n, err := ws.Read(buf)
+			sess.debugLogf("received frame: %q", buf[:n])
+			if err == nil && n == len(buf) {
+				// A frame that fills the whole buffer
+				// may have been truncated.
+				err = errFrameTooBig
+			}
+			if err != nil {
+				if err != io.EOF {
+					sess.debugLogf("handler: read: %s", err)
+				}
+				stop <- err
+				return
+			}
+			msg := make(map[string]interface{})
+			err = json.Unmarshal(buf[:n], &msg)
+			if err != nil {
+				sess.debugLogf("handler: unmarshal: %s", err)
+				stop <- err
+				return
+			}
+			for _, reply := range sess.Receive(msg, buf[:n]) {
+				sess.debugLogf("handler: to queue: %s", string(reply))
+				queue <- reply
+			}
+		}
+	}()
+
+	// Writer: send everything in the queue to the client. After a
+	// write or encoding error, keep draining the queue so senders
+	// do not block.
+	go func() {
+		for e := range queue {
+			if buf, ok := e.([]byte); ok {
+				ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+				sess.debugLogf("handler: send msg: %s", string(buf))
+				_, err := ws.Write(buf)
+				if err != nil {
+					sess.debugLogf("handler: write {}: %s", err)
+					stop <- err
+					break
+				}
+				continue
+			}
+			e := e.(*event)
+
+			buf, err := sess.EventMessage(e)
+			if err != nil {
+				sess.debugLogf("EventMessage %d: err %s", e.Serial, err)
+				stop <- err
+				break
+			} else if len(buf) == 0 {
+				sess.debugLogf("EventMessage %d: skip", e.Serial)
+				continue
+			}
+
+			sess.debugLogf("handler: send event %d: %q", e.Serial, buf)
+			ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+			t0 := time.Now()
+			_, err = ws.Write(buf)
+			if err != nil {
+				sess.debugLogf("handler: write: %s", err)
+				stop <- err
+				break
+			}
+			sess.debugLogf("handler: sent event %d", e.Serial)
+			stats.WriteDelay += time.Since(t0)
+			stats.QueueDelay += t0.Sub(e.Received)
+			stats.EventBytes += uint64(len(buf))
+			stats.EventCount++
+		}
+		for range queue {
+		}
+	}()
+
+	// Filter incoming events against the current subscription
+	// list, and forward matching events to the outgoing message
+	// queue. Close the queue and return when the "stopped"
+	// channel closes or the incoming event stream ends. Shut down
+	// the handler if the outgoing queue fills up.
+	go func() {
+		send := func(e *event) {
+			select {
+			case queue <- e:
+			default:
+				stop <- errQueueFull
+			}
+		}
+
+		ticker := time.NewTicker(h.PingTimeout)
+		defer ticker.Stop()
+
+		for {
+			var e *event
+			var ok bool
+			select {
+			case <-stopped:
+				close(queue)
+				return
+			case <-ticker.C:
+				// If the outgoing queue is empty,
+				// send an empty message. This can
+				// help detect a disconnected network
+				// socket, and prevent an idle socket
+				// from being closed.
+				if len(queue) == 0 {
+					queue <- []byte(`{}`)
+				}
+				continue
+			case e, ok = <-events:
+				if !ok {
+					close(queue)
+					return
+				}
+			}
+			if sess.Filter(e) {
+				send(e)
+			}
+		}
+	}()
+
+	<-stop
+	close(stopped)
+
+	return
+}
--- /dev/null
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "log"
+ "time"
+)
+
+// init turns off the standard logger's prefix flags (date, time,
+// etc.) so each log line consists only of the message passed to it.
+func init() {
+ log.SetFlags(0)
+}
+
+// errorLogf formats a message printf-style and logs it as a JSON
+// object with a single "error" key.
+func errorLogf(f string, args ...interface{}) {
+ log.Print(`{"error":`, string(mustMarshal(fmt.Sprintf(f, args...))), `}`)
+}
+
+// debugLogf formats a message printf-style and logs it as a JSON
+// object with a single "debug" key. It is a variable so that main
+// can replace it with a no-op when debugging is disabled.
+var debugLogf = func(f string, args ...interface{}) {
+ log.Print(`{"debug":`, string(mustMarshal(fmt.Sprintf(f, args...))), `}`)
+}
+
+// mustMarshal returns the JSON encoding of v. It panics if v cannot
+// be encoded.
+func mustMarshal(v interface{}) []byte {
+	encoded, err := json.Marshal(v)
+	if err == nil {
+		return encoded
+	}
+	panic(err)
+}
+
+// logj logs a JSON object built from alternating key/value
+// arguments, plus a "Time" key holding the current UTC time. Keys
+// are formatted with %s. If an odd number of arguments is given, the
+// last one is ignored. If the object cannot be encoded, the failure
+// is reported with errorLogf and nothing else is logged.
+func logj(args ...interface{}) {
+ m := map[string]interface{}{"Time": time.Now().UTC()}
+ for i := 0; i < len(args)-1; i += 2 {
+ m[fmt.Sprintf("%s", args[i])] = args[i+1]
+ }
+ buf, err := json.Marshal(m)
+ if err != nil {
+ errorLogf("logj: %s", err)
+ return
+ }
+ log.Print(string(buf))
+}
--- /dev/null
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+ "net/http"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/config"
+)
+
+// main loads the configuration, optionally dumps it and exits, and
+// otherwise starts the HTTP/websocket server. Any configuration or
+// server error terminates the process via log.Fatal.
+func main() {
+ configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file")
+ dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit")
+ cfg := DefaultConfig()
+ flag.Parse()
+
+ // Values in the config file override the defaults set above.
+ err := config.LoadFile(&cfg, *configPath)
+ if err != nil {
+ log.Fatal(err)
+ }
+ if !cfg.Debug {
+ debugLogf = func(string, ...interface{}) {}
+ }
+
+ if *dumpConfig {
+ txt, err := config.Dump(&cfg)
+ if err != nil {
+ log.Fatal(err)
+ }
+ fmt.Print(string(txt))
+ return
+ }
+
+ eventSource := &pgEventSource{
+ DataSource: cfg.Postgres.ConnectionString(),
+ QueueSize: cfg.ServerEventQueue,
+ }
+ srv := &http.Server{
+ Addr: cfg.Listen,
+ ReadTimeout: time.Minute,
+ WriteTimeout: time.Minute,
+ MaxHeaderBytes: 1 << 20,
+ Handler: &router{
+ Config: &cfg,
+ eventSource: eventSource,
+ },
+ }
+ // Creating (and immediately stopping) a sink runs the event
+ // source's one-time setup now, so database connection problems
+ // are fatal at startup rather than on the first client
+ // connection.
+ eventSource.NewSink().Stop()
+
+ log.Printf("listening at %s", srv.Addr)
+ log.Fatal(srv.ListenAndServe())
+}
--- /dev/null
+package main
+
+import (
+ "net/http"
+ "net/url"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+const (
+ // maxPermCacheAge is how long Check keeps using a cached
+ // result before asking the API server again.
+ maxPermCacheAge = time.Hour
+ // minPermCacheAge is the age beyond which tidy may discard a
+ // cached result.
+ minPermCacheAge = 5 * time.Minute
+)
+
+// permChecker decides whether the holder of a token may see an
+// object. SetToken selects the token; Check reports whether the
+// object with the given UUID is visible with it.
+type permChecker interface {
+ SetToken(token string)
+ Check(uuid string) (bool, error)
+}
+
+// NewPermChecker returns a permChecker that caches its results. It
+// works on its own copy of ac (ac is passed by value) with the auth
+// token cleared; the caller supplies a token with SetToken.
+func NewPermChecker(ac arvados.Client) permChecker {
+ ac.AuthToken = ""
+ return &cachingPermChecker{
+ Client: &ac,
+ cache: make(map[string]cacheEnt),
+ maxCurrent: 16,
+ }
+}
+
+// cacheEnt is one cached permission result. The embedded Time is
+// when the result was obtained.
+type cacheEnt struct {
+ time.Time
+ allowed bool
+}
+
+// cachingPermChecker implements permChecker by requesting each
+// object from the API server and remembering the outcome per UUID.
+//
+// NOTE(review): cache is accessed without locking -- confirm each
+// instance is used from a single goroutine.
+type cachingPermChecker struct {
+ *arvados.Client
+ cache map[string]cacheEnt
+ // maxCurrent is the cache size left by the last tidy; tidy
+ // does nothing until the cache exceeds twice this size.
+ maxCurrent int
+}
+
+// SetToken sets the token used for subsequent permission checks.
+//
+// Cached results are keyed by UUID only, so they are valid only for
+// the token they were obtained with. Changing the token discards
+// them; setting the same token again keeps them.
+func (pc *cachingPermChecker) SetToken(token string) {
+	if token != pc.Client.AuthToken {
+		pc.cache = make(map[string]cacheEnt)
+	}
+	pc.Client.AuthToken = token
+}
+
+// Check reports whether the object with the given UUID is readable
+// with the current token.
+//
+// A cached answer younger than maxPermCacheAge is returned directly.
+// Otherwise the object is requested from the API server: success
+// means allowed, a 404 TransactionError means not allowed, and both
+// outcomes are cached. Any other error (including failure to map
+// the UUID to a request path) is returned without being cached.
+//
+// NOTE(review): the auth token is written to the debug and error
+// logs below -- confirm that is acceptable for production logs.
+func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
+ pc.tidy()
+ now := time.Now()
+ if perm, ok := pc.cache[uuid]; ok && now.Sub(perm.Time) < maxPermCacheAge {
+ debugLogf("perm (cached): %+q %+q ...%v", pc.Client.AuthToken, uuid, perm.allowed)
+ return perm.allowed, nil
+ }
+ var buf map[string]interface{}
+ path, err := pc.PathForUUID("get", uuid)
+ if err != nil {
+ return false, err
+ }
+ // Only the uuid attribute is requested: the response content is
+ // not used, only whether the request succeeds.
+ err = pc.RequestAndDecode(&buf, "GET", path, nil, url.Values{
+ "select": {`["uuid"]`},
+ })
+
+ var allowed bool
+ if err == nil {
+ allowed = true
+ } else if txErr, ok := err.(arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound {
+ allowed = false
+ } else {
+ errorLogf("perm err: %+q %+q: %T %s", pc.Client.AuthToken, uuid, err, err)
+ return false, err
+ }
+ debugLogf("perm: %+q %+q ...%v", pc.Client.AuthToken, uuid, allowed)
+ pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed}
+ return allowed, nil
+}
+
+// tidy drops cache entries older than minPermCacheAge, but only once
+// the cache has grown past twice the size it had after the previous
+// cleanup. The size after cleanup becomes the new baseline.
+func (pc *cachingPermChecker) tidy() {
+	limit := pc.maxCurrent * 2
+	if len(pc.cache) <= limit {
+		return
+	}
+	cutoff := time.Now().Add(-minPermCacheAge)
+	for key, ent := range pc.cache {
+		if !ent.Time.Before(cutoff) {
+			continue
+		}
+		delete(pc.cache, key)
+	}
+	pc.maxCurrent = len(pc.cache)
+}
--- /dev/null
+package main
+
+import (
+ "database/sql"
+ "fmt"
+ "log"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/lib/pq"
+)
+
+// pgConfig is a set of PostgreSQL connection parameters, keyed by
+// parameter name.
+type pgConfig map[string]string
+
+// ConnectionString returns the parameters as a sequence of
+// "key='value' " items (each followed by a space), with every
+// backslash and single quote in a value preceded by a backslash.
+// Items appear in map iteration order.
+func (c pgConfig) ConnectionString() string {
+	// Escaping backslashes and quotes in a single pass gives the same
+	// result as escaping backslashes first and quotes second.
+	escaper := strings.NewReplacer(`\`, `\\`, `'`, `\'`)
+	items := make([]string, 0, len(c))
+	for name, value := range c {
+		items = append(items, name+"='"+escaper.Replace(value)+"' ")
+	}
+	return strings.Join(items, "")
+}
+
+// pgEventSource is an eventSource backed by PostgreSQL
+// notifications on the "logs" channel.
+type pgEventSource struct {
+ // DataSource is the connection string used for both the query
+ // handle and the notification listener.
+ DataSource string
+ // QueueSize is the capacity of the internal event queue.
+ QueueSize int
+
+ db *sql.DB
+ pqListener *pq.Listener
+ // sinks is the set of current subscribers; mtx guards it.
+ sinks map[*pgEventSink]bool
+ setupOnce sync.Once
+ mtx sync.Mutex
+ // shutdown receives listener errors; run() exits when one
+ // arrives.
+ shutdown chan error
+}
+
+// setup opens and pings the database, starts a listener on the
+// "logs" notification channel, and launches run() in a goroutine. A
+// failure to open, ping, or listen terminates the process via
+// log.Fatal. It is called through setupOnce by NewSink and DB.
+func (ps *pgEventSource) setup() {
+ ps.shutdown = make(chan error, 1)
+ ps.sinks = make(map[*pgEventSink]bool)
+
+ db, err := sql.Open("postgres", ps.DataSource)
+ if err != nil {
+ log.Fatalf("sql.Open: %s", err)
+ }
+ if err = db.Ping(); err != nil {
+ log.Fatalf("db.Ping: %s", err)
+ }
+ ps.db = db
+
+ ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
+ if err != nil {
+ // Until we have a mechanism for catching up
+ // on missed events, we cannot recover from a
+ // dropped connection without breaking our
+ // promises to clients.
+ ps.shutdown <- fmt.Errorf("pgEventSource listener problem: %s", err)
+ }
+ })
+ err = ps.pqListener.Listen("logs")
+ if err != nil {
+ log.Fatal(err)
+ }
+ debugLogf("pgEventSource listening")
+
+ go ps.run()
+}
+
+// run turns notifications on the "logs" channel into events and
+// delivers each event to every current sink. It returns (after
+// closing the internal queue) when a shutdown error arrives or the
+// listener's Notify channel is closed.
+func (ps *pgEventSource) run() {
+ eventQueue := make(chan *event, ps.QueueSize)
+
+ // Dispatcher: deliver queued events to all sinks, in order.
+ go func() {
+ for e := range eventQueue {
+ // Wait for the "select ... from logs" call to
+ // finish. This limits max concurrent queries
+ // to ps.QueueSize. Without this, max
+ // concurrent queries would be bounded by
+ // client_count X client_queue_size.
+ e.Detail()
+ debugLogf("event %d detail %+v", e.Serial, e.Detail())
+ // Sends happen while holding mtx, so one sink
+ // that is not ready blocks delivery to all.
+ ps.mtx.Lock()
+ for sink := range ps.sinks {
+ sink.channel <- e
+ }
+ ps.mtx.Unlock()
+ }
+ }()
+
+ var serial uint64
+ ticker := time.NewTicker(time.Minute)
+ defer ticker.Stop()
+ for {
+ select {
+ case err, ok := <-ps.shutdown:
+ if ok {
+ debugLogf("shutdown on error: %s", err)
+ }
+ close(eventQueue)
+ return
+
+ case <-ticker.C:
+ debugLogf("pgEventSource listener ping")
+ ps.pqListener.Ping()
+
+ case pqEvent, ok := <-ps.pqListener.Notify:
+ if !ok {
+ close(eventQueue)
+ return
+ }
+ if pqEvent.Channel != "logs" {
+ continue
+ }
+ // The notification payload is the id of the new
+ // logs row.
+ logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
+ if err != nil {
+ log.Printf("bad notify payload: %+v", pqEvent)
+ continue
+ }
+ serial++
+ e := &event{
+ LogID: logID,
+ Received: time.Now(),
+ Serial: serial,
+ db: ps.db,
+ }
+ debugLogf("event %d %+v", e.Serial, e)
+ eventQueue <- e
+ // Start loading the row right away instead of
+ // waiting for the dispatcher to reach this event.
+ go e.Detail()
+ }
+ }
+}
+
+// NewSink registers and returns a new subscriber. Every event
+// produced from now on is sent, as a pointer, to the channel
+// returned by the sink's Channel() method. The first call to NewSink
+// or DB also performs the source's one-time setup.
+//
+// Callers must keep receiving from the sink's channel promptly: a
+// sink that is not ready to receive stalls delivery to every other
+// sink.
+func (ps *pgEventSource) NewSink() eventSink {
+	ps.setupOnce.Do(ps.setup)
+	sink := new(pgEventSink)
+	sink.source = ps
+	sink.channel = make(chan *event, 1)
+
+	ps.mtx.Lock()
+	defer ps.mtx.Unlock()
+	ps.sinks[sink] = true
+	return sink
+}
+
+// DB returns the database handle opened by setup, running setup
+// first if it has not run yet.
+func (ps *pgEventSource) DB() *sql.DB {
+ ps.setupOnce.Do(ps.setup)
+ return ps.db
+}
+
+// pgEventSink is the eventSink returned by pgEventSource.NewSink.
+type pgEventSink struct {
+ channel chan *event
+ source *pgEventSource
+}
+
+// Channel returns the receive side of the channel on which the
+// source delivers events to this sink.
+func (sink *pgEventSink) Channel() <-chan *event {
+ return sink.channel
+}
+
+// Stop unsubscribes the sink from its source and closes its channel.
+// The sink is removed from the source's set under the source's
+// mutex before the channel is closed, and the dispatcher only sends
+// to sinks in that set while holding the same mutex.
+func (sink *pgEventSink) Stop() {
+ go func() {
+ // Ensure this sink cannot fill up and block the
+ // server-side queue (which otherwise could in turn
+ // block our mtx.Lock() here)
+ for _ = range sink.channel {
+ }
+ }()
+ sink.source.mtx.Lock()
+ delete(sink.source.sinks, sink)
+ sink.source.mtx.Unlock()
+ close(sink.channel)
+}
--- /dev/null
+package main
+
+import (
+ "database/sql"
+ "encoding/json"
+ "log"
+ "net/http"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "golang.org/x/net/websocket"
+)
+
+// router is the server's top-level http.Handler. It dispatches
+// requests to the websocket endpoints registered by setup.
+type router struct {
+ Config *Config
+
+ eventSource eventSource
+ // mux is built by setup, which ServeHTTP runs once via
+ // setupOnce.
+ mux *http.ServeMux
+ setupOnce sync.Once
+}
+
+// setup builds the request mux: "/websocket" is served with v0
+// sessions and "/arvados/v1/events.ws" with v1 sessions.
+func (rtr *router) setup() {
+ rtr.mux = http.NewServeMux()
+ rtr.mux.Handle("/websocket", rtr.makeServer(NewSessionV0))
+ rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1))
+}
+
+// makeServer returns a websocket server that creates a session with
+// newSession for each connection and runs it with a handler
+// configured from rtr.Config. The handshake callback accepts every
+// request. A "connect" record is logged when a connection starts and
+// a "disconnect" record, with elapsed time and handler statistics,
+// when it ends.
+func (rtr *router) makeServer(newSession func(wsConn, arvados.Client, *sql.DB) (session, error)) *websocket.Server {
+	h := &handler{
+		PingTimeout: rtr.Config.PingTimeout.Duration(),
+		QueueSize:   rtr.Config.ClientEventQueue,
+		NewSession: func(ws wsConn) (session, error) {
+			return newSession(ws, rtr.Config.Client, rtr.eventSource.DB())
+		},
+	}
+
+	acceptAll := func(c *websocket.Config, r *http.Request) error {
+		return nil
+	}
+
+	serve := func(ws *websocket.Conn) {
+		logj("Type", "connect",
+			"RemoteAddr", ws.Request().RemoteAddr)
+		started := time.Now()
+
+		sink := rtr.eventSource.NewSink()
+		stats := h.Handle(ws, sink.Channel())
+
+		logj("Type", "disconnect",
+			"RemoteAddr", ws.Request().RemoteAddr,
+			"Elapsed", time.Since(started).Seconds(),
+			"Stats", stats)
+
+		sink.Stop()
+		ws.Close()
+	}
+
+	return &websocket.Server{
+		Handshake: acceptAll,
+		Handler:   websocket.Handler(serve),
+	}
+}
+
+// ServeHTTP implements http.Handler. It builds the mux on first use,
+// logs a "request" record with the remote address and the
+// X-Forwarded-For header, and passes the request to the mux.
+func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ rtr.setupOnce.Do(rtr.setup)
+ logj("Type", "request",
+ "RemoteAddr", req.RemoteAddr,
+ "X-Forwarded-For", req.Header.Get("X-Forwarded-For"))
+ rtr.mux.ServeHTTP(resp, req)
+}
+
+// reqLog writes m to the log as a single JSON object. If m cannot be
+// encoded, the process exits via log.Fatal.
+func reqLog(m map[string]interface{}) {
+	encoded, err := json.Marshal(m)
+	if err != nil {
+		log.Fatal(err)
+	}
+	line := string(encoded)
+	log.Print(line)
+}
--- /dev/null
+package main
+
+// session holds the per-connection protocol logic used by the
+// handler: interpreting client messages, selecting events, and
+// encoding events for the client.
+type session interface {
+ // Receive processes a message received from the client. If
+ // the returned list of messages is non-nil, they will be
+ // queued for sending to the client.
+ Receive(map[string]interface{}, []byte) [][]byte
+
+ // Filter returns true if the event should be queued for
+ // sending to the client. It should return as fast as
+ // possible, and must not block.
+ Filter(*event) bool
+
+ // EventMessage encodes the given event (from the front of the
+ // queue) into a form suitable to send to the client. If a
+ // non-nil error is returned, the connection is terminated. If
+ // the returned buffer is empty, nothing is sent to the client
+ // and the event is not counted in statistics.
+ //
+ // Unlike Filter, EventMessage can block without affecting
+ // other connections. If EventMessage is slow, additional
+ // incoming events will be queued. If the event queue fills
+ // up, the connection will be dropped.
+ EventMessage(*event) ([]byte, error)
+
+ // debugLogf logs a printf-style debug message on behalf of
+ // the session.
+ debugLogf(string, ...interface{})
+}
--- /dev/null
+package main
+
+import (
+ "database/sql"
+ "encoding/json"
+ "errors"
+ "log"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var (
+ // errQueueFull stops a connection whose outgoing queue has no
+ // room for another event.
+ errQueueFull = errors.New("client queue full")
+ // errFrameTooBig stops a connection that sends a frame filling
+ // the handler's entire read buffer.
+ errFrameTooBig = errors.New("frame too big")
+
+ // sendObjectAttributes lists the attribute names copied from an
+ // event's old_attributes/new_attributes into the message sent
+ // to v0 clients.
+ sendObjectAttributes = []string{"state", "name"}
+
+ // Replies to a v0 client request: OK for "subscribe", Fail for
+ // any other method.
+ v0subscribeOK = []byte(`{"status":200}`)
+ v0subscribeFail = []byte(`{"status":400}`)
+)
+
+// v0session implements session for the "/websocket" endpoint.
+type v0session struct {
+ ws wsConn
+ db *sql.DB
+ permChecker permChecker
+ // subscriptions is appended to by Receive and read by Filter;
+ // mtx guards it.
+ subscriptions []v0subscribe
+ mtx sync.Mutex
+ setupOnce sync.Once
+}
+
+// NewSessionV0 returns a v0 session for the given connection. The
+// client's token is taken from the "api_token" form value of the
+// connection's HTTP request and handed to the session's permission
+// checker. An error is returned if the request form cannot be parsed.
+func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
+	sess := &v0session{
+		ws:          ws,
+		db:          db,
+		permChecker: NewPermChecker(ac),
+	}
+
+	if err := ws.Request().ParseForm(); err != nil {
+		log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
+		return nil, err
+	}
+	apiToken := ws.Request().Form.Get("api_token")
+	sess.permChecker.SetToken(apiToken)
+	sess.debugLogf("token = %+q", apiToken)
+
+	return sess, nil
+}
+
+// debugLogf logs a debug message prefixed with the client's remote
+// address.
+func (sess *v0session) debugLogf(s string, args ...interface{}) {
+	withAddr := make([]interface{}, 0, len(args)+1)
+	withAddr = append(withAddr, sess.ws.Request().RemoteAddr)
+	withAddr = append(withAddr, args...)
+	debugLogf("%s "+s, withAddr...)
+}
+
+// Receive handles one client request. buf is decoded as a
+// v0subscribe; if that fails the request is ignored and nil is
+// returned. A "subscribe" request is added to the session's
+// subscriptions and answered with the OK status followed by any
+// matching old events. Any other method is answered with the failure
+// status.
+func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte {
+	sess.debugLogf("received message: %+v", msg)
+	var sub v0subscribe
+	if err := json.Unmarshal(buf, &sub); err != nil {
+		sess.debugLogf("ignored unrecognized request: %s", err)
+		return nil
+	}
+	if sub.Method != "subscribe" {
+		return [][]byte{v0subscribeFail}
+	}
+
+	sub.prepare()
+	sess.debugLogf("subscription: %v", sub)
+	sess.mtx.Lock()
+	sess.subscriptions = append(sess.subscriptions, sub)
+	sess.mtx.Unlock()
+
+	replies := [][]byte{v0subscribeOK}
+	return append(replies, sub.getOldEvents(sess)...)
+}
+
+func (sess *v0session) EventMessage(e *event) ([]byte, error) {
+ detail := e.Detail()
+ if detail == nil {
+ return nil, nil
+ }
+
+ ok, err := sess.permChecker.Check(detail.ObjectUUID)
+ if err != nil || !ok {
+ return nil, err
+ }
+
+ msg := map[string]interface{}{
+ "msgID": e.Serial,
+ "id": detail.ID,
+ "uuid": detail.UUID,
+ "object_uuid": detail.ObjectUUID,
+ "object_owner_uuid": detail.ObjectOwnerUUID,
+ "event_type": detail.EventType,
+ }
+ if detail.Properties != nil && detail.Properties["text"] != nil {
+ msg["properties"] = detail.Properties
+ } else {
+ msgProps := map[string]map[string]interface{}{}
+ for _, ak := range []string{"old_attributes", "new_attributes"} {
+ eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
+ if !ok {
+ continue
+ }
+ msgAttrs := map[string]interface{}{}
+ for _, k := range sendObjectAttributes {
+ if v, ok := eventAttrs[k]; ok {
+ msgAttrs[k] = v
+ }
+ }
+ msgProps[ak] = msgAttrs
+ }
+ msg["properties"] = msgProps
+ }
+ return json.Marshal(msg)
+}
+
+// Filter reports whether e matches at least one of the session's
+// subscriptions. The subscription list is locked for the duration of
+// the check.
+func (sess *v0session) Filter(e *event) bool {
+	sess.mtx.Lock()
+	defer sess.mtx.Unlock()
+	matched := false
+	for _, sub := range sess.subscriptions {
+		if sub.match(e) {
+			matched = true
+			break
+		}
+	}
+	return matched
+}
+
+// getOldEvents returns the encoded messages for events that already
+// exist and match sub. It does nothing unless the subscription gave
+// a non-zero last_log_id.
+//
+// Candidates are log rows with an id greater than last_log_id that
+// were created in the last 10 minutes, in id order. Rows that cannot
+// be scanned, do not match the subscription, fail to encode, or
+// encode to an empty message (no detail available, or not visible to
+// this client) are skipped. Database errors are logged and end the
+// scan; whatever was collected so far is returned.
+func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
+	if sub.LastLogID == 0 {
+		return
+	}
+	debugLogf("getOldEvents(%d)", sub.LastLogID)
+	// Here we do a "select id" query and queue an event for every
+	// log since the given ID, then use (*event)Detail() to
+	// retrieve the whole row and decide whether to send it. This
+	// approach is very inefficient if the subscriber asks for
+	// last_log_id==1, even if the filters end up matching very
+	// few events.
+	//
+	// To mitigate this, filter on "created > 10 minutes ago" when
+	// retrieving the list of old event IDs to consider.
+	rows, err := sess.db.Query(
+		`SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
+		sub.LastLogID,
+		time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
+	if err != nil {
+		errorLogf("db.Query: %s", err)
+		return
+	}
+	// Release the result set on every exit path.
+	defer rows.Close()
+	for rows.Next() {
+		var id uint64
+		err := rows.Scan(&id)
+		if err != nil {
+			errorLogf("Scan: %s", err)
+			continue
+		}
+		e := &event{
+			LogID:    id,
+			Received: time.Now(),
+			db:       sess.db,
+		}
+		if !sub.match(e) {
+			debugLogf("skip old event %+v", e)
+			continue
+		}
+		msg, err := sess.EventMessage(e)
+		if err != nil {
+			debugLogf("event marshal: %s", err)
+			continue
+		}
+		if len(msg) == 0 {
+			// EventMessage returns an empty buffer when
+			// nothing should be sent. Queueing it would
+			// make the handler write an empty frame.
+			debugLogf("skip old event %d: nothing to send", e.LogID)
+			continue
+		}
+		debugLogf("old event: %s", string(msg))
+		msgs = append(msgs, msg)
+	}
+	if err := rows.Err(); err != nil {
+		errorLogf("db.Query: %s", err)
+	}
+	return
+}
+
+// v0subscribe is a decoded v0 client request. After prepare has been
+// called it also serves as a subscription that events can be matched
+// against.
+type v0subscribe struct {
+ // Method is the requested action; Receive accepts only
+ // "subscribe".
+ Method string
+ Filters []v0filter
+ // LastLogID, when non-zero, asks for already-logged events with
+ // a greater id (see getOldEvents).
+ LastLogID int64 `json:"last_log_id"`
+
+ // funcs holds the predicates built from Filters by prepare; an
+ // event matches when all of them return true.
+ funcs []func(*event) bool
+}
+
+// v0filter is one filter condition, read by prepare as column name,
+// operator, and operand.
+type v0filter [3]interface{}
+
+// match reports whether e satisfies every predicate of the
+// subscription. An event whose log row cannot be loaded never
+// matches; a subscription without predicates matches every event
+// that has a log row.
+func (sub *v0subscribe) match(e *event) bool {
+	if e.Detail() == nil {
+		debugLogf("match(%d): failed on no detail", e.LogID)
+		return false
+	}
+	for idx, accept := range sub.funcs {
+		if accept(e) {
+			continue
+		}
+		debugLogf("match(%d): failed on func %d", e.LogID, idx)
+		return false
+	}
+	debugLogf("match(%d): passed %d funcs", e.LogID, len(sub.funcs))
+	return true
+}
+
+// prepare converts sub.Filters into predicate functions appended to
+// sub.funcs. Two kinds of filter are recognized:
+//
+// - ["event_type", "in", [...]]: the event's type must equal one of
+// the listed strings (non-string list items are ignored).
+//
+// - ["created_at", op, time]: op is one of ">=", "<=", ">", "<",
+// "=" and time is an RFC3339 timestamp string.
+//
+// Any filter with another column, an unsupported operator, or an
+// operand of the wrong type is skipped without error.
+//
+// NOTE(review): the predicates dereference e.Detail() and its
+// CreatedAt without a nil check; match only calls them after
+// confirming Detail() is non-nil, but CreatedAt is assumed to be
+// non-nil -- confirm created_at can never be NULL.
+func (sub *v0subscribe) prepare() {
+ for _, f := range sub.Filters {
+ // v0filter is a fixed-size array of 3, so this check can
+ // never be true.
+ if len(f) != 3 {
+ continue
+ }
+ if col, ok := f[0].(string); ok && col == "event_type" {
+ op, ok := f[1].(string)
+ if !ok || op != "in" {
+ continue
+ }
+ arr, ok := f[2].([]interface{})
+ if !ok {
+ continue
+ }
+ var strs []string
+ for _, s := range arr {
+ if s, ok := s.(string); ok {
+ strs = append(strs, s)
+ }
+ }
+ sub.funcs = append(sub.funcs, func(e *event) bool {
+ debugLogf("event_type func: %v in %v", e.Detail().EventType, strs)
+ for _, s := range strs {
+ if s == e.Detail().EventType {
+ return true
+ }
+ }
+ return false
+ })
+ } else if ok && col == "created_at" {
+ op, ok := f[1].(string)
+ if !ok {
+ continue
+ }
+ tstr, ok := f[2].(string)
+ if !ok {
+ continue
+ }
+ t, err := time.Parse(time.RFC3339Nano, tstr)
+ if err != nil {
+ debugLogf("time.Parse(%q): %s", tstr, err)
+ continue
+ }
+ // Operators not listed below add no predicate.
+ switch op {
+ case ">=":
+ sub.funcs = append(sub.funcs, func(e *event) bool {
+ debugLogf("created_at func: %v >= %v", e.Detail().CreatedAt, t)
+ return !e.Detail().CreatedAt.Before(t)
+ })
+ case "<=":
+ sub.funcs = append(sub.funcs, func(e *event) bool {
+ debugLogf("created_at func: %v <= %v", e.Detail().CreatedAt, t)
+ return !e.Detail().CreatedAt.After(t)
+ })
+ case ">":
+ sub.funcs = append(sub.funcs, func(e *event) bool {
+ debugLogf("created_at func: %v > %v", e.Detail().CreatedAt, t)
+ return e.Detail().CreatedAt.After(t)
+ })
+ case "<":
+ sub.funcs = append(sub.funcs, func(e *event) bool {
+ debugLogf("created_at func: %v < %v", e.Detail().CreatedAt, t)
+ return e.Detail().CreatedAt.Before(t)
+ })
+ case "=":
+ sub.funcs = append(sub.funcs, func(e *event) bool {
+ debugLogf("created_at func: %v = %v", e.Detail().CreatedAt, t)
+ return e.Detail().CreatedAt.Equal(t)
+ })
+ }
+ }
+ }
+}
--- /dev/null
+package main
+
+import (
+ "database/sql"
+ "errors"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// NewSessionV1 is a placeholder for the v1 session constructor: it
+// always returns a nil session and a "Not implemented" error, so
+// connections to the v1 endpoint fail at session creation.
+func NewSessionV1(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
+ return nil, errors.New("Not implemented")
+}