end
end
+ def copy
+ src = @object
+
+ @object = ContainerRequest.new
+
+ @object.command = src.command
+ @object.container_image = src.container_image
+ @object.cwd = src.cwd
+ @object.description = src.description
+ @object.environment = src.environment
+ @object.mounts = src.mounts
+ @object.name = src.name
+ @object.output_path = src.output_path
+ @object.priority = 1
+ @object.properties[:template_uuid] = src.properties[:template_uuid]
+ @object.runtime_constraints = src.runtime_constraints
+ @object.scheduling_parameters = src.scheduling_parameters
+ @object.state = 'Uncommitted'
+ @object.use_existing = false
+
+ # set owner_uuid to that of source, provided it is a project and writable by current user
+ current_project = Group.find(src.owner_uuid) rescue nil
+ if (current_project && current_project.writable_by.andand.include?(current_user.uuid))
+ @object.owner_uuid = src.owner_uuid
+ end
+
+ super
+ end
end
end
@object.state = 'New'
- # set owner_uuid to that of source, provided it is a project and wriable by current user
+ # set owner_uuid to that of source, provided it is a project and writable by current user
current_project = Group.find(source.owner_uuid) rescue nil
if (current_project && current_project.writable_by.andand.include?(current_user.uuid))
@object.owner_uuid = source.owner_uuid
<% if @object.class.goes_in_projects? && @object.uuid != current_user.andand.uuid # Not the "Home" project %>
<% content_for :tab_line_buttons do %>
+ <% if current_user.andand.is_active %>
+ <%= render partial: 'extra_tab_line_buttons' %>
+ <% end %>
<% if current_user.andand.is_active && @object.class.copies_to_projects? %>
<%= link_to(
choose_projects_path(
--- /dev/null
+<%# "Re-run" button: shown once the request is Final; POSTs to the copy action. %>
+<% if @object.state == 'Final' %>
+  <%# A single title (the tooltip text); a second title: key would override it. %>
+  <%= link_to(copy_container_request_path('id' => @object.uuid),
+              class: 'btn btn-primary',
+              title: 'This will make a copy and take you there. You can then make any needed changes and run it',
+              data: {toggle: :tooltip, placement: :top},
+              method: :post,
+              ) do %>
+    <i class="fa fa-fw fa-play"></i> Re-run
+  <% end %>
+<% end %>
pipeline_2:
template_uuid: zzzzz-p5p6p-1xbobfobk94ppbv
input_paths: [zzzzz-4zz18-nz98douzhaa3jh2, zzzzz-4zz18-gpw9o5wpcti3nib]
+ container_requests_to_test:
+ container_request_1:
+ workflow_uuid: zzzzz-7fd4e-60e96shgwspt4mw
+ input_paths: []
+ max_wait_seconds: 10
# Below is a sample setting for performance testing.
# Configure workbench URL as "arvados_workbench_url"
resources :containers
resources :container_requests do
post 'cancel', :on => :member
+ post 'copy', on: :member
end
get '/virtual_machines/:id/webshell/:login' => 'virtual_machines#webshell', :as => :webshell_virtual_machine
resources :authorized_keys
--- /dev/null
+namespace :config do
+ desc 'Show site configuration'
+ task dump: :environment do
+ puts $application_config.to_yaml
+ end
+end
assert_includes @response.body, '<div id="event_log_div"'
assert_select 'Download the log', false
end
+
+ test "completed container request offers re-run option" do
+ use_token 'active'
+
+ uuid = api_fixture('container_requests')['completed']['uuid']
+
+ get :show, {id: uuid}, session_for(:active)
+ assert_response :success
+
+ assert_includes @response.body, "href=\"/container_requests/#{uuid}/copy\""
+ end
+
+ test "container request copy" do
+ completed_cr = api_fixture('container_requests')['completed']
+ post(:copy,
+ {
+ id: completed_cr['uuid']
+ },
+ session_for(:active))
+ assert_response 302
+ copied_cr = assigns(:object)
+ assert_not_nil copied_cr
+ assert_equal 'Uncommitted', copied_cr[:state]
+ assert_equal "Copy of #{completed_cr['name']}", copied_cr['name']
+ assert_equal completed_cr['cmd'], copied_cr['cmd']
+ assert_equal completed_cr['runtime_constraints']['ram'], copied_cr['runtime_constraints'][:ram]
+ end
end
--- /dev/null
+require 'diagnostics_test_helper'
+
+# This test assumes that the configured workflow_uuid corresponds to a cwl workflow.
+# Ex: configure a workflow using the steps below and use the resulting workflow uuid:
+# > cd arvados/doc/user/cwl/bwa-mem
+# > arvados-cwl-runner --create-workflow bwa-mem.cwl bwa-mem-input.yml
+#
+# One test is generated per key under container_requests_to_test in the
+# diagnostics configuration. Each entry supplies workflow_uuid, input_paths
+# (may be empty) and max_wait_seconds.
+
+class ContainerRequestTest < DiagnosticsTest
+ # Read once when the class is loaded; andand keeps this nil-safe when the
+ # setting is absent (then no tests are defined).
+ crs_to_test = Rails.configuration.container_requests_to_test.andand.keys
+
+ setup do
+ need_selenium 'to make websockets work'
+ end
+
+ crs_to_test.andand.each do |cr_to_test|
+ test "run container_request: #{cr_to_test}" do
+ cr_config = Rails.configuration.container_requests_to_test[cr_to_test]
+
+ visit_page_with_token 'active'
+
+ find('.btn', text: 'Run a process').click
+
+ # Pick the workflow by searching for its uuid in the chooser dialog.
+ within('.modal-dialog') do
+ page.find_field('Search').set cr_config['workflow_uuid']
+ wait_for_ajax
+ # NOTE(review): the entry clicked is matched by the label 'bwa-mem.cwl',
+ # not by workflow_uuid, so only a workflow with that name will work.
+ # Consider making the label configurable.
+ find('.selectable', text: 'bwa-mem.cwl').click
+ find('.btn', text: 'Next: choose inputs').click
+ end
+
+ # While inputs remain to be chosen, Run must be shown disabled.
+ page.assert_selector('a.disabled,button.disabled', text: 'Run') if cr_config['input_paths'].any?
+
+ # Choose input for the workflow
+ cr_config['input_paths'].each do |look_for|
+ select_input look_for
+ end
+ wait_for_ajax
+
+ # All needed inputs are now filled in, so Run is enabled. Run this workflow now.
+ page.assert_no_selector('a.disabled,button.disabled', text: 'Run')
+ find('a,button', text: 'Run').click
+
+ # container_request is running. Run button is no longer available.
+ page.assert_no_selector('a', text: 'Run')
+
+ # Wait (up to max_wait_seconds) for the page to report the run as completed.
+ wait_until_page_has 'completed', cr_config['max_wait_seconds']
+ end
+ end
+end
find('a,button', text: 'Components').click
find('a,button', text: 'Run').click
- # Pipeline is running. We have a "Stop" button instead now.
+ # Pipeline is running. We have a "Pause" button instead now.
page.assert_selector 'a,button', text: 'Pause'
# Wait for pipeline run to complete
wait_until_page_has 'completed', pipeline_config['max_wait_seconds']
end
end
-
- def select_input look_for
- inputs_needed = page.all('.btn', text: 'Choose')
- return if (!inputs_needed || !inputs_needed.any?)
-
- look_for_uuid = nil
- look_for_file = nil
- if look_for.andand.index('/').andand.>0
- partitions = look_for.partition('/')
- look_for_uuid = partitions[0]
- look_for_file = partitions[2]
- else
- look_for_uuid = look_for
- look_for_file = nil
- end
-
- assert_triggers_dom_event 'shown.bs.modal' do
- inputs_needed[0].click
- end
-
- within('.modal-dialog') do
- if look_for_uuid
- fill_in('Search', with: look_for_uuid, exact: true)
- wait_for_ajax
- end
-
- page.all('.selectable').first.click
- wait_for_ajax
- # ajax reload is wiping out input selection after search results; so, select again.
- page.all('.selectable').first.click
- wait_for_ajax
-
- if look_for_file
- wait_for_ajax
- within('.collection_files_name', text: look_for_file) do
- find('.fa-file').click
- end
- end
-
- find('button', text: 'OK').click
- wait_for_ajax
- end
- end
end
visit page_with_token(tokens[token_name], (workbench_url + path))
end
+ def select_input look_for
+ inputs_needed = page.all('.btn', text: 'Choose')
+ return if (!inputs_needed || !inputs_needed.any?)
+
+ look_for_uuid = nil
+ look_for_file = nil
+ if look_for.andand.index('/').andand.>0
+ partitions = look_for.partition('/')
+ look_for_uuid = partitions[0]
+ look_for_file = partitions[2]
+ else
+ look_for_uuid = look_for
+ look_for_file = nil
+ end
+
+ assert_triggers_dom_event 'shown.bs.modal' do
+ inputs_needed[0].click
+ end
+
+ within('.modal-dialog') do
+ if look_for_uuid
+ fill_in('Search', with: look_for_uuid, exact: true)
+ wait_for_ajax
+ end
+
+ page.all('.selectable').first.click
+ wait_for_ajax
+ # ajax reload is wiping out input selection after search results; so, select again.
+ page.all('.selectable').first.click
+ wait_for_ajax
+
+ if look_for_file
+ wait_for_ajax
+ within('.collection_files_name', text: look_for_file) do
+ find('.fa-file').click
+ end
+ end
+
+ find('button', text: 'OK').click
+ wait_for_ajax
+ end
+ end
+
# Looks for the text_to_look_for for up to the max_time provided
def wait_until_page_has text_to_look_for, max_time=30
max_time = 30 if (!max_time || (max_time.to_s != max_time.to_i.to_s))
"Keep storage daemon, accessible to clients on the LAN"
package_go_binary services/keep-web keep-web \
"Static web hosting service for user data stored in Arvados Keep"
+package_go_binary services/ws arvados-ws \
+ "Arvados Websocket server"
package_go_binary tools/keep-block-check keep-block-check \
"Verify that all data from one set of Keep servers to another was copied"
package_go_binary tools/keep-rsync keep-rsync \
services/crunch-run
services/crunch-dispatch-local
services/crunch-dispatch-slurm
+services/ws
sdk/cli
sdk/pam
sdk/python
sdk/go/manifest
sdk/go/blockdigest
sdk/go/streamer
+sdk/go/stats
sdk/go/crunchrunner
sdk/cwl
tools/crunchstat-summary
&& eval $(python sdk/python/tests/run_test_server.py start --auth admin) \
&& export ARVADOS_TEST_API_HOST="$ARVADOS_API_HOST" \
&& export ARVADOS_TEST_API_INSTALLED="$$" \
+ && python sdk/python/tests/run_test_server.py start_ws \
+ && python sdk/python/tests/run_test_server.py start_nginx \
&& (env | egrep ^ARVADOS)
}
start_nginx_proxy_services() {
- echo 'Starting keepproxy, keep-web, arv-git-httpd, and nginx ssl proxy...'
+ echo 'Starting keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...'
cd "$WORKSPACE" \
&& python sdk/python/tests/run_test_server.py start_keep_proxy \
&& python sdk/python/tests/run_test_server.py start_keep-web \
&& python sdk/python/tests/run_test_server.py start_arv-git-httpd \
+ && python sdk/python/tests/run_test_server.py start_ws \
&& python sdk/python/tests/run_test_server.py start_nginx \
&& export ARVADOS_TEST_PROXY_SERVICES=1
}
cd "$WORKSPACE" \
&& python sdk/python/tests/run_test_server.py stop_nginx \
&& python sdk/python/tests/run_test_server.py stop_arv-git-httpd \
+ && python sdk/python/tests/run_test_server.py stop_ws \
&& python sdk/python/tests/run_test_server.py stop_keep-web \
&& python sdk/python/tests/run_test_server.py stop_keep_proxy
fi
if [[ -n "$ARVADOS_TEST_API_HOST" ]]; then
unset ARVADOS_TEST_API_HOST
cd "$WORKSPACE" \
+ && python sdk/python/tests/run_test_server.py stop_nginx \
+ && python sdk/python/tests/run_test_server.py stop_ws \
&& python sdk/python/tests/run_test_server.py stop
fi
}
sdk/go/manifest
sdk/go/streamer
sdk/go/crunchrunner
+ sdk/go/stats
lib/crunchstat
services/arv-git-httpd
services/crunchstat
services/crunch-dispatch-local
services/crunch-dispatch-slurm
services/crunch-run
+ services/ws
tools/keep-block-check
tools/keep-exercise
tools/keep-rsync
--- /dev/null
+---
+layout: default
+navsection: installguide
+title: Install the websocket server
+...
+
+{% include 'notebox_begin_warning' %}
+
+This websocket server is an alternative to the puma server that comes with the API server. It is available as an *experimental pre-release* and is not recommended for production sites.
+
+{% include 'notebox_end' %}
+
+The arvados-ws server provides event notifications to websocket clients. It can be installed anywhere with access to the Postgres database and the Arvados API server, typically behind a web proxy that provides SSL support. See the "godoc page":http://godoc.org/github.com/curoverse/arvados/services/ws for additional information.
+
+By convention, we use the following hostname for the websocket service.
+
+<notextile>
+<pre><code>ws.<span class="userinput">uuid_prefix.your.domain</span></code></pre>
+</notextile>
+
+The above hostname should resolve from anywhere on the internet.
+
+h2. Install arvados-ws
+
+Typically arvados-ws runs on the same host as the API server.
+
+On Debian-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install arvados-ws</span>
+</code></pre>
+</notextile>
+
+On Red Hat-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install arvados-ws</span>
+</code></pre>
+</notextile>
+
+Verify that @arvados-ws@ is functional:
+
+<notextile>
+<pre><code>~$ <span class="userinput">arvados-ws -h</span>
+Usage of arvados-ws:
+ -config path
+ path to config file (default "/etc/arvados/ws/ws.yml")
+ -dump-config
+ show current configuration and exit
+</code></pre>
+</notextile>
+
+h3. Create a configuration file
+
+Create @/etc/arvados/ws/ws.yml@ using the following template. Replace @xxxxxxxx@ with the "password you generated during database setup":install-postgresql.html#api.
+
+<notextile>
+<pre><code>Client:
+ APIHost: <span class="userinput">uuid_prefix.your.domain</span>:443
+Listen: ":<span class="userinput">9003</span>"
+Postgres:
+ dbname: arvados_production
+ host: localhost
+ password: <span class="userinput">xxxxxxxx</span>
+ user: arvados
+</code></pre>
+</notextile>
+
+h3. Start the service (option 1: systemd)
+
+If your system does not use systemd, skip this section and follow the "runit instructions":#runit instead.
+
+If your system uses systemd, the arvados-ws service should already be set up. Start it and check its status:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo systemctl restart arvados-ws</span>
+~$ <span class="userinput">sudo systemctl status arvados-ws</span>
+● arvados-ws.service - Arvados websocket server
+ Loaded: loaded (/lib/systemd/system/arvados-ws.service; enabled)
+ Active: active (running) since Tue 2016-12-06 11:20:48 EST; 10s ago
+ Docs: https://doc.arvados.org/
+ Main PID: 9421 (arvados-ws)
+ CGroup: /system.slice/arvados-ws.service
+ └─9421 /usr/bin/arvados-ws
+
+Dec 06 11:20:48 zzzzz arvados-ws[9421]: {"level":"info","msg":"started","time":"2016-12-06T11:20:48.207617188-05:00"}
+Dec 06 11:20:48 zzzzz arvados-ws[9421]: {"Listen":":9003","level":"info","msg":"listening","time":"2016-12-06T11:20:48.244956506-05:00"}
+Dec 06 11:20:48 zzzzz systemd[1]: Started Arvados websocket server.
+</code></pre>
+</notextile>
+
+If it is not running, use @journalctl@ to check logs for errors:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo journalctl -n10 -u arvados-ws</span>
+...
+Dec 06 11:12:48 zzzzz systemd[1]: Starting Arvados websocket server...
+Dec 06 11:12:48 zzzzz arvados-ws[8918]: {"level":"info","msg":"started","time":"2016-12-06T11:12:48.030496636-05:00"}
+Dec 06 11:12:48 zzzzz arvados-ws[8918]: {"error":"pq: password authentication failed for user \"arvados\"","level":"fatal","msg":"db.Ping failed","time":"2016-12-06T11:12:48.058206400-05:00"}
+</code></pre>
+</notextile>
+
+Skip ahead to "confirm the service is working":#confirm.
+
+h3(#runit). Start the service (option 2: runit)
+
+Install runit to supervise the arvados-ws daemon. {% include 'install_runit' %}
+
+Create a supervised service.
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo mkdir /etc/service/arvados-ws</span>
+~$ <span class="userinput">cd /etc/service/arvados-ws</span>
+~$ <span class="userinput">sudo mkdir log log/main</span>
+~$ <span class="userinput">printf '#!/bin/sh\nexec arvados-ws 2>&1\n' | sudo tee run</span>
+~$ <span class="userinput">printf '#!/bin/sh\nexec svlogd main\n' | sudo tee log/run</span>
+~$ <span class="userinput">sudo chmod +x run log/run</span>
+~$ <span class="userinput">sudo sv exit .</span>
+~$ <span class="userinput">cd -</span>
+</code></pre>
+</notextile>
+
+Use @sv stat@ and check the log file to verify the service is running.
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo sv stat /etc/service/arvados-ws</span>
+run: /etc/service/arvados-ws: (pid 12520) 2s; run: log: (pid 12519) 2s
+~$ <span class="userinput">tail /etc/service/arvados-ws/log/main/current</span>
+{"level":"info","msg":"started","time":"2016-12-06T11:56:20.669171449-05:00"}
+{"Listen":":9003","level":"info","msg":"listening","time":"2016-12-06T11:56:20.708847627-05:00"}
+</code></pre>
+</notextile>
+
+h3(#confirm). Confirm the service is working
+
+Confirm the service is listening on its assigned port and responding to requests.
+
+<notextile>
+<pre><code>~$ <span class="userinput">curl http://0.0.0.0:<b>9003</b>/status.json</span>
+{"Clients":1}
+</code></pre>
+</notextile>
+
+h3. Set up a reverse proxy with SSL support
+
+The arvados-ws service will be accessible from anywhere on the internet, so we recommend using SSL for transport encryption.
+
+This is best achieved by putting a reverse proxy with SSL support in front of arvados-ws, running on port 443 and passing requests to arvados-ws on port 9003 (or whatever port you chose in your configuration file).
+
+For example, using Nginx:
+
+<notextile><pre>
+upstream arvados-ws {
+ server 127.0.0.1:<span class="userinput">9003</span>;
+}
+
+server {
+ listen <span class="userinput">[your public IP address]</span>:443 ssl;
+ server_name ws.<span class="userinput">uuid_prefix.your.domain</span>;
+
+ proxy_connect_timeout 90s;
+ proxy_read_timeout 300s;
+
+ ssl on;
+ ssl_certificate <span class="userinput">YOUR/PATH/TO/cert.pem</span>;
+ ssl_certificate_key <span class="userinput">YOUR/PATH/TO/cert.key</span>;
+
+ location / {
+ proxy_pass http://arvados-ws;
+ proxy_set_header Upgrade $http_upgrade;
+ proxy_set_header Connection "upgrade";
+ proxy_set_header Host $host;
+ proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+ }
+}
+</pre></notextile>
+
+If Nginx is already configured to proxy @ws@ requests to puma, move that configuration out of the way or change its @server_name@ so it doesn't conflict.
+
+h3. Update API server configuration
+
+Ensure the websocket server address is correct in the API server configuration file @/etc/arvados/api/application.yml@.
+
+<notextile>
+<pre><code>websocket_address: wss://ws.<span class="userinput">uuid_prefix.your.domain</span>/websocket
+</code></pre>
+</notextile>
+
+Restart Nginx to reload the API server configuration.
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo nginx -s reload</span>
+</code></pre>
+</notextile>
+
+h3. Verify DNS and proxy setup
+
+Use a host elsewhere on the Internet to confirm that your DNS, proxy, and SSL are configured correctly.
+
+<notextile>
+<pre><code>$ <span class="userinput">curl https://ws.<b>uuid_prefix.your.domain</b>/status.json</span>
+{"Clients":1}
+</code></pre>
+</notextile>
def test_raw_file
out, err = capture_subprocess_io do
- assert arv_put('--raw', './tmp/foo')
+ assert arv_put('--no-cache', '--raw', './tmp/foo')
end
$stderr.write err
assert_match '', err
def test_as_stream
out, err = capture_subprocess_io do
- assert arv_put('--as-stream', './tmp/foo')
+ assert arv_put('--no-cache', '--as-stream', './tmp/foo')
end
$stderr.write err
assert_match '', err
def test_progress
out, err = capture_subprocess_io do
- assert arv_put('--manifest', '--progress', './tmp/foo')
+ assert arv_put('--no-cache', '--manifest', '--progress', './tmp/foo')
end
assert_match /%/, err
assert match_collection_uuid(out)
def test_batch_progress
out, err = capture_subprocess_io do
- assert arv_put('--manifest', '--batch-progress', './tmp/foo')
+ assert arv_put('--no-cache', '--manifest', '--batch-progress', './tmp/foo')
end
assert_match /: 0 written 3 total/, err
assert_match /: 3 written 3 total/, err
import cwltool.process
from schema_salad.ref_resolver import Loader
-from schema_salad.ref_resolver import Loader
+from .matcher import JsonDiffMatcher
if not os.getenv('ARVADOS_DEBUG'):
logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
make_fs_access=make_fs_access, tmpdir="/tmp"):
j.run(enable_reuse=enable_reuse)
runner.api.container_requests().create.assert_called_with(
- body={
+ body=JsonDiffMatcher({
'environment': {
'HOME': '/var/spool/cwl',
'TMPDIR': '/tmp'
'container_image': '99999999999999999999999999999993+99',
'command': ['ls', '/var/spool/cwl'],
'cwd': '/var/spool/cwl',
- 'scheduling_parameters': {}
- })
+ 'scheduling_parameters': {},
+ 'properties': {},
+ }))
# The test passes some fields in builder.resources
# For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
'cwd': '/var/spool/cwl',
'scheduling_parameters': {
'partitions': ['blurb']
- }
+ },
+ 'properties': {}
}
call_body = call_kwargs.get('body', None)
// callers who use a Client to initialize an
// arvadosclient.ArvadosClient.)
KeepServiceURIs []string `json:",omitempty"`
+
+ dd *DiscoveryDocument
}
// The default http.Client used by a Client with Insecure==true and
// DiscoveryDocument is the Arvados server's description of itself.
type DiscoveryDocument struct {
- DefaultCollectionReplication int `json:"defaultCollectionReplication"`
- BlobSignatureTTL int64 `json:"blobSignatureTtl"`
+ BasePath string `json:"basePath"`
+ DefaultCollectionReplication int `json:"defaultCollectionReplication"`
+ BlobSignatureTTL int64 `json:"blobSignatureTtl"`
+ Schemas map[string]Schema `json:"schemas"`
+ Resources map[string]Resource `json:"resources"`
+}
+
+// Resource is one entry of the discovery document's "resources" map:
+// its methods, keyed by method name (e.g. "get").
+type Resource struct {
+ Methods map[string]ResourceMethod `json:"methods"`
+}
+
+// ResourceMethod describes one method of a Resource: its HTTP verb,
+// its path template (which may contain "{uuid}"), and its response type.
+type ResourceMethod struct {
+ HTTPMethod string `json:"httpMethod"`
+ Path string `json:"path"`
+ Response MethodResponse `json:"response"`
+}
+
+// MethodResponse identifies a method's response schema. Ref holds the
+// "$ref" value, which PathForUUID compares against model names.
+type MethodResponse struct {
+ Ref string `json:"$ref"`
+}
+
+// Schema is one entry of the discovery document's "schemas" map.
+// UUIDPrefix is the 5-character type infix that appears in the UUIDs of
+// this model (compared against uuid[6:11] by modelForUUID).
+type Schema struct {
+ UUIDPrefix string `json:"uuidPrefix"`
}
// DiscoveryDocument returns a *DiscoveryDocument. The returned object
// should not be modified: the same object may be returned by
// subsequent calls.
func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
+ if c.dd != nil {
+ return c.dd, nil
+ }
var dd DiscoveryDocument
- return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+ err := c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+ if err != nil {
+ return nil, err
+ }
+ c.dd = &dd
+ return c.dd, nil
+}
+
+func (c *Client) modelForUUID(dd *DiscoveryDocument, uuid string) (string, error) {
+ if len(uuid) != 27 {
+ return "", fmt.Errorf("invalid UUID: %q", uuid)
+ }
+ infix := uuid[6:11]
+ var model string
+ for m, s := range dd.Schemas {
+ if s.UUIDPrefix == infix {
+ model = m
+ break
+ }
+ }
+ if model == "" {
+ return "", fmt.Errorf("unrecognized type portion %q in UUID %q", infix, uuid)
+ }
+ return model, nil
+}
+
+// KindForUUID returns the Arvados kind of the object identified by uuid:
+// "arvados#" followed by the matching model name with its first letter
+// lower-cased (a model named "Log" yields "arvados#log").
+func (c *Client) KindForUUID(uuid string) (string, error) {
+	disco, err := c.DiscoveryDocument()
+	if err != nil {
+		return "", err
+	}
+	modelName, err := c.modelForUUID(disco, uuid)
+	if err != nil {
+		return "", err
+	}
+	return fmt.Sprintf("arvados#%s%s", strings.ToLower(modelName[:1]), modelName[1:]), nil
+}
+
+// PathForUUID returns the request path, without a leading "/", for
+// invoking method (e.g. "get") on the object identified by uuid.
+//
+// The resource is the one whose "get" method responds with the UUID's
+// model. Its method path template has "{uuid}" substituted and is
+// prefixed with the discovery document's BasePath. It returns an error
+// if the discovery document cannot be fetched, the UUID is not
+// recognized, no resource serves the model, or the resource lacks method.
+func (c *Client) PathForUUID(method, uuid string) (string, error) {
+	dd, err := c.DiscoveryDocument()
+	if err != nil {
+		return "", err
+	}
+	model, err := c.modelForUUID(dd, uuid)
+	if err != nil {
+		return "", err
+	}
+	var resource string
+	for r, rsc := range dd.Resources {
+		if rsc.Methods["get"].Response.Ref == model {
+			resource = r
+			break
+		}
+	}
+	if resource == "" {
+		return "", fmt.Errorf("no resource for model: %q", model)
+	}
+	m, ok := dd.Resources[resource].Methods[method]
+	if !ok {
+		return "", fmt.Errorf("no method %q for resource %q", method, resource)
+	}
+	path := dd.BasePath + strings.Replace(m.Path, "{uuid}", uuid, -1)
+	// TrimPrefix strips one leading "/" like before, but does not panic
+	// (as path[0] would) when BasePath and the method path are both empty.
+	return strings.TrimPrefix(path, "/"), nil
}
--- /dev/null
+package arvados
+
+import (
+ "time"
+)
+
+// Log is an arvados#log record
+//
+// EventAt and CreatedAt are pointers so that, with omitempty, they are
+// left out of the JSON encoding when nil.
+type Log struct {
+ ID uint64 `json:"id"`
+ UUID string `json:"uuid"`
+ ObjectUUID string `json:"object_uuid"`
+ ObjectOwnerUUID string `json:"object_owner_uuid"`
+ EventType string `json:"event_type"`
+ // NOTE(review): this field's JSON key is "event", not "event_at".
+ // Confirm that matches what producers/consumers of this struct expect.
+ EventAt *time.Time `json:"event,omitempty"`
+ Properties map[string]interface{} `json:"properties"`
+ CreatedAt *time.Time `json:"created_at,omitempty"`
+}
+
+// LogList is an arvados#logList resource.
+//
+// Items holds one page of results. ItemsAvailable, Offset and Limit are
+// presumably the usual list-API paging fields (TODO confirm).
+type LogList struct {
+ Items []Log `json:"items"`
+ ItemsAvailable int `json:"items_available"`
+ Offset int `json:"offset"`
+ Limit int `json:"limit"`
+}
}
return nil
}
+
+// Dump returns a YAML representation of cfg, exactly as produced by
+// yaml.Marshal (including its error, if any).
+func Dump(cfg interface{}) ([]byte, error) {
+	encoded, err := yaml.Marshal(cfg)
+	return encoded, err
+}
--- /dev/null
+package ctxlog
+
+import (
+ "context"
+
+ "github.com/Sirupsen/logrus"
+)
+
+var (
+ // loggerCtxKey is a unique pointer used as the context key; no other
+ // package can construct an equal key.
+ loggerCtxKey = new(int)
+ // rootLogger is configured by SetLevel/SetFormat and used when a
+ // context carries no logger of its own.
+ rootLogger = logrus.New()
+)
+
+// rfc3339NanoFixed is like time.RFC3339Nano but always prints nine
+// fractional-second digits (no trailing-zero trimming), so timestamps
+// have a fixed width.
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
+
+// Context returns a new child context such that FromContext(child)
+// returns the given logger.
+func Context(ctx context.Context, logger *logrus.Entry) context.Context {
+ return context.WithValue(ctx, loggerCtxKey, logger)
+}
+
+// FromContext returns the logger suitable for the given context -- the one
+// attached by Context() if applicable, otherwise the
+// top-level logger with no fields/values. A nil ctx is allowed.
+func FromContext(ctx context.Context) *logrus.Entry {
+ if ctx != nil {
+ if logger, ok := ctx.Value(loggerCtxKey).(*logrus.Entry); ok {
+ return logger
+ }
+ }
+ return rootLogger.WithFields(nil)
+}
+
+// SetLevel sets the root logger's level from a logrus level name
+// (see logrus.ParseLevel). An unrecognized name is fatal.
+func SetLevel(level string) {
+	parsed, parseErr := logrus.ParseLevel(level)
+	if parseErr != nil {
+		logrus.Fatal(parseErr)
+	}
+	rootLogger.Level = parsed
+}
+
+// SetFormat selects the root logger's output format, "text" or "json".
+// Both use the rfc3339NanoFixed timestamp layout. Any other format name
+// is fatal.
+func SetFormat(format string) {
+	if format == "text" {
+		rootLogger.Formatter = &logrus.TextFormatter{
+			FullTimestamp:   true,
+			TimestampFormat: rfc3339NanoFixed,
+		}
+	} else if format == "json" {
+		rootLogger.Formatter = &logrus.JSONFormatter{TimestampFormat: rfc3339NanoFixed}
+	} else {
+		logrus.WithField("LogFormat", format).Fatal("unknown log format")
+	}
+}
--- /dev/null
+package stats
+
+import (
+ "fmt"
+ "strconv"
+ "time"
+)
+
+// Duration is a duration that is displayed as a number of seconds in
+// fixed-point notation (six digits after the decimal point).
+type Duration time.Duration
+
+// MarshalJSON implements json.Marshaler. The value is emitted as a bare
+// JSON number of seconds, e.g. 123.123123.
+func (d Duration) MarshalJSON() ([]byte, error) {
+ return []byte(d.String()), nil
+}
+
+// String implements fmt.Stringer. It formats d as seconds with six
+// digits after the decimal point.
+func (d Duration) String() string {
+ return fmt.Sprintf("%.6f", time.Duration(d).Seconds())
+}
+
+// UnmarshalJSON implements json.Unmarshaler. It expects a bare JSON
+// number of seconds; a quoted JSON string fails to parse.
+func (d *Duration) UnmarshalJSON(data []byte) error {
+ return d.Set(string(data))
+}
+
+// Set implements flag.Value (together with String). It parses s as a
+// possibly fractional number of seconds. On a parse error, d is left
+// unchanged and the strconv error is returned.
+func (d *Duration) Set(s string) error {
+ sec, err := strconv.ParseFloat(s, 64)
+ if err == nil {
+ *d = Duration(sec * float64(time.Second))
+ }
+ return err
+}
--- /dev/null
+package stats
+
+import (
+ "testing"
+ "time"
+)
+
+// TestString checks that String renders seconds with six decimal places.
+func TestString(t *testing.T) {
+	const want = "123.123123"
+	got := Duration(123123123123 * time.Nanosecond).String()
+	if got != want {
+		t.Errorf("got %s, expect %s", got, want)
+	}
+}
+
+// TestSet checks that Set parses fractional seconds into nanoseconds.
+func TestSet(t *testing.T) {
+	var d Duration
+	err := d.Set("123.456")
+	if err != nil {
+		t.Fatal(err)
+	}
+	want := int64(123456000000)
+	if got := time.Duration(d).Nanoseconds(); got != want {
+		t.Errorf("got %d, expect %d", got, want)
+	}
+}
def writable(self):
return self.parent.writable()
+ @synchronized
+ def permission_expired(self, as_of_dt=None):
+ """Returns True if any of the segment's locators is expired"""
+ for r in self._segments:
+ if KeepLocator(r.locator).permission_expired(as_of_dt):
+ return True
+ return False
+
@synchronized
def segments(self):
return copy.copy(self._segments)
def find(self, path):
"""Recursively search the specified file path.
- May return either a Collection or ArvadosFile. Return None if not
+ May return either a Collection or ArvadosFile. Return None if not
found.
+ If path is invalid (ex: starts with '/'), an IOError exception will be
+ raised.
"""
if not path:
raise errors.ArgumentError("Parameter 'path' is empty.")
pathcomponents = path.split("/", 1)
+ if pathcomponents[0] == '':
+ raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+
item = self._items.get(pathcomponents[0])
- if len(pathcomponents) == 1:
+ if item is None:
+ return None
+ elif len(pathcomponents) == 1:
return item
else:
if isinstance(item, RichCollectionBase):
if target_dir is None:
raise IOError(errno.ENOENT, "Target directory not found", target_name)
- if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
+ if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
target_dir = target_dir[target_name]
target_name = sourcecomponents[-1]
import arvados
import arvados.collection
import base64
+import copy
import datetime
import errno
import fcntl
import hashlib
import json
+import logging
import os
import pwd
-import time
+import re
import signal
import socket
import sys
import tempfile
import threading
-import copy
-import logging
+import time
from apiclient import errors as apiclient_errors
from arvados._version import __version__
_group = upload_opts.add_mutually_exclusive_group()
_group.add_argument('--max-manifest-depth', type=int, metavar='N',
- default=-1, help="""
-Maximum depth of directory tree to represent in the manifest
-structure. A directory structure deeper than this will be represented
-as a single stream in the manifest. If N=0, the manifest will contain
-a single stream. Default: -1 (unlimited), i.e., exactly one manifest
-stream per filesystem directory that contains files.
-""")
+ default=-1, help=argparse.SUPPRESS)
_group.add_argument('--normalize', action='store_true',
help="""
data.
""")
+_group.add_argument('--dry-run', action='store_true', default=False,
+ help="""
+Don't actually upload files, but only check if any file should be
+uploaded. Exit with code=2 when files are pending for upload.
+""")
+
_group = upload_opts.add_mutually_exclusive_group()
_group.add_argument('--as-stream', action='store_true', dest='stream',
manifest.
""")
+upload_opts.add_argument('--update-collection', type=str, default=None,
+ dest='update_collection', metavar="UUID", help="""
+Update an existing collection identified by the given Arvados collection
+UUID. All new local files will be uploaded.
+""")
+
upload_opts.add_argument('--use-filename', type=str, default=None,
dest='filename', help="""
Synonym for --filename.
Do not continue interrupted uploads from cached state.
""")
+_group = run_opts.add_mutually_exclusive_group()
+_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
+ help="""
+Save upload state in a cache file for resuming (default).
+""")
+_group.add_argument('--no-cache', action='store_false', dest='use_cache',
+ help="""
+Do not save upload state in a cache file for resuming.
+""")
+
arg_parser = argparse.ArgumentParser(
description='Copy data from the local filesystem to Keep.',
parents=[upload_opts, run_opts, arv_cmd.retry_opt])
and os.isatty(sys.stderr.fileno())):
args.progress = True
+ # Turn off --resume (default) if --no-cache is used.
+ if not args.use_cache:
+ args.resume = False
+
if args.paths == ['-']:
+ if args.update_collection:
+ arg_parser.error("""
+ --update-collection cannot be used when reading from stdin.
+ """)
args.resume = False
+ args.use_cache = False
if not args.filename:
args.filename = 'stdin'
return args
+
+# Error type for failures while updating an existing collection.
+# NOTE(review): raise sites are not visible here; presumably used with
+# --update-collection. Confirm.
+class CollectionUpdateError(Exception):
+ pass
+
+
class ResumeCacheConflict(Exception):
pass
+# Raised when incompatible upload-job arguments are combined, e.g. the
+# upload job's __init__ raises it for resume=True together with use_cache=False.
+class ArvPutArgumentConflict(Exception):
+ pass
+
+
+# Raised by FileUploadList.append in dry-run mode to signal that at least
+# one file would be uploaded.
+class ArvPutUploadIsPending(Exception):
+ pass
+
+
+# NOTE(review): raise site not visible here; presumably signals a dry run
+# that found nothing to upload. Confirm.
+class ArvPutUploadNotPending(Exception):
+ pass
+
+
+class FileUploadList(list):
+ def __init__(self, dry_run=False):
+ list.__init__(self)
+ self.dry_run = dry_run
+
+ def append(self, other):
+ if self.dry_run:
+ raise ArvPutUploadIsPending()
+ super(FileUploadList, self).append(other)
+
+
class ResumeCache(object):
CACHE_DIR = '.cache/arvados/arv-put'
realpaths = sorted(os.path.realpath(path) for path in args.paths)
md5.update('\0'.join(realpaths))
if any(os.path.isdir(path) for path in realpaths):
- md5.update(str(max(args.max_manifest_depth, -1)))
+ md5.update("-1")
elif args.filename:
md5.update(args.filename)
return os.path.join(
'files' : {} # Previous run file list: {path : {size, mtime}}
}
- def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
- name=None, owner_uuid=None, ensure_unique_name=False,
- num_retries=None, replication_desired=None,
- filename=None, update_time=1.0):
+ def __init__(self, paths, resume=True, use_cache=True, reporter=None,
+ bytes_expected=None, name=None, owner_uuid=None,
+ ensure_unique_name=False, num_retries=None, replication_desired=None,
+ filename=None, update_time=20.0, update_collection=None,
+ logger=logging.getLogger('arvados.arv_put'), dry_run=False):
self.paths = paths
self.resume = resume
+ self.use_cache = use_cache
+ self.update = False
self.reporter = reporter
self.bytes_expected = bytes_expected
self.bytes_written = 0
self._state = None # Previous run state (file list & manifest)
self._current_files = [] # Current run file list
self._cache_file = None
- self._collection = None
self._collection_lock = threading.Lock()
+ self._remote_collection = None # Collection being updated (if asked)
+ self._local_collection = None # Collection from previous run manifest
+ self._file_paths = [] # Files to be updated in remote collection
self._stop_checkpointer = threading.Event()
self._checkpointer = threading.Thread(target=self._update_task)
+ self._checkpointer.daemon = True
self._update_task_time = update_time # How many seconds wait between update runs
- self.logger = logging.getLogger('arvados.arv_put')
+ self._files_to_upload = FileUploadList(dry_run=dry_run)
+ self.logger = logger
+ self.dry_run = dry_run
+
+ if not self.use_cache and self.resume:
+ raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
+
+ # Check for obvious dry-run responses
+ if self.dry_run and (not self.use_cache or not self.resume):
+ raise ArvPutUploadIsPending()
+
# Load cached data if any and if needed
- self._setup_state()
+ self._setup_state(update_collection)
- def start(self):
+ def start(self, save_collection):
"""
Start supporting thread & file uploading
"""
- self._checkpointer.daemon = True
- self._checkpointer.start()
+ if not self.dry_run:
+ self._checkpointer.start()
try:
for path in self.paths:
# Test for stdin first, in case some file named '-' exist
if path == '-':
+ if self.dry_run:
+ raise ArvPutUploadIsPending()
self._write_stdin(self.filename or 'stdin')
elif os.path.isdir(path):
- self._write_directory_tree(path)
+ # Use absolute paths on cache index so CWD doesn't interfere
+ # with the caching logic.
+ prefixdir = path = os.path.abspath(path)
+ if prefixdir != '/':
+ prefixdir += '/'
+ for root, dirs, files in os.walk(path):
+ # Make os.walk()'s dir traversing order deterministic
+ dirs.sort()
+ files.sort()
+ for f in files:
+ self._check_file(os.path.join(root, f),
+ os.path.join(root[len(prefixdir):], f))
else:
- self._write_file(path, self.filename or os.path.basename(path))
- finally:
- # Stop the thread before doing anything else
- self._stop_checkpointer.set()
- self._checkpointer.join()
- # Commit all & one last _update()
- self.manifest_text()
+ self._check_file(os.path.abspath(path),
+ self.filename or os.path.basename(path))
+ # If dry-mode is on, and got up to this point, then we should notify that
+ # there aren't any file to upload.
+ if self.dry_run:
+ raise ArvPutUploadNotPending()
+ # Remove local_collection's files that don't exist locally anymore, so the
+ # bytes_written count is correct.
+ for f in self.collection_file_paths(self._local_collection,
+ path_prefix=""):
+ if f != 'stdin' and not f in self._file_paths:
+ self._local_collection.remove(f)
+ # Update bytes_written from current local collection and
+ # report initial progress.
self._update()
- if self.resume:
+ # Actual file upload
+ self._upload_files()
+ finally:
+ if not self.dry_run:
+ # Stop the thread before doing anything else
+ self._stop_checkpointer.set()
+ self._checkpointer.join()
+ # Commit all pending blocks & one last _update()
+ self._local_collection.manifest_text()
+ self._update(final=True)
+ if save_collection:
+ self.save_collection()
+ if self.use_cache:
self._cache_file.close()
- # Correct the final written bytes count
- self.bytes_written -= self.bytes_skipped
def save_collection(self):
- with self._collection_lock:
- self._my_collection().save_new(
+ if self.update:
+ # Check if files should be updated on the remote collection.
+ for fp in self._file_paths:
+ remote_file = self._remote_collection.find(fp)
+ if not remote_file:
+ # File don't exist on remote collection, copy it.
+ self._remote_collection.copy(fp, fp, self._local_collection)
+ elif remote_file != self._local_collection.find(fp):
+ # A different file exist on remote collection, overwrite it.
+ self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
+ else:
+ # The file already exist on remote collection, skip it.
+ pass
+ self._remote_collection.save(num_retries=self.num_retries)
+ else:
+ self._local_collection.save_new(
name=self.name, owner_uuid=self.owner_uuid,
ensure_unique_name=self.ensure_unique_name,
num_retries=self.num_retries)
def destroy_cache(self):
- if self.resume:
+ if self.use_cache:
try:
os.unlink(self._cache_filename)
except OSError as error:
while not self._stop_checkpointer.wait(self._update_task_time):
self._update()
- def _update(self):
+ def _update(self, final=False):
"""
Update cached manifest text and report progress.
"""
with self._collection_lock:
- self.bytes_written = self._collection_size(self._my_collection())
- # Update cache, if resume enabled
- if self.resume:
+ self.bytes_written = self._collection_size(self._local_collection)
+ if self.use_cache:
+ # Update cache
with self._state_lock:
- # Get the manifest text without comitting pending blocks
- self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
+ if final:
+ self._state['manifest'] = self._local_collection.manifest_text()
+ else:
+ # Get the manifest text without comitting pending blocks
+ self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
self._save_state()
# Call the reporter, if any
self.report_progress()
if self.reporter is not None:
self.reporter(self.bytes_written, self.bytes_expected)
- def _write_directory_tree(self, path, stream_name="."):
- # TODO: Check what happens when multiple directories are passed as
- # arguments.
- # If the code below is uncommented, integration test
- # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
- # fails, I suppose it is because the manifest_uuid changes because
- # of the dir addition to stream_name.
-
- # if stream_name == '.':
- # stream_name = os.path.join('.', os.path.basename(path))
- for item in os.listdir(path):
- if os.path.isdir(os.path.join(path, item)):
- self._write_directory_tree(os.path.join(path, item),
- os.path.join(stream_name, item))
- else:
- self._write_file(os.path.join(path, item),
- os.path.join(stream_name, item))
-
def _write_stdin(self, filename):
- with self._collection_lock:
- output = self._my_collection().open(filename, 'w')
+ output = self._local_collection.open(filename, 'w')
self._write(sys.stdin, output)
output.close()
- def _write_file(self, source, filename):
+ def _check_file(self, source, filename):
+ """Check if this file needs to be uploaded"""
resume_offset = 0
- if self.resume:
- # Check if file was already uploaded (at least partially)
- with self._collection_lock:
- try:
- file_in_collection = self._my_collection().find(filename)
- except IOError:
- # Not found
- file_in_collection = None
+ should_upload = False
+ new_file_in_cache = False
+ # Record file path for updating the remote collection before exiting
+ self._file_paths.append(filename)
+
+ with self._state_lock:
# If no previous cached data on this file, store it for an eventual
# repeated run.
if source not in self._state['files']:
- with self._state_lock:
- self._state['files'][source] = {
- 'mtime': os.path.getmtime(source),
- 'size' : os.path.getsize(source)
- }
- with self._state_lock:
- cached_file_data = self._state['files'][source]
- # See if this file was already uploaded at least partially
- if file_in_collection:
- if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
- if cached_file_data['size'] == file_in_collection.size():
- # File already there, skip it.
- self.bytes_skipped += cached_file_data['size']
- return
- elif cached_file_data['size'] > file_in_collection.size():
- # File partially uploaded, resume!
- resume_offset = file_in_collection.size()
- else:
- # Inconsistent cache, re-upload the file
- self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
- else:
- # Local file differs from cached data, re-upload it
- pass
- with open(source, 'r') as source_fd:
- if resume_offset > 0:
- # Start upload where we left off
- with self._collection_lock:
- output = self._my_collection().open(filename, 'a')
- source_fd.seek(resume_offset)
+ self._state['files'][source] = {
+ 'mtime': os.path.getmtime(source),
+ 'size' : os.path.getsize(source)
+ }
+ new_file_in_cache = True
+ cached_file_data = self._state['files'][source]
+
+ # Check if file was already uploaded (at least partially)
+ file_in_local_collection = self._local_collection.find(filename)
+
+ # If not resuming, upload the full file.
+ if not self.resume:
+ should_upload = True
+ # New file detected from last run, upload it.
+ elif new_file_in_cache:
+ should_upload = True
+ # Local file didn't change from last run.
+ elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
+ if not file_in_local_collection:
+ # File not uploaded yet, upload it completely
+ should_upload = True
+ elif file_in_local_collection.permission_expired():
+ # Permission token expired, re-upload file. This will change whenever
+ # we have a API for refreshing tokens.
+ should_upload = True
+ self._local_collection.remove(filename)
+ elif cached_file_data['size'] == file_in_local_collection.size():
+ # File already there, skip it.
+ self.bytes_skipped += cached_file_data['size']
+ elif cached_file_data['size'] > file_in_local_collection.size():
+ # File partially uploaded, resume!
+ resume_offset = file_in_local_collection.size()
self.bytes_skipped += resume_offset
+ should_upload = True
else:
- # Start from scratch
- with self._collection_lock:
- output = self._my_collection().open(filename, 'w')
- self._write(source_fd, output)
- output.close(flush=False)
+ # Inconsistent cache, re-upload the file
+ should_upload = True
+ self._local_collection.remove(filename)
+ self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+ # Local file differs from cached data, re-upload it.
+ else:
+ if file_in_local_collection:
+ self._local_collection.remove(filename)
+ should_upload = True
+
+ if should_upload:
+ self._files_to_upload.append((source, resume_offset, filename))
+
+ def _upload_files(self):
+ for source, resume_offset, filename in self._files_to_upload:
+ with open(source, 'r') as source_fd:
+ with self._state_lock:
+ self._state['files'][source]['mtime'] = os.path.getmtime(source)
+ self._state['files'][source]['size'] = os.path.getsize(source)
+ if resume_offset > 0:
+ # Start upload where we left off
+ output = self._local_collection.open(filename, 'a')
+ source_fd.seek(resume_offset)
+ else:
+ # Start from scratch
+ output = self._local_collection.open(filename, 'w')
+ self._write(source_fd, output)
+ output.close(flush=False)
    def _write(self, source_fd, output):
+        """Copy source_fd into output, reading KEEP_BLOCK_SIZE bytes at a time until EOF."""
-        first_read = True
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
-            # Allow an empty file to be written
-            if not data and not first_read:
+            # An empty read means EOF. An empty source writes no data; the
+            # caller has already opened `output` before calling this.
+            if not data:
                break
-            if first_read:
-                first_read = False
            output.write(data)
def _my_collection(self):
- """
- Create a new collection if none cached. Load it from cache otherwise.
- """
- if self._collection is None:
- with self._state_lock:
- manifest = self._state['manifest']
- if self.resume and manifest is not None:
- # Create collection from saved state
- self._collection = arvados.collection.Collection(
- manifest,
- replication_desired=self.replication_desired)
- else:
- # Create new collection
- self._collection = arvados.collection.Collection(
- replication_desired=self.replication_desired)
- return self._collection
+ return self._remote_collection if self.update else self._local_collection
- def _setup_state(self):
+ def _setup_state(self, update_collection):
"""
Create a new cache file or load a previously existing one.
"""
- if self.resume:
+ # Load an already existing collection for update
+ if update_collection and re.match(arvados.util.collection_uuid_pattern,
+ update_collection):
+ try:
+ self._remote_collection = arvados.collection.Collection(update_collection)
+ except arvados.errors.ApiError as error:
+ raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
+ else:
+ self.update = True
+ elif update_collection:
+ # Collection locator provided, but unknown format
+ raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
+
+ if self.use_cache:
+ # Set up cache file name from input paths.
md5 = hashlib.md5()
md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
realpaths = sorted(os.path.realpath(path) for path in self.paths)
if self.filename:
md5.update(self.filename)
cache_filename = md5.hexdigest()
- self._cache_file = open(os.path.join(
+ cache_filepath = os.path.join(
arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
- cache_filename), 'a+')
+ cache_filename)
+ if self.resume:
+ self._cache_file = open(cache_filepath, 'a+')
+ else:
+ # --no-resume means start with a empty cache file.
+ self._cache_file = open(cache_filepath, 'w+')
self._cache_filename = self._cache_file.name
self._lock_file(self._cache_file)
self._cache_file.seek(0)
- with self._state_lock:
+
+ with self._state_lock:
+ if self.use_cache:
try:
self._state = json.load(self._cache_file)
if not set(['manifest', 'files']).issubset(set(self._state.keys())):
except ValueError:
# Cache file empty, set up new cache
self._state = copy.deepcopy(self.EMPTY_STATE)
- # Load how many bytes were uploaded on previous run
- with self._collection_lock:
- self.bytes_written = self._collection_size(self._my_collection())
- # No resume required
- else:
- with self._state_lock:
+ else:
+ # No cache file, set empty state
self._state = copy.deepcopy(self.EMPTY_STATE)
+ # Load the previous manifest so we can check if files were modified remotely.
+ self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
+
+ def collection_file_paths(self, col, path_prefix='.'):
+ """Return a list of file paths by recursively go through the entire collection `col`"""
+ file_paths = []
+ for name, item in col.items():
+ if isinstance(item, arvados.arvfile.ArvadosFile):
+ file_paths.append(os.path.join(path_prefix, name))
+ elif isinstance(item, arvados.collection.Subcollection):
+ new_prefix = os.path.join(path_prefix, name)
+ file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
+ return file_paths
def _lock_file(self, fileobj):
try:
"""
try:
with self._state_lock:
- state = self._state
+ state = copy.deepcopy(self._state)
new_cache_fd, new_cache_name = tempfile.mkstemp(
dir=os.path.dirname(self._cache_filename))
self._lock_file(new_cache_fd)
self._cache_file = new_cache
def collection_name(self):
- with self._collection_lock:
- name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
- return name
+ return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
def manifest_locator(self):
- with self._collection_lock:
- locator = self._my_collection().manifest_locator()
- return locator
+ return self._my_collection().manifest_locator()
def portable_data_hash(self):
- with self._collection_lock:
- datahash = self._my_collection().portable_data_hash()
- return datahash
+ return self._my_collection().portable_data_hash()
def manifest_text(self, stream_name=".", strip=False, normalize=False):
- with self._collection_lock:
- manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
- return manifest
+ return self._my_collection().manifest_text(stream_name, strip, normalize)
def _datablocks_on_item(self, item):
"""
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
global api_client
+ logger = logging.getLogger('arvados.arv_put')
args = parse_arguments(arguments)
status = 0
if api_client is None:
# Determine the name to use
if args.name:
if args.stream or args.raw:
- print >>stderr, "Cannot use --name with --stream or --raw"
+ logger.error("Cannot use --name with --stream or --raw")
+ sys.exit(1)
+ elif args.update_collection:
+ logger.error("Cannot use --name with --update-collection")
sys.exit(1)
collection_name = args.name
else:
socket.gethostname())
if args.project_uuid and (args.stream or args.raw):
- print >>stderr, "Cannot use --project-uuid with --stream or --raw"
+ logger.error("Cannot use --project-uuid with --stream or --raw")
sys.exit(1)
# Determine the parent project
project_uuid = desired_project_uuid(api_client, args.project_uuid,
args.retries)
except (apiclient_errors.Error, ValueError) as error:
- print >>stderr, error
+ logger.error(error)
sys.exit(1)
if args.progress:
reporter = None
bytes_expected = expected_bytes_for(args.paths)
+
try:
writer = ArvPutUploadJob(paths = args.paths,
resume = args.resume,
+ use_cache = args.use_cache,
filename = args.filename,
reporter = reporter,
bytes_expected = bytes_expected,
replication_desired = args.replication,
name = collection_name,
owner_uuid = project_uuid,
- ensure_unique_name = True)
+ ensure_unique_name = True,
+ update_collection = args.update_collection,
+ logger=logger,
+ dry_run=args.dry_run)
except ResumeCacheConflict:
- print >>stderr, "\n".join([
+ logger.error("\n".join([
"arv-put: Another process is already uploading this data.",
- " Use --no-resume if this is really what you want."])
+ " Use --no-cache if this is really what you want."]))
sys.exit(1)
+ except CollectionUpdateError as error:
+ logger.error("\n".join([
+ "arv-put: %s" % str(error)]))
+ sys.exit(1)
+ except ArvPutUploadIsPending:
+ # Dry run check successful, return proper exit code.
+ sys.exit(2)
+ except ArvPutUploadNotPending:
+ # No files pending for upload
+ sys.exit(0)
# Install our signal handler for each code in CAUGHT_SIGNALS, and save
# the originals.
orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
for sigcode in CAUGHT_SIGNALS}
- if args.resume and writer.bytes_written > 0:
- print >>stderr, "\n".join([
- "arv-put: Resuming previous upload from last checkpoint.",
- " Use the --no-resume option to start over."])
+ if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
+ logger.warning("\n".join([
+ "arv-put: Resuming previous upload from last checkpoint.",
+ " Use the --no-resume option to start over."]))
- writer.report_progress()
+ if not args.dry_run:
+ writer.report_progress()
output = None
- writer.start()
+ try:
+ writer.start(save_collection=not(args.stream or args.raw))
+ except arvados.errors.ApiError as error:
+ logger.error("\n".join([
+ "arv-put: %s" % str(error)]))
+ sys.exit(1)
+ except ArvPutUploadIsPending:
+ # Dry run check successful, return proper exit code.
+ sys.exit(2)
+ except ArvPutUploadNotPending:
+ # No files pending for upload
+ sys.exit(0)
+
if args.progress: # Print newline to split stderr from stdout for humans.
- print >>stderr
+ logger.info("\n")
if args.stream:
if args.normalize:
output = ','.join(writer.data_locators())
else:
try:
- writer.save_collection()
- print >>stderr, "Collection saved as '%s'" % writer.collection_name()
+ if args.update_collection:
+ logger.info("Collection updated: '{}'".format(writer.collection_name()))
+ else:
+ logger.info("Collection saved as '{}'".format(writer.collection_name()))
if args.portable_data_hash:
output = writer.portable_data_hash()
else:
output = writer.manifest_locator()
except apiclient_errors.Error as error:
- print >>stderr, (
+ logger.error(
"arv-put: Error creating Collection on project: {}.".format(
error))
status = 1
sys.exit(status)
# Success!
- writer.destroy_cache()
return output
    def _get_user_agent(self):
+        """Return a pooled pycurl handle without blocking, or a new one if the pool is empty."""
        try:
-            return self._user_agent_pool.get(False)
+            return self._user_agent_pool.get(block=False)
        except Queue.Empty:
            return pycurl.Curl()
    def _put_user_agent(self, ua):
+        """Reset ua and return it to the pool without blocking; close it if that fails."""
        try:
            ua.reset()
-            self._user_agent_pool.put(ua, False)
+            self._user_agent_pool.put(ua, block=False)
+        # NOTE(review): the bare except also catches KeyboardInterrupt; it
+        # presumably targets Queue.Full or a failing reset(); confirm.
        except:
            ua.close()
install_requires=[
'google-api-python-client==1.4.2',
'oauth2client >=1.4.6, <2',
- 'pyasn1-modules==0.0.5',
'ciso8601',
'httplib2',
'pycurl >=7.19.5.1, <7.21.5',
proxy_redirect //download:{{KEEPWEBPORT}}/ https://$host:{{KEEPWEBDLSSLPORT}}/;
}
}
+  # Websocket service ("ws") exposed over TLS on {{WSSPORT}}.
+  upstream ws {
+    server localhost:{{WSPORT}};
+  }
+  server {
+    listen *:{{WSSPORT}} ssl default_server;
+    # Capture the requested host name so it can be passed to the upstream.
+    server_name ~^(?<request_host>.*)$;
+    ssl_certificate {{SSLCERT}};
+    ssl_certificate_key {{SSLKEY}};
+    location / {
+      proxy_pass http://ws;
+      # The Upgrade handshake is only forwarded over HTTP/1.1; nginx speaks
+      # HTTP/1.0 to upstreams unless told otherwise.
+      proxy_http_version 1.1;
+      proxy_set_header Upgrade $http_upgrade;
+      proxy_set_header Connection "upgrade";
+      proxy_set_header Host $request_host:{{WSPORT}};
+      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+    }
+  }
}
my_api_host = None
_cached_config = {}
+_cached_db_config = {}
def find_server_pid(PID_PATH, wait=10):
now = time.time()
os.makedirs(gitdir)
subprocess.check_output(['tar', '-xC', gitdir, '-f', gittarball])
+ # The nginx proxy isn't listening here yet, but we need to choose
+ # the wss:// port now so we can write the API server config file.
+ wss_port = find_available_port()
+ _setport('wss', wss_port)
+
port = find_available_port()
env = os.environ.copy()
env['RAILS_ENV'] = 'test'
- env['ARVADOS_WEBSOCKETS'] = 'yes'
+ env['ARVADOS_TEST_WSS_PORT'] = str(wss_port)
+ if env.get('ARVADOS_TEST_EXPERIMENTAL_WS'):
+ env.pop('ARVADOS_WEBSOCKETS', None)
+ else:
+ env['ARVADOS_WEBSOCKETS'] = 'yes'
env.pop('ARVADOS_TEST_API_HOST', None)
env.pop('ARVADOS_API_HOST', None)
env.pop('ARVADOS_API_HOST_INSECURE', None)
kill_server_pid(_pidfile('api'))
my_api_host = None
+def run_ws():
+    """Start the `ws` server for tests and record its port under 'ws'.
+
+    Writes TEST_TMPDIR/ws.yml with the API host (from ARVADOS_API_HOST) and
+    Postgres settings (from _dbconfig). It then runs `ws -config <file>`,
+    writes the child's pid to the 'ws' pid file and waits for the port to
+    listen before saving it. Returns the port, or None when
+    ARVADOS_TEST_PROXY_SERVICES is set (presumably because services are
+    provided externally in that case).
+
+    NOTE(review): os.environ['ARVADOS_API_HOST'] raises KeyError if it is
+    unset; presumably the API server was started first. Confirm.
+    """
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    # Kill any instance left from a previous run before choosing a new port.
+    stop_ws()
+    port = find_available_port()
+    conf = os.path.join(TEST_TMPDIR, 'ws.yml')
+    with open(conf, 'w') as f:
+        f.write("""
+Client:
+  APIHost: {}
+  Insecure: true
+Listen: :{}
+LogLevel: {}
+Postgres:
+  host: {}
+  dbname: {}
+  user: {}
+  password: {}
+  sslmode: require
+        """.format(os.environ['ARVADOS_API_HOST'],
+                   port,
+                   # LogLevel is 'info' when ARVADOS_DEBUG is unset, '' or '0'; otherwise 'debug'.
+                   ('info' if os.environ.get('ARVADOS_DEBUG', '') in ['','0'] else 'debug'),
+                   _dbconfig('host'),
+                   _dbconfig('database'),
+                   _dbconfig('username'),
+                   _dbconfig('password')))
+    # NOTE(review): logf and the /dev/null handle stay open in this process
+    # after Popen; consider closing them once the child has started.
+    logf = open(_fifo2stderr('ws'), 'w')
+    ws = subprocess.Popen(
+        ["ws", "-config", conf],
+        stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
+    with open(_pidfile('ws'), 'w') as f:
+        f.write(str(ws.pid))
+    # Publish the port only once the server accepts connections.
+    _wait_until_port_listens(port)
+    _setport('ws', port)
+    return port
+
+def stop_ws():
+    """Kill the ws server recorded in its pid file (no-op under ARVADOS_TEST_PROXY_SERVICES)."""
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    kill_server_pid(_pidfile('ws'))
+
def _start_keep(n, keep_args):
keep0 = tempfile.mkdtemp()
port = find_available_port()
def run_nginx():
if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
return
+ stop_nginx()
nginxconf = {}
nginxconf['KEEPWEBPORT'] = _getport('keep-web')
nginxconf['KEEPWEBDLSSLPORT'] = find_available_port()
nginxconf['KEEPPROXYSSLPORT'] = find_available_port()
nginxconf['GITPORT'] = _getport('arv-git-httpd')
nginxconf['GITSSLPORT'] = find_available_port()
+ nginxconf['WSPORT'] = _getport('ws')
+ nginxconf['WSSPORT'] = _getport('wss')
nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem')
nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key')
nginxconf['ACCESSLOG'] = _fifo2stderr('nginx_access_log')
except IOError:
return 9
+def _dbconfig(key):
+ global _cached_db_config
+ if not _cached_db_config:
+ _cached_db_config = yaml.load(open(os.path.join(
+ SERVICES_SRC_DIR, 'api', 'config', 'database.yml')))
+ return _cached_db_config['test'][key]
+
def _apiconfig(key):
+ global _cached_config
if _cached_config:
return _cached_config[key]
def _load(f, required=True):
original environment.
"""
MAIN_SERVER = None
+ WS_SERVER = None
KEEP_SERVER = None
KEEP_PROXY_SERVER = None
KEEP_WEB_SERVER = None
os.environ.pop('ARVADOS_EXTERNAL_CLIENT', None)
for server_kwargs, start_func, stop_func in (
(cls.MAIN_SERVER, run, reset),
+ (cls.WS_SERVER, run_ws, stop_ws),
(cls.KEEP_SERVER, run_keep, stop_keep),
(cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy),
(cls.KEEP_WEB_SERVER, run_keep_web, stop_keep_web)):
if __name__ == "__main__":
actions = [
'start', 'stop',
+ 'start_ws', 'stop_ws',
'start_keep', 'stop_keep',
'start_keep_proxy', 'stop_keep_proxy',
'start_keep-web', 'stop_keep-web',
print(host)
elif args.action == 'stop':
stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
+ elif args.action == 'start_ws':
+ run_ws()
+ elif args.action == 'stop_ws':
+ stop_ws()
elif args.action == 'start_keep':
run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers)
elif args.action == 'stop_keep':
[],
['/dev/null'],
['/dev/null', '--filename', 'empty'],
- ['/tmp'],
- ['/tmp', '--max-manifest-depth', '0'],
- ['/tmp', '--max-manifest-depth', '1']
+ ['/tmp']
]
def tearDown(self):
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
ArvadosBaseTestCase):
+
def setUp(self):
super(ArvPutUploadJobTest, self).setUp()
run_test_server.authorize_with('active')
def test_writer_works_without_cache(self):
cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
- cwriter.start()
+ cwriter.start(save_collection=False)
self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
def test_writer_works_with_cache(self):
f.write('foo')
f.flush()
cwriter = arv_put.ArvPutUploadJob([f.name])
- cwriter.start()
- self.assertEqual(3, cwriter.bytes_written)
+ cwriter.start(save_collection=False)
+ self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
# Don't destroy the cache, and start another upload
cwriter_new = arv_put.ArvPutUploadJob([f.name])
- cwriter_new.start()
+ cwriter_new.start(save_collection=False)
cwriter_new.destroy_cache()
- self.assertEqual(0, cwriter_new.bytes_written)
+ self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)
def make_progress_tester(self):
progression = []
progression, reporter = self.make_progress_tester()
cwriter = arv_put.ArvPutUploadJob([f.name],
reporter=reporter, bytes_expected=expect_count)
- cwriter.start()
+ cwriter.start(save_collection=False)
cwriter.destroy_cache()
self.assertIn((3, expect_count), progression)
def test_writer_upload_directory(self):
cwriter = arv_put.ArvPutUploadJob([self.tempdir])
- cwriter.start()
+ cwriter.start(save_collection=False)
cwriter.destroy_cache()
self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
writer = arv_put.ArvPutUploadJob([self.large_file_name],
replication_desired=1)
with self.assertRaises(SystemExit):
- writer.start()
- self.assertLess(writer.bytes_written,
- os.path.getsize(self.large_file_name))
+ writer.start(save_collection=False)
+ # Confirm that the file was partially uploaded
+ self.assertGreater(writer.bytes_written, 0)
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
# Retry the upload
writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
replication_desired=1)
- writer2.start()
- self.assertEqual(writer.bytes_written + writer2.bytes_written,
+ writer2.start(save_collection=False)
+ self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
os.path.getsize(self.large_file_name))
writer2.destroy_cache()
+    def test_no_resume_when_asked(self):
+        """An interrupted upload is redone from scratch when resume=False.
+
+        The first writer is aborted on the short final block, leaving a partial
+        upload in its cache. The second writer (resume=False) must skip nothing
+        and report the whole file as written.
+        """
+        # Pass writes through to self.arvfile_write (presumably the unpatched
+        # ArvadosFileWriter.write saved in setUp) until the last, short block.
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload, this time without resume
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          resume=False)
+        writer2.start(save_collection=False)
+        self.assertEqual(writer2.bytes_skipped, 0)
+        self.assertEqual(writer2.bytes_written,
+                         os.path.getsize(self.large_file_name))
+        writer2.destroy_cache()
+
+ def test_no_resume_when_no_cache(self):
+ def wrapped_write(*args, **kwargs):
+ data = args[1]
+ # Exit only on last block
+ if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ raise SystemExit("Simulated error")
+ return self.arvfile_write(*args, **kwargs)
+
+ with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+ autospec=True) as mocked_write:
+ mocked_write.side_effect = wrapped_write
+ writer = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ with self.assertRaises(SystemExit):
+ writer.start(save_collection=False)
+ # Confirm that the file was partially uploaded
+ self.assertGreater(writer.bytes_written, 0)
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
+ # Retry the upload, this time without cache usage
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ resume=False,
+ use_cache=False)
+ writer2.start(save_collection=False)
+ self.assertEqual(writer2.bytes_skipped, 0)
+ self.assertEqual(writer2.bytes_written,
+ os.path.getsize(self.large_file_name))
+ writer2.destroy_cache()
+
+
+    def test_dry_run_feature(self):
+        """dry_run reports pending uploads via exceptions instead of uploading.
+
+        After an interrupted upload, a dry run raises ArvPutUploadIsPending.
+        After the upload is completed, a dry run raises ArvPutUploadNotPending.
+        Without a usable cache, the constructor itself raises
+        ArvPutUploadIsPending.
+        """
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload using dry_run to check if there is a pending upload
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          dry_run=True)
+        with self.assertRaises(arv_put.ArvPutUploadIsPending):
+            writer2.start(save_collection=False)
+        # Complete the pending upload
+        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1)
+        writer3.start(save_collection=False)
+        # Confirm there's no pending upload with dry_run=True
+        writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          dry_run=True)
+        with self.assertRaises(arv_put.ArvPutUploadNotPending):
+            writer4.start(save_collection=False)
+        writer4.destroy_cache()
+        # Test obvious cases
+        # With no cache, or with resume disabled, nothing can be known to be
+        # uploaded already, so __init__ raises before any file is checked.
+        with self.assertRaises(arv_put.ArvPutUploadIsPending):
+            arv_put.ArvPutUploadJob([self.large_file_name],
+                                    replication_desired=1,
+                                    dry_run=True,
+                                    resume=False,
+                                    use_cache=False)
+        with self.assertRaises(arv_put.ArvPutUploadIsPending):
+            arv_put.ArvPutUploadJob([self.large_file_name],
+                                    replication_desired=1,
+                                    dry_run=True,
+                                    resume=False)
+
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
TEST_SIZE = os.path.getsize(__file__)
self.assertEqual(1, len(collection_list))
return collection_list[0]
+ def test_put_collection_with_later_update(self):
+ """--update-collection adds files created after the first upload
+ to the existing collection instead of creating a new one."""
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ col = self.run_and_find_collection("", ['--no-progress', tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ # Add a new file to the directory
+ with open(os.path.join(tmpdir, 'file2'), 'w') as f:
+ f.write('The quick brown fox jumped over the lazy dog')
+ updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
+ # The update must target the same collection record.
+ self.assertEqual(col['uuid'], updated_col['uuid'])
+ # Get the manifest and check that the new file is being included
+ # (44 is the byte length of file2's contents).
+ c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
+ self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')
+
def test_put_collection_with_high_redundancy(self):
# Write empty data: we're not testing CollectionWriter, just
# making sure collections.create tells the API server what our
#!/usr/bin/env python
import bz2
+import datetime
import gzip
import io
import mock
def read_for_test(self, reader, byte_count, **kwargs):
return ''.join(reader.readlines(**kwargs))
+
+class ArvadosFileTestCase(unittest.TestCase):
+ """Tests for ArvadosFile behavior (currently permission-signature expiry)."""
+ def datetime_to_hex(self, dt):
+ """Return the Unix timestamp of naive local datetime `dt` as a hex
+ string without the "0x" prefix (the format used after '@' in a
+ signed block locator)."""
+ return hex(int(time.mktime(dt.timetuple())))[2:]
+
+ def test_permission_expired(self):
+ # The "{}" after '@' is filled with the signature's expiry timestamp.
+ base_manifest = ". 781e5e245d69b566979b86e28d23f2c7+10+A715fd31f8111894f717eb1003c1b0216799dd9ec@{} 0:10:count.txt\n"
+ now = datetime.datetime.now()
+ a_week_ago = now - datetime.timedelta(days=7)
+ a_month_ago = now - datetime.timedelta(days=30)
+ a_week_from_now = now + datetime.timedelta(days=7)
+ # Signature expiring in the future: not expired now.
+ with Collection(base_manifest.format(self.datetime_to_hex(a_week_from_now))) as c:
+ self.assertFalse(c.find('count.txt').permission_expired())
+ # Signature that expired a week ago: expired now and later, but not
+ # when evaluated as of a time before the expiry.
+ with Collection(base_manifest.format(self.datetime_to_hex(a_week_ago))) as c:
+ f = c.find('count.txt')
+ self.assertTrue(f.permission_expired())
+ self.assertTrue(f.permission_expired(a_week_from_now))
+ self.assertFalse(f.permission_expired(a_month_ago))
+
+
class BlockManagerTest(unittest.TestCase):
def test_bufferblock_append(self):
keep = ArvadosFileWriterTestCase.MockKeep({})
c.find("/.")
with self.assertRaises(arvados.errors.ArgumentError):
c.find("")
+ self.assertIs(c.find("./nonexistant.txt"), None)
+ self.assertIs(c.find("./nonexistantsubdir/nonexistant.txt"), None)
def test_remove_in_subdir(self):
c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
self.assertEqual(200, events.get(True, 5)['status'])
human = arvados.api('v1').humans().create(body={}).execute()
- log_object_uuids = []
- for i in range(0, expected):
- log_object_uuids.append(events.get(True, 5)['object_uuid'])
-
+ want_uuids = []
if expected > 0:
- self.assertIn(human['uuid'], log_object_uuids)
-
+ want_uuids.append(human['uuid'])
if expected > 1:
- self.assertIn(ancestor['uuid'], log_object_uuids)
+ want_uuids.append(ancestor['uuid'])
+ log_object_uuids = []
+ while set(want_uuids) - set(log_object_uuids):
+ log_object_uuids.append(events.get(True, 5)['object_uuid'])
- with self.assertRaises(Queue.Empty):
- # assertEqual just serves to show us what unexpected thing
- # comes out of the queue when the assertRaises fails; when
- # the test passes, this assertEqual doesn't get called.
- self.assertEqual(events.get(True, 2), None)
+ if expected < 2:
+ with self.assertRaises(Queue.Empty):
+ # assertEqual just serves to show us what unexpected
+ # thing comes out of the queue when the assertRaises
+ # fails; when the test passes, this assertEqual
+ # doesn't get called.
+ self.assertEqual(events.get(True, 2), None)
def test_subscribe_websocket(self):
self._test_subscribe(
return time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(t)) + self.isotz(-time.timezone/60)
def isotz(self, offset):
- """Convert minutes-east-of-UTC to ISO8601 time zone designator"""
- return '{:+03d}{:02d}'.format(offset/60, offset%60)
+ """Convert minutes-east-of-UTC to RFC3339- and ISO-compatible time zone designator"""
+ return '{:+03d}:{:02d}'.format(offset/60, offset%60)
# Test websocket reconnection on (un)execpted close
def _test_websocket_reconnect(self, close_unexpected):
parts = full_text_searchable_columns.collect do |column|
"coalesce(#{column},'')"
end
- # We prepend a space to the tsvector() argument here. Otherwise,
- # it might start with a column that has its own (non-full-text)
- # index, which causes Postgres to use the column index instead of
- # the tsvector index, which causes full text queries to be just as
- # slow as if we had no index at all.
- "to_tsvector('english', ' ' || #{parts.join(" || ' ' || ")})"
+ "to_tsvector('english', #{parts.join(" || ' ' || ")})"
end
def self.apply_filters query, filters
super - ["script_parameters_digest"]
end
+ # Full-text search columns: the superclass's list minus
+ # script_parameters_digest (presumably a derived digest of
+ # script_parameters that is useless for text search -- confirm).
+ # This list feeds the tsvector used to build the full-text index.
+ def self.full_text_searchable_columns
+ super - ["script_parameters_digest"]
+ end
+
def self.load_job_specific_filters attrs, orig_filters, read_users
# Convert Job-specific @filters entries into general SQL filters.
script_info = {"repository" => nil, "script" => nil}
workbench_address: https://localhost:3001/
git_repositories_dir: <%= Rails.root.join 'tmp', 'git', 'test' %>
git_internal_dir: <%= Rails.root.join 'tmp', 'internal.git' %>
+ websocket_address: <% if ENV['ARVADOS_TEST_EXPERIMENTAL_WS'] %>"wss://0.0.0.0:<%= ENV['ARVADOS_TEST_WSS_PORT'] %>/websocket"<% else %>false<% end %>
--- /dev/null
+class FullTextSearchIndexes < ActiveRecord::Migration
+ def fts_indexes
+ {
+ "collections" => "collections_full_text_search_idx",
+ "container_requests" => "container_requests_full_text_search_idx",
+ "groups" => "groups_full_text_search_idx",
+ "jobs" => "jobs_full_text_search_idx",
+ "pipeline_instances" => "pipeline_instances_full_text_search_idx",
+ "pipeline_templates" => "pipeline_templates_full_text_search_idx",
+ "workflows" => "workflows_full_text_search_idx",
+ }
+ end
+
+ def up
+ # remove existing fts indexes and create up to date ones with no leading space
+ fts_indexes.each do |t, i|
+ ActiveRecord::Base.connection.indexes(t).each do |idx|
+ if idx.name == i
+ remove_index t.to_sym, :name => i
+ break
+ end
+ end
+ execute "CREATE INDEX #{i} ON #{t} USING gin(#{t.classify.constantize.full_text_tsvector});"
+ end
+ end
+
+ def down
+ fts_indexes.each do |t, i|
+ remove_index t.to_sym, :name => i
+ end
+ end
+end
-- Name: collections_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace:
--
-CREATE INDEX collections_full_text_search_idx ON collections USING gin (to_tsvector('english'::regconfig, (((((((((((((((((' '::text || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(portable_data_hash, ''::character varying))::text) || ' '::text) || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(file_names, ''::character varying))::text)));
+CREATE INDEX collections_full_text_search_idx ON collections USING gin (to_tsvector('english'::regconfig, (((((((((((((((((COALESCE(owner_uuid, ''::character varying))::text || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(portable_data_hash, ''::character varying))::text) || ' '::text) || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(file_names, ''::character varying))::text)));
--
-- Name: container_requests_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace:
--
-CREATE INDEX container_requests_full_text_search_idx ON container_requests USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((((((((((((((((' '::text || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(requesting_container_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(container_uuid, ''::character varying))::text) || ' '::text) || COALESCE(mounts, ''::text)) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(container_image, ''::character varying))::text) || ' '::text) || COALESCE(environment, ''::text)) || ' '::text) || (COALESCE(cwd, ''::character varying))::text) || ' '::text) || COALESCE(command, ''::text)) || ' '::text) || (COALESCE(output_path, ''::character varying))::text) || ' '::text) || COALESCE(filters, ''::text))));
+CREATE INDEX container_requests_full_text_search_idx ON container_requests USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((((((((((((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(requesting_container_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(container_uuid, ''::character varying))::text) || ' '::text) || COALESCE(mounts, ''::text)) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(container_image, ''::character varying))::text) || ' '::text) || COALESCE(environment, ''::text)) || ' '::text) || (COALESCE(cwd, ''::character varying))::text) || ' '::text) || COALESCE(command, ''::text)) || ' '::text) || (COALESCE(output_path, ''::character varying))::text) || ' '::text) || COALESCE(filters, ''::text)) || ' '::text) || COALESCE(scheduling_parameters, ''::text)) || ' '::text) || (COALESCE(output_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(log_uuid, ''::character varying))::text)));
--
-- Name: groups_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace:
--
-CREATE INDEX groups_full_text_search_idx ON groups USING gin (to_tsvector('english'::regconfig, (((((((((((((' '::text || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || (COALESCE(group_class, ''::character varying))::text)));
+CREATE INDEX groups_full_text_search_idx ON groups USING gin (to_tsvector('english'::regconfig, (((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || (COALESCE(group_class, ''::character varying))::text)));
--
-- Name: jobs_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace:
--
-CREATE INDEX jobs_full_text_search_idx ON jobs USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((((((((((((((((((((((' '::text || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(submit_id, ''::character varying))::text) || ' '::text) || (COALESCE(script, ''::character varying))::text) || ' '::text) || (COALESCE(script_version, ''::character varying))::text) || ' '::text) || COALESCE(script_parameters, ''::text)) || ' '::text) || (COALESCE(cancelled_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(cancelled_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(output, ''::character varying))::text) || ' '::text) || (COALESCE(is_locked_by_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(log, ''::character varying))::text) || ' '::text) || COALESCE(tasks_summary, ''::text)) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(repository, ''::character varying))::text) || ' '::text) || (COALESCE(supplied_script_version, ''::character varying))::text) || ' '::text) || (COALESCE(docker_image_locator, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(arvados_sdk_version, ''::character varying))::text)));
+CREATE INDEX jobs_full_text_search_idx ON jobs USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((((((((((((((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(submit_id, ''::character varying))::text) || ' '::text) || (COALESCE(script, ''::character varying))::text) || ' '::text) || (COALESCE(script_version, ''::character varying))::text) || ' '::text) || COALESCE(script_parameters, ''::text)) || ' '::text) || (COALESCE(cancelled_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(cancelled_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(output, ''::character varying))::text) || ' '::text) || (COALESCE(is_locked_by_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(log, ''::character varying))::text) || ' '::text) || COALESCE(tasks_summary, ''::text)) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(repository, ''::character varying))::text) || ' '::text) || (COALESCE(supplied_script_version, ''::character varying))::text) || ' '::text) || (COALESCE(docker_image_locator, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(arvados_sdk_version, ''::character varying))::text) || ' '::text) || COALESCE(components, ''::text))));
--
-- Name: pipeline_instances_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace:
--
-CREATE INDEX pipeline_instances_full_text_search_idx ON pipeline_instances USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((' '::text || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(pipeline_template_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(components, ''::text)) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || COALESCE(components_summary, ''::text)) || ' '::text) || (COALESCE(description, ''::character varying))::text)));
+CREATE INDEX pipeline_instances_full_text_search_idx ON pipeline_instances USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(pipeline_template_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(components, ''::text)) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || COALESCE(components_summary, ''::text)) || ' '::text) || (COALESCE(description, ''::character varying))::text)));
--
-- Name: pipeline_templates_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace:
--
-CREATE INDEX pipeline_templates_full_text_search_idx ON pipeline_templates USING gin (to_tsvector('english'::regconfig, (((((((((((((' '::text || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(components, ''::text)) || ' '::text) || (COALESCE(description, ''::character varying))::text)));
+CREATE INDEX pipeline_templates_full_text_search_idx ON pipeline_templates USING gin (to_tsvector('english'::regconfig, (((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(components, ''::text)) || ' '::text) || (COALESCE(description, ''::character varying))::text)));
--
-- Name: workflows_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace:
--
-CREATE INDEX workflows_full_text_search_idx ON workflows USING gin (to_tsvector('english'::regconfig, (((((((((((((' '::text || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE(definition, ''::text))));
+CREATE INDEX workflows_full_text_search_idx ON workflows USING gin (to_tsvector('english'::regconfig, (((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE(definition, ''::text))));
--
INSERT INTO schema_migrations (version) VALUES ('20161115171221');
-INSERT INTO schema_migrations (version) VALUES ('20161115174218');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20161115174218');
+
+INSERT INTO schema_migrations (version) VALUES ('20161213172944');
\ No newline at end of file
--- /dev/null
+# Usage: rake config:dump
+# Prints the loaded site configuration ($application_config) as YAML.
+# Depends on :environment so the application and its config are loaded first.
+namespace :config do
+ desc 'Show site configuration'
+ task dump: :environment do
+ puts $application_config.to_yaml
+ end
+end
if 'event_type' not in ev:
return
with llfuse.lock:
+ new_attrs = (ev.get("properties") or {}).get("new_attributes") or {}
+ pdh = new_attrs.get("portable_data_hash")
+ # new_attributes.modified_at currently lacks
+ # subsecond precision (see #6347) so use event_at
+ # which should always be the same.
+ stamp = ev.get("event_at")
+
for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
item.invalidate()
- if ev["object_kind"] == "arvados#collection":
- new_attr = (ev.get("properties") and
- ev["properties"].get("new_attributes") and
- ev["properties"]["new_attributes"])
-
- # new_attributes.modified_at currently lacks
- # subsecond precision (see #6347) so use event_at
- # which should always be the same.
- record_version = (
- (ev["event_at"], new_attr["portable_data_hash"])
- if new_attr else None)
-
- item.update(to_record_version=record_version)
+ if stamp and pdh and ev.get("object_kind") == "arvados#collection":
+ item.update(to_record_version=(stamp, pdh))
else:
item.update()
- oldowner = (
- ev.get("properties") and
- ev["properties"].get("old_attributes") and
- ev["properties"]["old_attributes"].get("owner_uuid"))
- newowner = ev["object_owner_uuid"]
+ oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid")
+ newowner = ev.get("object_owner_uuid")
for parent in (
self.inodes.inode_cache.find_by_uuid(oldowner) +
self.inodes.inode_cache.find_by_uuid(newowner)):
parent.invalidate()
parent.update()
-
@catch_exceptions
def getattr(self, inode):
if inode not in self.inodes:
return self
def __exit__(self, exc_type, exc_value, traceback):
+ if self.operations.events:
+ self.operations.events.close(timeout=self.args.unmount_timeout)
subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
self.llfuse_thread.join(timeout=self.args.unmount_timeout)
if self.llfuse_thread.is_alive():
def tearDown(self):
if self.llfuse_thread:
+ if self.operations.events:
+ self.operations.events.close(timeout=10)
subprocess.call(["fusermount", "-u", "-z", self.mounttmp])
t0 = time.time()
self.llfuse_thread.join(timeout=10)
q.put(mockedCurl)
q.put(pycurl.Curl())
q.put(pycurl.Curl())
- with mock.patch('arvados.keep.KeepClient.KeepService._get_user_agent', side_effect=lambda: q.get(block=None)):
+ with mock.patch('arvados.keep.KeepClient.KeepService._get_user_agent', side_effect=q.get_nowait):
self.pool_test(os.path.join(self.mnt, 'zzz'))
self.assertTrue(mockedCurl.perform.called)
@staticmethod
import (
"context"
- "fmt"
"net/http"
"strings"
"time"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "git.curoverse.com/arvados.git/sdk/go/stats"
log "github.com/Sirupsen/logrus"
)
}
lgr.WithFields(log.Fields{
- "timeTotal": loggedDuration(tDone.Sub(tStart)),
- "timeToStatus": loggedDuration(resp.sentHdr.Sub(tStart)),
- "timeWriteBody": loggedDuration(tDone.Sub(resp.sentHdr)),
+ "timeTotal": stats.Duration(tDone.Sub(tStart)),
+ "timeToStatus": stats.Duration(resp.sentHdr.Sub(tStart)),
+ "timeWriteBody": stats.Duration(tDone.Sub(resp.sentHdr)),
"respStatusCode": resp.Status,
"respStatus": statusText,
"respBytes": resp.Length,
}).Info("response")
}
-
-type loggedDuration time.Duration
-
-// MarshalJSON formats a duration as a number of seconds, using
-// fixed-point notation with no more than 6 decimal places.
-func (d loggedDuration) MarshalJSON() ([]byte, error) {
- return []byte(d.String()), nil
-}
-
-// String formats a duration as a number of seconds, using
-// fixed-point notation with no more than 6 decimal places.
-func (d loggedDuration) String() string {
- return fmt.Sprintf("%.6f", time.Duration(d).Seconds())
-}
--- /dev/null
+[Unit]
+Description=Arvados websocket server
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/ws/ws.yml
+
+[Service]
+Type=notify
+ExecStart=/usr/bin/arvados-ws
+Restart=always
+
+[Install]
+WantedBy=multi-user.target
--- /dev/null
+package main
+
+import (
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// wsConfig is the arvados-ws configuration. defaultConfig returns the
+// default values.
+type wsConfig struct {
+ // Connection settings for the Arvados API server (e.g. APIHost).
+ Client arvados.Client
+ // Postgres connection parameters; see pgConfig.ConnectionString.
+ Postgres pgConfig
+ // Address to listen on, e.g. ":1234".
+ Listen string
+ // Log level, e.g. "info" (default) or "debug".
+ LogLevel string
+ // Log output format; default "json".
+ LogFormat string
+
+ // NOTE(review): presumably the maximum idle time before a client
+ // is pinged/dropped -- confirm in the handler.
+ PingTimeout arvados.Duration
+ // NOTE(review): presumably the per-client outgoing queue size
+ // (handler.QueueSize) and the server-side event queue size
+ // (pgEventSource.QueueSize) respectively -- confirm in main.
+ ClientEventQueue int
+ ServerEventQueue int
+}
+
+// defaultConfig returns a wsConfig populated with default values:
+// a local API server and database, "info"-level JSON logs, a
+// one-minute PingTimeout, and queue sizes of 64 (client) and 4 (server).
+func defaultConfig() wsConfig {
+ return wsConfig{
+ Client: arvados.Client{
+ APIHost: "localhost:443",
+ },
+ // Placeholder database credentials; presumably overridden by
+ // the site's config file.
+ Postgres: pgConfig{
+ "dbname": "arvados_production",
+ "user": "arvados",
+ "password": "xyzzy",
+ "host": "localhost",
+ "connect_timeout": "30",
+ "sslmode": "require",
+ },
+ LogLevel: "info",
+ LogFormat: "json",
+ PingTimeout: arvados.Duration(time.Minute),
+ ClientEventQueue: 64,
+ ServerEventQueue: 4,
+ }
+}
--- /dev/null
+// Arvados-ws exposes Arvados APIs (currently just one, the
+// cache-invalidation event feed at "ws://.../websocket") to
+// websocket clients.
+//
+// Installation
+//
+// See https://doc.arvados.org/install/install-ws.html.
+//
+// Developer info
+//
+// See https://dev.arvados.org/projects/arvados/wiki/Hacking_websocket_server.
+//
+// Usage
+//
+// arvados-ws [-config /etc/arvados/ws/ws.yml] [-dump-config]
+//
+// Minimal configuration
+//
+// Client:
+// APIHost: localhost:443
+// Listen: ":1234"
+// Postgres:
+// dbname: arvados_production
+// host: localhost
+// password: xyzzy
+// user: arvados
+//
+// Options
+//
+// -config path
+//
+// Load configuration from the given file instead of the default
+// /etc/arvados/ws/ws.yml
+//
+// -dump-config
+//
+// Print the loaded configuration to stdout and exit.
+//
+// Logs
+//
+// Logs are printed to stderr, formatted as JSON by default (see the
+// LogFormat configuration option).
+//
+// A log is printed each time a client connects or disconnects.
+//
+// Enable additional logs by configuring:
+//
+// LogLevel: debug
+//
+// Runtime status
+//
+// GET /debug.json responds with debug stats.
+//
+// GET /status.json responds with health check results and
+// activity/usage metrics.
+package main
--- /dev/null
+package main
+
+import (
+ "database/sql"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/ghodss/yaml"
+)
+
+// An eventSink delivers events from an eventSource to one subscriber.
+type eventSink interface {
+ // Channel returns the channel on which events are delivered.
+ Channel() <-chan *event
+ // Stop unsubscribes the sink from its source.
+ Stop()
+}
+
+// An eventSource produces events and fans them out to its sinks.
+type eventSource interface {
+ // NewSink subscribes a new sink to the source.
+ NewSink() eventSink
+ // DB returns the source's database handle.
+ DB() *sql.DB
+}
+
+// event is one notification received on the "logs" Postgres channel.
+// Its payload is the ID of a row in the logs table; the row itself is
+// fetched lazily (and at most once) by Detail.
+type event struct {
+ // ID of the corresponding row in the logs table.
+ LogID uint64
+ // Time the notification was received.
+ Received time.Time
+ // Time the row was fetched and the event was about to be
+ // dispatched to sinks.
+ Ready time.Time
+ // Sequence number assigned by the event source.
+ Serial uint64
+
+ // Database used by Detail.
+ db *sql.DB
+ // Cached result of Detail: at most one of these is non-nil once
+ // Detail has run.
+ logRow *arvados.Log
+ err error
+ // Guards logRow and err.
+ mtx sync.Mutex
+}
+
+// Detail returns the database row corresponding to the event. It can
+// be called safely from multiple goroutines. Only one attempt will be
+// made. If the database row cannot be retrieved, Detail returns nil.
+func (e *event) Detail() *arvados.Log {
+ e.mtx.Lock()
+ defer e.mtx.Unlock()
+
+ // A previous call already produced a row or an error: reuse it
+ // rather than querying again.
+ if e.err != nil || e.logRow != nil {
+ return e.logRow
+ }
+
+ row := new(arvados.Log)
+ var rawProps []byte
+ const query = `SELECT id, uuid, object_uuid, COALESCE(object_owner_uuid,''), COALESCE(event_type,''), event_at, created_at, properties FROM logs WHERE id = $1`
+ if e.err = e.db.QueryRow(query, e.LogID).Scan(&row.ID, &row.UUID, &row.ObjectUUID, &row.ObjectOwnerUUID, &row.EventType, &row.EventAt, &row.CreatedAt, &rawProps); e.err != nil {
+ logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("QueryRow failed")
+ return nil
+ }
+ // The properties column is decoded with yaml.Unmarshal.
+ if e.err = yaml.Unmarshal(rawProps, &row.Properties); e.err != nil {
+ logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("yaml decode failed")
+ return nil
+ }
+ e.logRow = row
+ return row
+}
--- /dev/null
+package main
+
+import (
+ "database/sql"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/stats"
+ "github.com/lib/pq"
+)
+
+// pgConfig holds Postgres connection parameters (dbname, user, host, ...).
+type pgConfig map[string]string
+
+// ConnectionString renders c as a keyword/value connection string.
+// Each value is single-quoted, with backslashes and single quotes
+// backslash-escaped, and every key='value' pair is followed by a
+// space. Pair order follows map iteration and is unspecified.
+func (c pgConfig) ConnectionString() string {
+ escape := strings.NewReplacer(`\`, `\\`, `'`, `\'`)
+ conn := ""
+ for key, val := range c {
+ conn += key + "='" + escape.Replace(val) + "' "
+ }
+ return conn
+}
+
+// pgEventSource is an eventSource that LISTENs on the "logs" Postgres
+// channel and fans each notification out to all subscribed sinks.
+type pgEventSource struct {
+ // Connection string passed to sql.Open and pq.NewListener.
+ DataSource string
+ // Capacity of the queue of events awaiting their logs-table row.
+ QueueSize int
+
+ db *sql.DB
+ pqListener *pq.Listener
+ queue chan *event
+ // Subscribed sinks; guarded by mtx.
+ sinks map[*pgEventSink]bool
+ setupOnce sync.Once
+ mtx sync.Mutex
+ // Receives a listener error, which makes run() stop.
+ shutdown chan error
+
+ // Received-to-ready delay of the most recently dispatched event;
+ // reported by DebugStatus.
+ lastQDelay time.Duration
+ // Counters, accessed with sync/atomic.
+ eventsIn uint64
+ eventsOut uint64
+}
+
+// Compile-time check that pgEventSource implements debugStatuser.
+var _ debugStatuser = (*pgEventSource)(nil)
+
+// setup connects to the database, starts LISTENing on the "logs"
+// channel, and starts run() in a new goroutine. It is invoked once,
+// via setupOnce, by NewSink and DB. Connection failures are fatal.
+func (ps *pgEventSource) setup() {
+ ps.shutdown = make(chan error, 1)
+ ps.sinks = make(map[*pgEventSink]bool)
+ // Create the queue here, before run() starts, instead of inside
+ // run(): otherwise the assignment races with other goroutines
+ // (e.g. DebugStatus) that read ps.queue.
+ ps.queue = make(chan *event, ps.QueueSize)
+
+ db, err := sql.Open("postgres", ps.DataSource)
+ if err != nil {
+ logger(nil).WithError(err).Fatal("sql.Open failed")
+ }
+ if err = db.Ping(); err != nil {
+ logger(nil).WithError(err).Fatal("db.Ping failed")
+ }
+ ps.db = db
+
+ ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
+ if err != nil {
+ // Until we have a mechanism for catching up
+ // on missed events, we cannot recover from a
+ // dropped connection without breaking our
+ // promises to clients.
+ logger(nil).WithError(err).Error("listener problem")
+ // run() receives at most one shutdown error and the
+ // channel buffers only one, so a blocking send on a
+ // later error would hang the listener forever.
+ select {
+ case ps.shutdown <- err:
+ default:
+ }
+ }
+ })
+ err = ps.pqListener.Listen("logs")
+ if err != nil {
+ logger(nil).WithError(err).Fatal("pq Listen failed")
+ }
+ logger(nil).Debug("pgEventSource listening")
+
+ go ps.run()
+}
+
+// run converts "logs" notifications into events and enqueues them,
+// until a shutdown error arrives or the notification channel closes.
+// A helper goroutine takes events off ps.queue in order, waits for
+// each one's logs-table row, and sends it to every current sink.
+func (ps *pgEventSource) run() {
+ go func() {
+ for e := range ps.queue {
+ // Wait for the "select ... from logs" call to
+ // finish. This limits max concurrent queries
+ // to ps.QueueSize. Without this, max
+ // concurrent queries would be bounded by
+ // client_count X client_queue_size.
+ e.Detail()
+
+ logger(nil).
+ WithField("serial", e.Serial).
+ WithField("detail", e.Detail()).
+ Debug("event ready")
+ e.Ready = time.Now()
+
+ ps.mtx.Lock()
+ // DebugStatus reads lastQDelay while holding ps.mtx,
+ // so it must be written under the same lock.
+ ps.lastQDelay = e.Ready.Sub(e.Received)
+ atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
+ for sink := range ps.sinks {
+ sink.channel <- e
+ }
+ ps.mtx.Unlock()
+ }
+ }()
+
+ var serial uint64
+ ticker := time.NewTicker(time.Minute)
+ defer ticker.Stop()
+ for {
+ select {
+ case err, ok := <-ps.shutdown:
+ if ok {
+ logger(nil).WithError(err).Info("shutdown")
+ }
+ close(ps.queue)
+ return
+
+ case <-ticker.C:
+ logger(nil).Debug("listener ping")
+ ps.pqListener.Ping()
+
+ case pqEvent, ok := <-ps.pqListener.Notify:
+ if !ok {
+ close(ps.queue)
+ return
+ }
+ if pqEvent.Channel != "logs" {
+ continue
+ }
+ logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
+ if err != nil {
+ logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
+ continue
+ }
+ serial++
+ e := &event{
+ LogID: logID,
+ Received: time.Now(),
+ Serial: serial,
+ db: ps.db,
+ }
+ logger(nil).WithField("event", e).Debug("incoming")
+ atomic.AddUint64(&ps.eventsIn, 1)
+ ps.queue <- e
+ // Start fetching the row now so it is likely ready by
+ // the time the dispatcher reaches this event.
+ go e.Detail()
+ }
+ }
+}
+
+// NewSink subscribes to the event source. NewSink returns an
+// eventSink, whose Channel() method returns a channel: a pointer to
+// each subsequent event will be sent to that channel.
+//
+// The caller must ensure events are received from the sink channel as
+// quickly as possible because when one sink stops being ready, all
+// other sinks block.
+//
+// The first call to NewSink (or DB) also performs setup.
+func (ps *pgEventSource) NewSink() eventSink {
+ ps.setupOnce.Do(ps.setup)
+ // Each sink buffers at most one undelivered event.
+ sink := &pgEventSink{
+ channel: make(chan *event, 1),
+ source: ps,
+ }
+ ps.mtx.Lock()
+ ps.sinks[sink] = true
+ ps.mtx.Unlock()
+ return sink
+}
+
+// DB returns the source's database handle, performing setup first if
+// it has not already happened.
+func (ps *pgEventSource) DB() *sql.DB {
+ ps.setupOnce.Do(ps.setup)
+ return ps.db
+}
+
+// DebugStatus reports event source counters for /debug.json: events
+// received and delivered, queue length and capacity, the most recent
+// queue delay, the number of sinks, and the number of events sitting
+// in sink channels.
+//
+// NOTE(review): lastQDelay is written by the dispatch goroutine
+// without holding mtx, so this read may race with it.
+func (ps *pgEventSource) DebugStatus() interface{} {
+	ps.mtx.Lock()
+	defer ps.mtx.Unlock()
+
+	// Sink channels have a buffer of one (see NewSink), so the total
+	// number of buffered events counts sinks that have not yet
+	// received their latest event.
+	pending := 0
+	for s := range ps.sinks {
+		pending += len(s.channel)
+	}
+
+	status := make(map[string]interface{}, 7)
+	status["EventsIn"] = atomic.LoadUint64(&ps.eventsIn)
+	status["EventsOut"] = atomic.LoadUint64(&ps.eventsOut)
+	status["Queue"] = len(ps.queue)
+	status["QueueLimit"] = cap(ps.queue)
+	status["QueueDelay"] = stats.Duration(ps.lastQDelay)
+	status["Sinks"] = len(ps.sinks)
+	status["SinksBlocked"] = pending
+	return status
+}
+
+// pgEventSink is the eventSink returned by (*pgEventSource).NewSink.
+type pgEventSink struct {
+ // channel carries events from the source to this subscriber.
+ channel chan *event
+ // source is the event source this sink is registered with.
+ source *pgEventSource
+}
+
+// Channel returns the receive-only end of the sink's event channel.
+// The channel is closed by Stop.
+func (sink *pgEventSink) Channel() (ch <-chan *event) {
+	ch = sink.channel
+	return
+}
+
+// Stop unsubscribes the sink from its source and closes its channel.
+// It must be called at most once per sink: a second call would close
+// an already-closed channel and panic.
+func (sink *pgEventSink) Stop() {
+	go func() {
+		// Drain the channel until it is closed, so this sink
+		// cannot fill up and block the server-side queue
+		// (which could in turn block our mtx.Lock() below).
+		for range sink.channel {
+		}
+	}()
+	sink.source.mtx.Lock()
+	delete(sink.source.sinks, sink)
+	sink.source.mtx.Unlock()
+	// The source only sends to sinks listed in sinks, and only while
+	// holding mtx. After the delete above, nothing can send on this
+	// channel, so closing it is safe. Closing it also ends the drain
+	// goroutine.
+	close(sink.channel)
+}
--- /dev/null
+package main
+
+import (
+ "context"
+ "io"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/stats"
+)
+
+// handler serves websocket client connections (see Handle), relaying
+// events from an eventSource to each client through a per-connection
+// outgoing queue.
+type handler struct {
+ // NOTE(review): Client is not referenced by the handler code
+ // shown here.
+ Client arvados.Client
+ // PingTimeout is the write deadline for each frame and the
+ // interval at which an idle connection gets a keepalive frame.
+ PingTimeout time.Duration
+ // QueueSize is the capacity of each connection's outgoing queue.
+ QueueSize int
+
+ // mtx guards lastDelay.
+ mtx sync.Mutex
+ // lastDelay maps each active connection's outgoing queue to the
+ // delay between its most recently sent event becoming ready and
+ // being written.
+ lastDelay map[chan interface{}]stats.Duration
+ // setupOnce ensures setup runs before the first connection.
+ setupOnce sync.Once
+}
+
+// handlerStats accumulates per-connection totals, returned by Handle
+// and logged on disconnect.
+type handlerStats struct {
+ // QueueDelayNs sums, over events sent, the time from the event
+ // becoming ready to the start of its write.
+ QueueDelayNs time.Duration
+ // WriteDelayNs sums the time spent writing frames.
+ WriteDelayNs time.Duration
+ // EventBytes and EventCount count every frame written, including
+ // keepalive and reply frames, not only events.
+ EventBytes uint64
+ EventCount uint64
+}
+
+// Handle serves one websocket connection and returns statistics about
+// the frames sent to the client. It returns when the client
+// disconnects, the event source stops, or an error occurs.
+//
+// Three goroutines cooperate, and cancelling ctx stops them all:
+//   - a reader passes frames from the client to sess.Receive();
+//   - a writer takes items from the outgoing queue, encodes events
+//     with sess.EventMessage(), and writes them to the client;
+//   - a filter forwards matching events from eventSource to the
+//     outgoing queue and queues keepalive frames when idle.
+func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
+	h.setupOnce.Do(h.setup)
+
+	ctx, cancel := context.WithCancel(ws.Request().Context())
+	defer cancel()
+	log := logger(ctx)
+
+	incoming := eventSource.NewSink()
+	defer incoming.Stop()
+
+	queue := make(chan interface{}, h.QueueSize)
+	h.mtx.Lock()
+	h.lastDelay[queue] = 0
+	h.mtx.Unlock()
+	defer func() {
+		h.mtx.Lock()
+		delete(h.lastDelay, queue)
+		h.mtx.Unlock()
+	}()
+
+	// The writer goroutine may still be updating its counters when
+	// Handle returns. Keep them in a local guarded by sentMtx, and
+	// copy them into the result on the way out.
+	var sent handlerStats
+	var sentMtx sync.Mutex
+	defer func() {
+		sentMtx.Lock()
+		hStats = sent
+		sentMtx.Unlock()
+	}()
+
+	sess, err := newSession(ws, queue)
+	if err != nil {
+		log.WithError(err).Error("newSession failed")
+		return
+	}
+
+	// Receive websocket frames from the client and pass them to
+	// sess.Receive().
+	go func() {
+		buf := make([]byte, 2<<20)
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+			}
+			ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
+			n, err := ws.Read(buf)
+			frame := buf[:n]
+			log.WithField("frame", string(frame)).Debug("received frame")
+			if err == nil && n == len(buf) {
+				// A frame that fills the whole buffer may
+				// have been truncated.
+				err = errFrameTooBig
+			}
+			if err != nil {
+				if err != io.EOF {
+					log.WithError(err).Info("read error")
+				}
+				cancel()
+				return
+			}
+			err = sess.Receive(frame)
+			if err != nil {
+				log.WithError(err).Error("sess.Receive() failed")
+				cancel()
+				return
+			}
+		}
+	}()
+
+	// Take items from the outgoing queue, serialize them using
+	// sess.EventMessage() as needed, and send them to the client
+	// as websocket frames.
+	go func() {
+		for {
+			var ok bool
+			var data interface{}
+			select {
+			case <-ctx.Done():
+				return
+			case data, ok = <-queue:
+				if !ok {
+					return
+				}
+			}
+			var e *event
+			var buf []byte
+			var err error
+			log := log
+
+			switch data := data.(type) {
+			case []byte:
+				buf = data
+			case *event:
+				e = data
+				log = log.WithField("serial", e.Serial)
+				buf, err = sess.EventMessage(e)
+				if err != nil {
+					log.WithError(err).Error("EventMessage failed")
+					cancel()
+					// Return, not break: a break would only
+					// leave the switch and go on to write
+					// an empty frame.
+					return
+				} else if len(buf) == 0 {
+					log.Debug("skip")
+					continue
+				}
+			default:
+				log.WithField("data", data).Error("bad object in client queue")
+				continue
+			}
+
+			log.WithField("frame", string(buf)).Debug("send event")
+			ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+			t0 := time.Now()
+			_, err = ws.Write(buf)
+			if err != nil {
+				log.WithError(err).Error("write failed")
+				cancel()
+				return
+			}
+			log.Debug("sent")
+
+			if e != nil {
+				h.mtx.Lock()
+				// Handle's deferred cleanup may already have
+				// removed this queue. Don't re-add it, which
+				// would leak the entry.
+				if _, ok := h.lastDelay[queue]; ok {
+					h.lastDelay[queue] = stats.Duration(time.Since(e.Ready))
+				}
+				h.mtx.Unlock()
+			}
+			sentMtx.Lock()
+			if e != nil {
+				sent.QueueDelayNs += t0.Sub(e.Ready)
+			}
+			sent.WriteDelayNs += time.Since(t0)
+			sent.EventBytes += uint64(len(buf))
+			sent.EventCount++
+			sentMtx.Unlock()
+		}
+	}()
+
+	// Filter incoming events against the current subscription
+	// list, and forward matching events to the outgoing message
+	// queue. Return when the request context is done/cancelled or
+	// the incoming event stream ends. Shut down the handler if the
+	// outgoing queue fills up.
+	go func() {
+		ticker := time.NewTicker(h.PingTimeout)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				// If the outgoing queue is empty,
+				// send an empty message. This can
+				// help detect a disconnected network
+				// socket, and prevent an idle socket
+				// from being closed.
+				if len(queue) == 0 {
+					select {
+					case queue <- []byte(`{}`):
+					default:
+					}
+				}
+				continue
+			case e, ok := <-incoming.Channel():
+				if !ok {
+					cancel()
+					return
+				}
+				if !sess.Filter(e) {
+					continue
+				}
+				select {
+				case queue <- e:
+				default:
+					log.WithError(errQueueFull).Error("terminate")
+					cancel()
+					return
+				}
+			}
+		}
+	}()
+
+	<-ctx.Done()
+	return
+}
+
+// DebugStatus summarizes the outgoing queues of the connections
+// currently being handled. It reports the number of queues; the
+// smallest, largest and total number of queued items; and the
+// smallest non-zero and largest most-recent queue delay.
+func (h *handler) DebugStatus() interface{} {
+	h.mtx.Lock()
+	defer h.mtx.Unlock()
+
+	var s struct {
+		QueueCount    int
+		QueueMin      int
+		QueueMax      int
+		QueueTotal    uint64
+		QueueDelayMin stats.Duration
+		QueueDelayMax stats.Duration
+	}
+	first := true
+	for q, delay := range h.lastDelay {
+		queued := len(q)
+		s.QueueCount++
+		s.QueueTotal += uint64(queued)
+		if first || queued < s.QueueMin {
+			s.QueueMin = queued
+		}
+		if queued > s.QueueMax {
+			s.QueueMax = queued
+		}
+		first = false
+
+		// Zero means "nothing sent yet", so it never becomes
+		// the minimum delay.
+		if delay > 0 && (s.QueueDelayMin == 0 || delay < s.QueueDelayMin) {
+			s.QueueDelayMin = delay
+		}
+		if delay > s.QueueDelayMax {
+			s.QueueDelayMax = delay
+		}
+	}
+	return &s
+}
+
+// setup allocates the lastDelay map. Handle runs it through setupOnce
+// before serving its first connection.
+func (h *handler) setup() {
+	h.lastDelay = map[chan interface{}]stats.Duration{}
+}
--- /dev/null
+package main
+
+import (
+ "flag"
+ "fmt"
+ "net/http"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/config"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ "github.com/coreos/go-systemd/daemon"
+)
+
+// logger is shorthand for ctxlog.FromContext. Code that has no request
+// context calls logger(nil).
+var logger = ctxlog.FromContext
+
+// main loads the configuration file. With -dump-config it prints the
+// effective configuration and exits; otherwise it starts the event
+// source and serves websocket clients on cfg.Listen.
+func main() {
+	log := logger(nil)
+
+	configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file")
+	dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit")
+	cfg := defaultConfig()
+	flag.Parse()
+
+	if err := config.LoadFile(&cfg, *configPath); err != nil {
+		log.Fatal(err)
+	}
+
+	ctxlog.SetLevel(cfg.LogLevel)
+	ctxlog.SetFormat(cfg.LogFormat)
+
+	if *dumpConfig {
+		txt, err := config.Dump(&cfg)
+		if err != nil {
+			log.Fatal(err)
+		}
+		fmt.Print(string(txt))
+		return
+	}
+
+	log.Info("started")
+	eventSource := &pgEventSource{
+		DataSource: cfg.Postgres.ConnectionString(),
+		QueueSize:  cfg.ServerEventQueue,
+	}
+	rtr := &router{
+		Config:      &cfg,
+		eventSource: eventSource,
+		newPermChecker: func() permChecker {
+			return newPermChecker(cfg.Client)
+		},
+	}
+	srv := &http.Server{
+		Addr:           cfg.Listen,
+		Handler:        rtr,
+		ReadTimeout:    time.Minute,
+		WriteTimeout:   time.Minute,
+		MaxHeaderBytes: 1 << 20,
+	}
+
+	// Attach and immediately detach a throwaway subscriber. NewSink
+	// runs the event source's one-time setup before clients arrive.
+	eventSource.NewSink().Stop()
+
+	if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
+		log.WithError(err).Warn("error notifying init daemon")
+	}
+
+	log.WithField("Listen", srv.Addr).Info("listening")
+	log.Fatal(srv.ListenAndServe())
+}
--- /dev/null
+package main
+
+import (
+ "net/http"
+ "net/url"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Lifetimes for cached permission check results (see
+// cachingPermChecker).
+const (
+ // maxPermCacheAge: Check does not use a cached result older than
+ // this; it asks the API server again instead.
+ maxPermCacheAge = time.Hour
+ // minPermCacheAge: tidy only evicts entries older than this.
+ minPermCacheAge = 5 * time.Minute
+)
+
+// permChecker decides whether a client's token grants access to an
+// object.
+type permChecker interface {
+ // SetToken sets the API token used by subsequent Check calls.
+ SetToken(token string)
+ // Check reports whether the object with the given UUID is
+ // accessible with the current token.
+ Check(uuid string) (bool, error)
+}
+
+// newPermChecker returns a caching permChecker that uses its own copy
+// of ac, with the token cleared until SetToken is called.
+func newPermChecker(ac arvados.Client) permChecker {
+	client := ac
+	client.AuthToken = ""
+	pc := &cachingPermChecker{
+		Client:     &client,
+		maxCurrent: 16,
+		cache:      map[string]cacheEnt{},
+	}
+	return pc
+}
+
+// cacheEnt is a cached permission check result, stamped with the time
+// the check was made.
+type cacheEnt struct {
+ time.Time
+ allowed bool
+}
+
+// cachingPermChecker is a permChecker that asks the API server
+// whether an object can be retrieved and caches the answer per UUID.
+// It has no locking of its own; the router creates a separate checker
+// for each connection.
+type cachingPermChecker struct {
+ *arvados.Client
+ // cache maps object UUIDs to check results.
+ cache map[string]cacheEnt
+ // maxCurrent is the cache size after the most recent tidy; tidy
+ // does work only once the cache exceeds twice this size.
+ maxCurrent int
+}
+
+// SetToken sets the API token that Check sends with its lookups.
+//
+// NOTE(review): the cache is not cleared, so results obtained with a
+// previous token would be reused. Callers shown here set the token
+// once per session.
+func (pc *cachingPermChecker) SetToken(token string) {
+	client := pc.Client
+	client.AuthToken = token
+}
+
+// Check reports whether the object with the given UUID can be
+// retrieved with the current token. Results less than maxPermCacheAge
+// old are served from the cache.
+//
+// A 404 or 403 response from the API server means "not allowed" and is
+// cached like a success. Any other error is logged, returned to the
+// caller, and not cached.
+//
+// NOTE(review): the debug log fields include the full API token.
+func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
+	logger := logger(nil).
+		WithField("token", pc.Client.AuthToken).
+		WithField("uuid", uuid)
+	pc.tidy()
+	now := time.Now()
+	if perm, ok := pc.cache[uuid]; ok && now.Sub(perm.Time) < maxPermCacheAge {
+		logger.WithField("allowed", perm.allowed).Debug("cache hit")
+		return perm.allowed, nil
+	}
+	var buf map[string]interface{}
+	path, err := pc.PathForUUID("get", uuid)
+	if err != nil {
+		return false, err
+	}
+	err = pc.RequestAndDecode(&buf, "GET", path, nil, url.Values{
+		"select": {`["uuid"]`},
+	})
+
+	var allowed bool
+	if err == nil {
+		allowed = true
+	} else if txErr, ok := err.(*arvados.TransactionError); ok &&
+		(txErr.StatusCode == http.StatusNotFound || txErr.StatusCode == http.StatusForbidden) {
+		// 404: the UUID is not visible with this token. 403: some
+		// requests are expressly forbidden for reasons other than
+		// "you aren't allowed to know whether this UUID exists".
+		// Either way the answer is "not allowed". The status is only
+		// inspected after the type assertion succeeded, so txErr is
+		// never nil here.
+		allowed = false
+	} else {
+		// Not an HTTP transaction error, or an unexpected status.
+		logger.WithError(err).Error("lookup error")
+		return false, err
+	}
+	logger.WithField("allowed", allowed).Debug("cache miss")
+	pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed}
+	return allowed, nil
+}
+
+// tidy bounds the cache size. Once the cache holds more than twice as
+// many entries as were left after the previous tidy, it evicts entries
+// older than minPermCacheAge. The remaining size becomes the new
+// baseline.
+func (pc *cachingPermChecker) tidy() {
+	if len(pc.cache) > 2*pc.maxCurrent {
+		cutoff := time.Now().Add(-minPermCacheAge)
+		for key, ent := range pc.cache {
+			if ent.Time.Before(cutoff) {
+				delete(pc.cache, key)
+			}
+		}
+		pc.maxCurrent = len(pc.cache)
+	}
+}
--- /dev/null
+package main
+
+import (
+ "encoding/json"
+ "io"
+ "net/http"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ "github.com/Sirupsen/logrus"
+ "golang.org/x/net/websocket"
+)
+
+// wsConn is the set of connection methods used by handler and the
+// session implementations. *websocket.Conn satisfies it (see
+// makeServer).
+type wsConn interface {
+ io.ReadWriter
+ Request() *http.Request
+ SetReadDeadline(time.Time) error
+ SetWriteDeadline(time.Time) error
+}
+
+// router is the server's http.Handler. It routes websocket endpoints to
+// handler and serves the JSON status endpoints.
+type router struct {
+ Config *wsConfig
+ eventSource eventSource
+ // newPermChecker is called once per websocket connection.
+ newPermChecker func() permChecker
+
+ // handler and mux are created by setup, which runs once.
+ handler *handler
+ mux *http.ServeMux
+ setupOnce sync.Once
+
+ // lastReqID is compared with each new ID in newReqID;
+ // lastReqMtx guards it.
+ lastReqID int64
+ lastReqMtx sync.Mutex
+
+ // status holds request counters updated atomically by
+ // ServeHTTP.
+ status routerDebugStatus
+}
+
+// routerDebugStatus holds HTTP request counters, reported under "HTTP"
+// in /debug.json.
+type routerDebugStatus struct {
+ // ReqsReceived counts every request received.
+ ReqsReceived int64
+ // ReqsActive counts requests currently being served.
+ ReqsActive int64
+}
+
+// debugStatuser is implemented by components that can contribute
+// diagnostic state to /debug.json.
+type debugStatuser interface {
+ DebugStatus() interface{}
+}
+
+// setup creates the connection handler and the request multiplexer.
+// ServeHTTP runs it once, through setupOnce, before handling the first
+// request.
+func (rtr *router) setup() {
+	cfg := rtr.Config
+	rtr.handler = &handler{
+		PingTimeout: cfg.PingTimeout.Duration(),
+		QueueSize:   cfg.ClientEventQueue,
+	}
+	mux := http.NewServeMux()
+	mux.Handle("/websocket", rtr.makeServer(newSessionV0))
+	mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1))
+	mux.HandleFunc("/debug.json", jsonHandler(rtr.DebugStatus))
+	mux.HandleFunc("/status.json", jsonHandler(rtr.Status))
+	rtr.mux = mux
+}
+
+// makeServer returns a websocket server whose connections are served
+// by rtr.handler, using sessions created by newSession. On disconnect
+// it logs the elapsed time and the handler's statistics, then closes
+// the connection.
+//
+// The Handshake callback accepts every request without inspecting it;
+// in particular, the Origin header is not checked.
+func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
+	acceptAll := func(c *websocket.Config, r *http.Request) error {
+		return nil
+	}
+	serve := func(ws *websocket.Conn) {
+		start := time.Now()
+		log := logger(ws.Request().Context())
+		log.Info("connected")
+
+		openSession := func(conn wsConn, sendq chan<- interface{}) (session, error) {
+			return newSession(conn, sendq, rtr.eventSource.DB(), rtr.newPermChecker(), &rtr.Config.Client)
+		}
+		connStats := rtr.handler.Handle(ws, rtr.eventSource, openSession)
+
+		log.WithFields(logrus.Fields{
+			"elapsed": time.Since(start).Seconds(),
+			"stats":   connStats,
+		}).Info("disconnect")
+		ws.Close()
+	}
+	return &websocket.Server{
+		Handshake: acceptAll,
+		Handler:   websocket.Handler(serve),
+	}
+}
+
+// newReqID returns a new request ID: the current time in nanoseconds,
+// formatted in base 36. If necessary the value is bumped so that each
+// ID is greater than the previous one, even when the clock has not
+// advanced (or has gone backwards) since the last call.
+func (rtr *router) newReqID() string {
+	rtr.lastReqMtx.Lock()
+	defer rtr.lastReqMtx.Unlock()
+	id := time.Now().UnixNano()
+	if id <= rtr.lastReqID {
+		id = rtr.lastReqID + 1
+	}
+	// Record the issued ID. Without this, the comparison above would
+	// always be against zero and duplicate IDs could be issued.
+	rtr.lastReqID = id
+	return strconv.FormatInt(id, 36)
+}
+
+// DebugStatus returns the data served at /debug.json: HTTP request
+// counters, outgoing queue statistics, and event source statistics
+// when the event source can report them.
+func (rtr *router) DebugStatus() interface{} {
+	// ServeHTTP updates these counters atomically from many
+	// goroutines. Load them atomically too; copying the struct
+	// directly would be a data race.
+	httpStatus := routerDebugStatus{
+		ReqsReceived: atomic.LoadInt64(&rtr.status.ReqsReceived),
+		ReqsActive:   atomic.LoadInt64(&rtr.status.ReqsActive),
+	}
+	s := map[string]interface{}{
+		"HTTP":     httpStatus,
+		"Outgoing": rtr.handler.DebugStatus(),
+	}
+	if es, ok := rtr.eventSource.(debugStatuser); ok {
+		s["EventSource"] = es.DebugStatus()
+	}
+	return s
+}
+
+// Status returns the data served at /status.json: the number of
+// requests currently being served.
+func (rtr *router) Status() interface{} {
+	active := atomic.LoadInt64(&rtr.status.ReqsActive)
+	return map[string]interface{}{"Clients": active}
+}
+
+// ServeHTTP counts the request and attaches a logger carrying a fresh
+// RequestID to the request context. It then logs the request and
+// passes it to the multiplexer built by setup.
+func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	rtr.setupOnce.Do(rtr.setup)
+	atomic.AddInt64(&rtr.status.ReqsReceived, 1)
+	atomic.AddInt64(&rtr.status.ReqsActive, 1)
+	defer atomic.AddInt64(&rtr.status.ReqsActive, -1)
+
+	reqLog := logger(req.Context()).WithField("RequestID", rtr.newReqID())
+	req = req.WithContext(ctxlog.Context(req.Context(), reqLog))
+	reqLog.WithFields(logrus.Fields{
+		"remoteAddr":      req.RemoteAddr,
+		"reqForwardedFor": req.Header.Get("X-Forwarded-For"),
+	}).Info("accept request")
+	rtr.mux.ServeHTTP(resp, req)
+}
+
+func jsonHandler(fn func() interface{}) http.HandlerFunc {
+ return func(resp http.ResponseWriter, req *http.Request) {
+ logger := logger(req.Context())
+ resp.Header().Set("Content-Type", "application/json")
+ enc := json.NewEncoder(resp)
+ err := enc.Encode(fn())
+ if err != nil {
+ msg := "encode failed"
+ logger.WithError(err).Error(msg)
+ http.Error(resp, msg, http.StatusInternalServerError)
+ }
+ }
+}
--- /dev/null
+package main
+
+import (
+ "database/sql"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// session is the protocol-specific part of one websocket connection.
+// Handle calls Receive from its reader goroutine, Filter from its
+// filter goroutine, and EventMessage from its writer goroutine.
+// Implementations must therefore tolerate these calls arriving
+// concurrently.
+type session interface {
+ // Receive processes a message received from the client. If a
+ // non-nil error is returned, the connection will be
+ // terminated.
+ Receive([]byte) error
+
+ // Filter returns true if the event should be queued for
+ // sending to the client. It should return as fast as
+ // possible, and must not block.
+ Filter(*event) bool
+
+ // EventMessage encodes the given event (from the front of the
+ // queue) into a form suitable to send to the client. If a
+ // non-nil error is returned, the connection is terminated. If
+ // the returned buffer is empty, nothing is sent to the client
+ // and the event is not counted in statistics.
+ //
+ // Unlike Filter, EventMessage can block without affecting
+ // other connections. If EventMessage is slow, additional
+ // incoming events will be queued. If the event queue fills
+ // up, the connection will be dropped.
+ EventMessage(*event) ([]byte, error)
+}
+
+// sessionFactory creates the session for a new connection. Its
+// arguments are the connection, the queue for outgoing items, the
+// database handle, a permission checker for this connection, and the
+// API client configuration (see router.makeServer).
+type sessionFactory func(wsConn, chan<- interface{}, *sql.DB, permChecker, *arvados.Client) (session, error)
--- /dev/null
+package main
+
+import (
+ "database/sql"
+ "encoding/json"
+ "errors"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/Sirupsen/logrus"
+)
+
+var (
+ // errQueueFull is logged when a client's outgoing queue has no
+ // room for another event; the connection is then terminated.
+ errQueueFull = errors.New("client queue full")
+ // errFrameTooBig is reported when a received frame fills the
+ // whole read buffer.
+ errFrameTooBig = errors.New("frame too big")
+
+ // sendObjectAttributes lists the old/new object attributes
+ // included in v0 event messages. Log entries with a "text"
+ // property are instead sent with all their properties.
+ sendObjectAttributes = []string{"state", "name", "owner_uuid", "portable_data_hash"}
+
+ // Replies to a v0 subscribe message.
+ v0subscribeOK = []byte(`{"status":200}`)
+ v0subscribeFail = []byte(`{"status":400}`)
+)
+
+// v0session implements the v0 websocket protocol for one client
+// connection.
+type v0session struct {
+ ac *arvados.Client
+ ws wsConn
+ sendq chan<- interface{}
+ db *sql.DB
+ permChecker permChecker
+ // subscriptions is guarded by mtx: Receive appends to it while
+ // Filter reads it from another goroutine.
+ subscriptions []v0subscribe
+ // lastMsgID is incremented atomically to number outgoing
+ // messages.
+ lastMsgID uint64
+ log *logrus.Entry
+ mtx sync.Mutex
+ // NOTE(review): setupOnce is not used by the v0 code shown here.
+ setupOnce sync.Once
+}
+
+// newSessionV0 returns a v0 session: a partial port of the Rails/puma
+// implementation, with just enough functionality to support Workbench
+// and arv-mount.
+//
+// The client's API token comes from the api_token form value of the
+// websocket request and is handed to pc.
+func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) {
+	req := ws.Request()
+	sess := &v0session{
+		ws:          ws,
+		sendq:       sendq,
+		db:          db,
+		ac:          ac,
+		permChecker: pc,
+		log:         logger(req.Context()),
+	}
+
+	if err := req.ParseForm(); err != nil {
+		sess.log.WithError(err).Error("ParseForm failed")
+		return nil, err
+	}
+	token := req.Form.Get("api_token")
+	sess.permChecker.SetToken(token)
+	sess.log.WithField("token", token).Debug("set token")
+	return sess, nil
+}
+
+// Receive handles one message from the client.
+//
+// A "subscribe" message is prepared, acknowledged with v0subscribeOK,
+// and added to the session's subscriptions; any requested backlog of
+// old events is then queued. Anything else (unparseable JSON or
+// another method) is answered with v0subscribeFail.
+//
+// Receive always returns nil, so client mistakes do not terminate the
+// connection.
+func (sess *v0session) Receive(buf []byte) error {
+	var sub v0subscribe
+	err := json.Unmarshal(buf, &sub)
+	switch {
+	case err != nil:
+		sess.log.WithError(err).Info("invalid message from client")
+	case sub.Method != "subscribe":
+		sess.log.WithField("Method", sub.Method).Info("unknown method")
+	default:
+		sub.prepare(sess)
+		sess.log.WithField("sub", sub).Debug("sub prepared")
+		sess.sendq <- v0subscribeOK
+		sess.mtx.Lock()
+		sess.subscriptions = append(sess.subscriptions, sub)
+		sess.mtx.Unlock()
+		sub.sendOldEvents(sess)
+		return nil
+	}
+	sess.sendq <- v0subscribeFail
+	return nil
+}
+
+// EventMessage encodes e as a v0 JSON message.
+//
+// It returns an empty message, so nothing is sent, when the event's
+// details are unavailable or the current token may not see the
+// object. A permission-check error is returned to the caller.
+//
+// Log entries with a "text" property are sent with all their
+// properties. Otherwise only the sendObjectAttributes subset of
+// old_attributes/new_attributes is included.
+func (sess *v0session) EventMessage(e *event) ([]byte, error) {
+	detail := e.Detail()
+	if detail == nil {
+		return nil, nil
+	}
+
+	if allowed, err := sess.permChecker.Check(detail.ObjectUUID); err != nil || !allowed {
+		return nil, err
+	}
+
+	kind, _ := sess.ac.KindForUUID(detail.ObjectUUID)
+	msg := map[string]interface{}{
+		"msgID":             atomic.AddUint64(&sess.lastMsgID, 1),
+		"id":                detail.ID,
+		"uuid":              detail.UUID,
+		"object_uuid":       detail.ObjectUUID,
+		"object_owner_uuid": detail.ObjectOwnerUUID,
+		"object_kind":       kind,
+		"event_type":        detail.EventType,
+		"event_at":          detail.EventAt,
+	}
+
+	if detail.Properties != nil && detail.Properties["text"] != nil {
+		msg["properties"] = detail.Properties
+		return json.Marshal(msg)
+	}
+
+	filtered := map[string]map[string]interface{}{}
+	for _, which := range []string{"old_attributes", "new_attributes"} {
+		attrs, isMap := detail.Properties[which].(map[string]interface{})
+		if !isMap {
+			continue
+		}
+		subset := map[string]interface{}{}
+		for _, name := range sendObjectAttributes {
+			if val, present := attrs[name]; present {
+				subset[name] = val
+			}
+		}
+		filtered[which] = subset
+	}
+	msg["properties"] = filtered
+	return json.Marshal(msg)
+}
+
+// Filter reports whether e matches at least one of the session's
+// subscriptions.
+func (sess *v0session) Filter(e *event) bool {
+	sess.mtx.Lock()
+	defer sess.mtx.Unlock()
+	matched := false
+	for i := 0; i < len(sess.subscriptions) && !matched; i++ {
+		matched = sess.subscriptions[i].match(sess, e)
+	}
+	return matched
+}
+
+// sendOldEvents queues logged events that match sub's filters, have
+// IDs greater than sub.LastLogID, and were created in the last 10
+// minutes. It does nothing when LastLogID is 0, and stops early once
+// the client's request context is done.
+func (sub *v0subscribe) sendOldEvents(sess *v0session) {
+	if sub.LastLogID == 0 {
+		return
+	}
+	sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
+	// Here we do a "select id" query and queue an event for every
+	// log since the given ID, then use (*event)Detail() to
+	// retrieve the whole row and decide whether to send it. This
+	// approach is very inefficient if the subscriber asks for
+	// last_log_id==1, even if the filters end up matching very
+	// few events.
+	//
+	// To mitigate this, filter on "created > 10 minutes ago" when
+	// retrieving the list of old event IDs to consider.
+	rows, err := sess.db.Query(
+		`SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
+		sub.LastLogID,
+		time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
+	if err != nil {
+		sess.log.WithError(err).Error("db.Query failed")
+		return
+	}
+	// Release the result set, and its database connection, on every
+	// return path, including the early returns below.
+	defer rows.Close()
+
+	done := sess.ws.Request().Context().Done()
+	for rows.Next() {
+		var id uint64
+		err := rows.Scan(&id)
+		if err != nil {
+			sess.log.WithError(err).Error("row Scan failed")
+			continue
+		}
+		for len(sess.sendq)*2 > cap(sess.sendq) {
+			// Ugly... but if we fill up the whole client
+			// queue with a backlog of old events, a
+			// single new event will overflow it and
+			// terminate the connection, and then the
+			// client will probably reconnect and do the
+			// same thing all over again.
+			//
+			// Once the connection is finished nothing
+			// drains the queue, so stop waiting rather
+			// than sleeping forever.
+			select {
+			case <-done:
+				return
+			case <-time.After(100 * time.Millisecond):
+			}
+		}
+		now := time.Now()
+		e := &event{
+			LogID:    id,
+			Received: now,
+			Ready:    now,
+			db:       sess.db,
+		}
+		if sub.match(sess, e) {
+			select {
+			case sess.sendq <- e:
+			case <-done:
+				return
+			}
+		}
+	}
+	if err := rows.Err(); err != nil {
+		sess.log.WithError(err).Error("db.Query failed")
+	}
+}
+
+// v0subscribe is a request from a v0 client, decoded from JSON by
+// Receive.
+type v0subscribe struct {
+ Method string
+ Filters []v0filter
+ LastLogID int64 `json:"last_log_id"`
+
+ // funcs holds the predicates that prepare compiles from Filters.
+ // An event matches only if all of them return true.
+ funcs []func(*event) bool
+}
+
+// v0filter is one [attribute, operator, operand] filter triple from a
+// v0 subscribe message; prepare compiles the forms it understands.
+type v0filter [3]interface{}
+
+// match reports whether e satisfies every predicate that prepare built
+// for this subscription. An event whose details cannot be loaded never
+// matches.
+func (sub *v0subscribe) match(sess *v0session, e *event) bool {
+	log := sess.log.WithField("LogID", e.LogID)
+	if e.Detail() == nil {
+		log.Error("match failed, no detail")
+		return false
+	}
+	log = log.WithField("funcs", len(sub.funcs))
+	for idx, accept := range sub.funcs {
+		if accept(e) {
+			continue
+		}
+		log.WithField("func", idx).Debug("match failed")
+		return false
+	}
+	log.Debug("match passed")
+	return true
+}
+
+// prepare compiles sub.Filters into predicates in sub.funcs. Two filter
+// forms are understood:
+//
+//	["event_type", "in", [<string>, ...]]
+//	["created_at", <op>, <RFC3339Nano timestamp>]  (op: >=, <=, >, <, =)
+//
+// Other filters are ignored. Unparseable timestamps and unknown
+// created_at operators are also logged.
+func (sub *v0subscribe) prepare(sess *v0session) {
+	for _, f := range sub.Filters {
+		column, _ := f[0].(string)
+		switch column {
+		case "event_type":
+			if op, isStr := f[1].(string); !isStr || op != "in" {
+				continue
+			}
+			list, isList := f[2].([]interface{})
+			if !isList {
+				continue
+			}
+			var want []string
+			for _, item := range list {
+				if s, isStr := item.(string); isStr {
+					want = append(want, s)
+				}
+			}
+			sub.funcs = append(sub.funcs, func(e *event) bool {
+				for _, s := range want {
+					if s == e.Detail().EventType {
+						return true
+					}
+				}
+				return false
+			})
+		case "created_at":
+			op, isStr := f[1].(string)
+			if !isStr {
+				continue
+			}
+			tstr, isStr := f[2].(string)
+			if !isStr {
+				continue
+			}
+			t, err := time.Parse(time.RFC3339Nano, tstr)
+			if err != nil {
+				sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
+				continue
+			}
+			var cmp func(*event) bool
+			switch op {
+			case ">=":
+				cmp = func(e *event) bool { return !e.Detail().CreatedAt.Before(t) }
+			case "<=":
+				cmp = func(e *event) bool { return !e.Detail().CreatedAt.After(t) }
+			case ">":
+				cmp = func(e *event) bool { return e.Detail().CreatedAt.After(t) }
+			case "<":
+				cmp = func(e *event) bool { return e.Detail().CreatedAt.Before(t) }
+			case "=":
+				cmp = func(e *event) bool { return e.Detail().CreatedAt.Equal(t) }
+			default:
+				sess.log.WithField("operator", op).Info("bogus operator")
+				continue
+			}
+			sub.funcs = append(sub.funcs, cmp)
+		}
+	}
+}
--- /dev/null
+package main
+
+import (
+ "database/sql"
+ "errors"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// newSessionV1 returns a v1 session -- see
+// https://dev.arvados.org/projects/arvados/wiki/Websocket_server
+//
+// The v1 protocol is not implemented yet, so this always returns an
+// error. Handle then returns without serving any events.
+func newSessionV1(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) {
+	err := errors.New("Not implemented")
+	return nil, err
+}