From: Peter Amstutz Date: Fri, 16 Dec 2016 19:55:54 +0000 (-0500) Subject: Merge branch '10684-crunch-run-ca-certs' closes #10684 X-Git-Tag: 1.1.0~531 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/ffd4738242c61fa5acd423f927339f836dfb0ffb?hp=005cb811d937fe52cee11d76484252ed7167f9a8 Merge branch '10684-crunch-run-ca-certs' closes #10684 --- diff --git a/apps/workbench/app/controllers/container_requests_controller.rb b/apps/workbench/app/controllers/container_requests_controller.rb index b67d100887..b286a9456e 100644 --- a/apps/workbench/app/controllers/container_requests_controller.rb +++ b/apps/workbench/app/controllers/container_requests_controller.rb @@ -59,4 +59,32 @@ class ContainerRequestsController < ApplicationController end end + def copy + src = @object + + @object = ContainerRequest.new + + @object.command = src.command + @object.container_image = src.container_image + @object.cwd = src.cwd + @object.description = src.description + @object.environment = src.environment + @object.mounts = src.mounts + @object.name = src.name + @object.output_path = src.output_path + @object.priority = 1 + @object.properties[:template_uuid] = src.properties[:template_uuid] + @object.runtime_constraints = src.runtime_constraints + @object.scheduling_parameters = src.scheduling_parameters + @object.state = 'Uncommitted' + @object.use_existing = false + + # set owner_uuid to that of source, provided it is a project and writable by current user + current_project = Group.find(src.owner_uuid) rescue nil + if (current_project && current_project.writable_by.andand.include?(current_user.uuid)) + @object.owner_uuid = src.owner_uuid + end + + super + end end diff --git a/apps/workbench/app/controllers/pipeline_instances_controller.rb b/apps/workbench/app/controllers/pipeline_instances_controller.rb index c5fbda0cf3..83fe0dda46 100644 --- a/apps/workbench/app/controllers/pipeline_instances_controller.rb +++ 
b/apps/workbench/app/controllers/pipeline_instances_controller.rb @@ -53,7 +53,7 @@ class PipelineInstancesController < ApplicationController end @object.state = 'New' - # set owner_uuid to that of source, provided it is a project and wriable by current user + # set owner_uuid to that of source, provided it is a project and writable by current user current_project = Group.find(source.owner_uuid) rescue nil if (current_project && current_project.writable_by.andand.include?(current_user.uuid)) @object.owner_uuid = source.owner_uuid diff --git a/apps/workbench/app/views/application/_extra_tab_line_buttons.html.erb b/apps/workbench/app/views/application/_extra_tab_line_buttons.html.erb new file mode 100644 index 0000000000..e69de29bb2 diff --git a/apps/workbench/app/views/application/_title_and_buttons.html.erb b/apps/workbench/app/views/application/_title_and_buttons.html.erb index 398f248a39..46a949aa12 100644 --- a/apps/workbench/app/views/application/_title_and_buttons.html.erb +++ b/apps/workbench/app/views/application/_title_and_buttons.html.erb @@ -13,6 +13,9 @@ <% if @object.class.goes_in_projects? && @object.uuid != current_user.andand.uuid # Not the "Home" project %> <% content_for :tab_line_buttons do %> + <% if current_user.andand.is_active %> + <%= render partial: 'extra_tab_line_buttons' %> + <% end %> <% if current_user.andand.is_active && @object.class.copies_to_projects? 
%> <%= link_to( choose_projects_path( diff --git a/apps/workbench/app/views/container_requests/_extra_tab_line_buttons.html.erb b/apps/workbench/app/views/container_requests/_extra_tab_line_buttons.html.erb new file mode 100644 index 0000000000..662309ffe8 --- /dev/null +++ b/apps/workbench/app/views/container_requests/_extra_tab_line_buttons.html.erb @@ -0,0 +1,10 @@ +<% if @object.state == 'Final' %> + <%= link_to(copy_container_request_path('id' => @object.uuid), + class: 'btn btn-primary', + title: 'Re-run', + data: {toggle: :tooltip, placement: :top}, title: 'This will make a copy and take you there. You can then make any needed changes and run it', + method: :post, + ) do %> + Re-run + <% end %> +<% end %> diff --git a/apps/workbench/config/application.default.yml b/apps/workbench/config/application.default.yml index e4e2782986..c2dcba8094 100644 --- a/apps/workbench/config/application.default.yml +++ b/apps/workbench/config/application.default.yml @@ -26,6 +26,11 @@ diagnostics: pipeline_2: template_uuid: zzzzz-p5p6p-1xbobfobk94ppbv input_paths: [zzzzz-4zz18-nz98douzhaa3jh2, zzzzz-4zz18-gpw9o5wpcti3nib] + container_requests_to_test: + container_request_1: + workflow_uuid: zzzzz-7fd4e-60e96shgwspt4mw + input_paths: [] + max_wait_seconds: 10 # Below is a sample setting for performance testing. 
# Configure workbench URL as "arvados_workbench_url" diff --git a/apps/workbench/config/routes.rb b/apps/workbench/config/routes.rb index 7c2312c1ce..21cb7c40bc 100644 --- a/apps/workbench/config/routes.rb +++ b/apps/workbench/config/routes.rb @@ -26,6 +26,7 @@ ArvadosWorkbench::Application.routes.draw do resources :containers resources :container_requests do post 'cancel', :on => :member + post 'copy', on: :member end get '/virtual_machines/:id/webshell/:login' => 'virtual_machines#webshell', :as => :webshell_virtual_machine resources :authorized_keys diff --git a/apps/workbench/lib/tasks/config_dump.rake b/apps/workbench/lib/tasks/config_dump.rake new file mode 100644 index 0000000000..c7e021488a --- /dev/null +++ b/apps/workbench/lib/tasks/config_dump.rake @@ -0,0 +1,6 @@ +namespace :config do + desc 'Show site configuration' + task dump: :environment do + puts $application_config.to_yaml + end +end diff --git a/apps/workbench/test/controllers/container_requests_controller_test.rb b/apps/workbench/test/controllers/container_requests_controller_test.rb index 8dbbbd07c1..70e042cd3d 100644 --- a/apps/workbench/test/controllers/container_requests_controller_test.rb +++ b/apps/workbench/test/controllers/container_requests_controller_test.rb @@ -29,4 +29,31 @@ class ContainerRequestsControllerTest < ActionController::TestCase assert_includes @response.body, '
cd arvados/doc/user/cwl/bwa-mem +# > arvados-cwl-runner --create-workflow bwa-mem.cwl bwa-mem-input.yml + +class ContainerRequestTest < DiagnosticsTest + crs_to_test = Rails.configuration.container_requests_to_test.andand.keys + + setup do + need_selenium 'to make websockets work' + end + + crs_to_test.andand.each do |cr_to_test| + test "run container_request: #{cr_to_test}" do + cr_config = Rails.configuration.container_requests_to_test[cr_to_test] + + visit_page_with_token 'active' + + find('.btn', text: 'Run a process').click + + within('.modal-dialog') do + page.find_field('Search').set cr_config['workflow_uuid'] + wait_for_ajax + find('.selectable', text: 'bwa-mem.cwl').click + find('.btn', text: 'Next: choose inputs').click + end + + page.assert_selector('a.disabled,button.disabled', text: 'Run') if cr_config['input_paths'].any? + + # Choose input for the workflow + cr_config['input_paths'].each do |look_for| + select_input look_for + end + wait_for_ajax + + # All needed input are already filled in. Run this workflow now + page.assert_no_selector('a.disabled,button.disabled', text: 'Run') + find('a,button', text: 'Run').click + + # container_request is running. Run button is no longer available. + page.assert_no_selector('a', text: 'Run') + + # Wait for container_request run to complete + wait_until_page_has 'completed', cr_config['max_wait_seconds'] + end + end +end diff --git a/apps/workbench/test/diagnostics/pipeline_test.rb b/apps/workbench/test/diagnostics/pipeline_test.rb index d038222cf0..11d0e42629 100644 --- a/apps/workbench/test/diagnostics/pipeline_test.rb +++ b/apps/workbench/test/diagnostics/pipeline_test.rb @@ -42,54 +42,11 @@ class PipelineTest < DiagnosticsTest find('a,button', text: 'Components').click find('a,button', text: 'Run').click - # Pipeline is running. We have a "Stop" button instead now. + # Pipeline is running. We have a "Pause" button instead now. 
page.assert_selector 'a,button', text: 'Pause' # Wait for pipeline run to complete wait_until_page_has 'completed', pipeline_config['max_wait_seconds'] end end - - def select_input look_for - inputs_needed = page.all('.btn', text: 'Choose') - return if (!inputs_needed || !inputs_needed.any?) - - look_for_uuid = nil - look_for_file = nil - if look_for.andand.index('/').andand.>0 - partitions = look_for.partition('/') - look_for_uuid = partitions[0] - look_for_file = partitions[2] - else - look_for_uuid = look_for - look_for_file = nil - end - - assert_triggers_dom_event 'shown.bs.modal' do - inputs_needed[0].click - end - - within('.modal-dialog') do - if look_for_uuid - fill_in('Search', with: look_for_uuid, exact: true) - wait_for_ajax - end - - page.all('.selectable').first.click - wait_for_ajax - # ajax reload is wiping out input selection after search results; so, select again. - page.all('.selectable').first.click - wait_for_ajax - - if look_for_file - wait_for_ajax - within('.collection_files_name', text: look_for_file) do - find('.fa-file').click - end - end - - find('button', text: 'OK').click - wait_for_ajax - end - end end diff --git a/apps/workbench/test/diagnostics_test_helper.rb b/apps/workbench/test/diagnostics_test_helper.rb index 3587721eda..46b961ae11 100644 --- a/apps/workbench/test/diagnostics_test_helper.rb +++ b/apps/workbench/test/diagnostics_test_helper.rb @@ -21,6 +21,49 @@ class DiagnosticsTest < ActionDispatch::IntegrationTest visit page_with_token(tokens[token_name], (workbench_url + path)) end + def select_input look_for + inputs_needed = page.all('.btn', text: 'Choose') + return if (!inputs_needed || !inputs_needed.any?) 
+ + look_for_uuid = nil + look_for_file = nil + if look_for.andand.index('/').andand.>0 + partitions = look_for.partition('/') + look_for_uuid = partitions[0] + look_for_file = partitions[2] + else + look_for_uuid = look_for + look_for_file = nil + end + + assert_triggers_dom_event 'shown.bs.modal' do + inputs_needed[0].click + end + + within('.modal-dialog') do + if look_for_uuid + fill_in('Search', with: look_for_uuid, exact: true) + wait_for_ajax + end + + page.all('.selectable').first.click + wait_for_ajax + # ajax reload is wiping out input selection after search results; so, select again. + page.all('.selectable').first.click + wait_for_ajax + + if look_for_file + wait_for_ajax + within('.collection_files_name', text: look_for_file) do + find('.fa-file').click + end + end + + find('button', text: 'OK').click + wait_for_ajax + end + end + # Looks for the text_to_look_for for up to the max_time provided def wait_until_page_has text_to_look_for, max_time=30 max_time = 30 if (!max_time || (max_time.to_s != max_time.to_i.to_s)) diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh index 6f9aca98b4..ddd0124c70 100755 --- a/build/run-build-packages.sh +++ b/build/run-build-packages.sh @@ -432,6 +432,8 @@ package_go_binary services/keepstore keepstore \ "Keep storage daemon, accessible to clients on the LAN" package_go_binary services/keep-web keep-web \ "Static web hosting service for user data stored in Arvados Keep" +package_go_binary services/ws arvados-ws \ + "Arvados Websocket server" package_go_binary tools/keep-block-check keep-block-check \ "Verify that all data from one set of Keep servers to another was copied" package_go_binary tools/keep-rsync keep-rsync \ diff --git a/build/run-tests.sh b/build/run-tests.sh index 560a6933e8..e0e1ce2852 100755 --- a/build/run-tests.sh +++ b/build/run-tests.sh @@ -79,6 +79,7 @@ services/nodemanager services/crunch-run services/crunch-dispatch-local services/crunch-dispatch-slurm +services/ws sdk/cli 
sdk/pam sdk/python @@ -90,6 +91,7 @@ sdk/go/httpserver sdk/go/manifest sdk/go/blockdigest sdk/go/streamer +sdk/go/stats sdk/go/crunchrunner sdk/cwl tools/crunchstat-summary @@ -270,15 +272,18 @@ start_api() { && eval $(python sdk/python/tests/run_test_server.py start --auth admin) \ && export ARVADOS_TEST_API_HOST="$ARVADOS_API_HOST" \ && export ARVADOS_TEST_API_INSTALLED="$$" \ + && python sdk/python/tests/run_test_server.py start_ws \ + && python sdk/python/tests/run_test_server.py start_nginx \ && (env | egrep ^ARVADOS) } start_nginx_proxy_services() { - echo 'Starting keepproxy, keep-web, arv-git-httpd, and nginx ssl proxy...' + echo 'Starting keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...' cd "$WORKSPACE" \ && python sdk/python/tests/run_test_server.py start_keep_proxy \ && python sdk/python/tests/run_test_server.py start_keep-web \ && python sdk/python/tests/run_test_server.py start_arv-git-httpd \ + && python sdk/python/tests/run_test_server.py start_ws \ && python sdk/python/tests/run_test_server.py start_nginx \ && export ARVADOS_TEST_PROXY_SERVICES=1 } @@ -289,12 +294,15 @@ stop_services() { cd "$WORKSPACE" \ && python sdk/python/tests/run_test_server.py stop_nginx \ && python sdk/python/tests/run_test_server.py stop_arv-git-httpd \ + && python sdk/python/tests/run_test_server.py stop_ws \ && python sdk/python/tests/run_test_server.py stop_keep-web \ && python sdk/python/tests/run_test_server.py stop_keep_proxy fi if [[ -n "$ARVADOS_TEST_API_HOST" ]]; then unset ARVADOS_TEST_API_HOST cd "$WORKSPACE" \ + && python sdk/python/tests/run_test_server.py stop_nginx \ + && python sdk/python/tests/run_test_server.py stop_ws \ && python sdk/python/tests/run_test_server.py stop fi } @@ -756,6 +764,7 @@ gostuff=( sdk/go/manifest sdk/go/streamer sdk/go/crunchrunner + sdk/go/stats lib/crunchstat services/arv-git-httpd services/crunchstat @@ -767,6 +776,7 @@ gostuff=( services/crunch-dispatch-local services/crunch-dispatch-slurm services/crunch-run + 
services/ws tools/keep-block-check tools/keep-exercise tools/keep-rsync diff --git a/doc/install/install-ws.html.textile.liquid b/doc/install/install-ws.html.textile.liquid new file mode 100644 index 0000000000..a36a59a56f --- /dev/null +++ b/doc/install/install-ws.html.textile.liquid @@ -0,0 +1,204 @@ +--- +layout: default +navsection: installguide +title: Install the websocket server +... + +{% include 'notebox_begin_warning' %} + +This websocket server is an alternative to the puma server that comes with the API server. It is available as an *experimental pre-release* and is not recommended for production sites. + +{% include 'notebox_end' %} + +The arvados-ws server provides event notifications to websocket clients. It can be installed anywhere with access to the Postgres database and the Arvados API server, typically behind a web proxy that provides SSL support. See the "godoc page":http://godoc.org/github.com/curoverse/arvados/services/ws for additional information. + +By convention, we use the following hostname for the websocket service. + + +
ws.uuid_prefix.your.domain
+
+ +The above hostname should resolve from anywhere on the internet. + +h2. Install arvados-ws + +Typically arvados-ws runs on the same host as the API server. + +On Debian-based systems: + + +
~$ sudo apt-get install arvados-ws
+
+
+ +On Red Hat-based systems: + + +
~$ sudo yum install arvados-ws
+
+
+ +Verify that @arvados-ws@ is functional: + + +
~$ arvados-ws -h
+Usage of arvados-ws:
+  -config path
+        path to config file (default "/etc/arvados/ws/ws.yml")
+  -dump-config
+        show current configuration and exit
+
+
+ +h3. Create a configuration file + +Create @/etc/arvados/ws/ws.yml@ using the following template. Replace @xxxxxxxx@ with the "password you generated during database setup":install-postgresql.html#api. + + +
Client:
+  APIHost: uuid_prefix.your.domain:443
+Listen: ":9003"
+Postgres:
+  dbname: arvados_production
+  host: localhost
+  password: xxxxxxxx
+  user: arvados
+
+
+ +h3. Start the service (option 1: systemd) + +If your system does not use systemd, skip this section and follow the "runit instructions":#runit instead. + +If your system uses systemd, the arvados-ws service should already be set up. Start it and check its status: + + +
~$ sudo systemctl restart arvados-ws
+~$ sudo systemctl status arvados-ws
+● arvados-ws.service - Arvados websocket server
+   Loaded: loaded (/lib/systemd/system/arvados-ws.service; enabled)
+   Active: active (running) since Tue 2016-12-06 11:20:48 EST; 10s ago
+     Docs: https://doc.arvados.org/
+ Main PID: 9421 (arvados-ws)
+   CGroup: /system.slice/arvados-ws.service
+           └─9421 /usr/bin/arvados-ws
+
+Dec 06 11:20:48 zzzzz arvados-ws[9421]: {"level":"info","msg":"started","time":"2016-12-06T11:20:48.207617188-05:00"}
+Dec 06 11:20:48 zzzzz arvados-ws[9421]: {"Listen":":9003","level":"info","msg":"listening","time":"2016-12-06T11:20:48.244956506-05:00"}
+Dec 06 11:20:48 zzzzz systemd[1]: Started Arvados websocket server.
+
+
+ +If it is not running, use @journalctl@ to check logs for errors: + + +
~$ sudo journalctl -n10 -u arvados-ws
+...
+Dec 06 11:12:48 zzzzz systemd[1]: Starting Arvados websocket server...
+Dec 06 11:12:48 zzzzz arvados-ws[8918]: {"level":"info","msg":"started","time":"2016-12-06T11:12:48.030496636-05:00"}
+Dec 06 11:12:48 zzzzz arvados-ws[8918]: {"error":"pq: password authentication failed for user \"arvados\"","level":"fatal","msg":"db.Ping failed","time":"2016-12-06T11:12:48.058206400-05:00"}
+
+
+ +Skip ahead to "confirm the service is working":#confirm. + +h3(#runit). Start the service (option 2: runit) + +Install runit to supervise the arvados-ws daemon. {% include 'install_runit' %} + +Create a supervised service. + + +
~$ sudo mkdir /etc/service/arvados-ws
+~$ cd /etc/service/arvados-ws
+~$ sudo mkdir log log/main
+~$ printf '#!/bin/sh\nexec arvados-ws 2>&1\n' | sudo tee run
+~$ printf '#!/bin/sh\nexec svlogd main\n' | sudo tee log/run
+~$ sudo chmod +x run log/run
+~$ sudo sv exit .
+~$ cd -
+
+
+ +Use @sv stat@ and check the log file to verify the service is running. + + +
~$ sudo sv stat /etc/service/arvados-ws
+run: /etc/service/arvados-ws: (pid 12520) 2s; run: log: (pid 12519) 2s
+~$ tail /etc/service/arvados-ws/log/main/current
+{"level":"info","msg":"started","time":"2016-12-06T11:56:20.669171449-05:00"}
+{"Listen":":9003","level":"info","msg":"listening","time":"2016-12-06T11:56:20.708847627-05:00"}
+
+
+ +h3(#confirm). Confirm the service is working + +Confirm the service is listening on its assigned port and responding to requests. + + +
~$ curl http://0.0.0.0:9003/status.json
+{"Clients":1}
+
+
+ +h3. Set up a reverse proxy with SSL support + +The arvados-ws service will be accessible from anywhere on the internet, so we recommend using SSL for transport encryption. + +This is best achieved by putting a reverse proxy with SSL support in front of arvados-ws, running on port 443 and passing requests to arvados-ws on port 9003 (or whatever port you chose in your configuration file). + +For example, using Nginx: + +
+upstream arvados-ws {
+  server                127.0.0.1:9003;
+}
+
+server {
+  listen                [your public IP address]:443 ssl;
+  server_name           ws.uuid_prefix.your.domain;
+
+  proxy_connect_timeout 90s;
+  proxy_read_timeout    300s;
+
+  ssl                   on;
+  ssl_certificate       YOUR/PATH/TO/cert.pem;
+  ssl_certificate_key   YOUR/PATH/TO/cert.key;
+
+  location / {
+    proxy_pass          http://arvados-ws;
+    proxy_set_header    Upgrade         $http_upgrade;
+    proxy_set_header    Connection      "upgrade";
+    proxy_set_header    Host            $host;
+    proxy_set_header    X-Forwarded-For $proxy_add_x_forwarded_for;
+  }
+}
+
+ +If Nginx is already configured to proxy @ws@ requests to puma, move that configuration out of the way or change its @server_name@ so it doesn't conflict. + +h3. Update API server configuration + +Ensure the websocket server address is correct in the API server configuration file @/etc/arvados/api/application.yml@. + + +
websocket_address: wss://ws.uuid_prefix.your.domain/websocket
+
+
+ +Restart Nginx to reload the API server configuration. + + +
$ sudo nginx -s reload
+
+
+ +h3. Verify DNS and proxy setup + +Use a host elsewhere on the Internet to confirm that your DNS, proxy, and SSL are configured correctly. + + +
$ curl https://ws.uuid_prefix.your.domain/status.json
+{"Clients":1}
+
+
diff --git a/sdk/cli/test/test_arv-keep-put.rb b/sdk/cli/test/test_arv-keep-put.rb index fefbc27298..e6ead25b80 100644 --- a/sdk/cli/test/test_arv-keep-put.rb +++ b/sdk/cli/test/test_arv-keep-put.rb @@ -40,7 +40,7 @@ class TestArvKeepPut < Minitest::Test def test_raw_file out, err = capture_subprocess_io do - assert arv_put('--raw', './tmp/foo') + assert arv_put('--no-cache', '--raw', './tmp/foo') end $stderr.write err assert_match '', err @@ -87,7 +87,7 @@ class TestArvKeepPut < Minitest::Test def test_as_stream out, err = capture_subprocess_io do - assert arv_put('--as-stream', './tmp/foo') + assert arv_put('--no-cache', '--as-stream', './tmp/foo') end $stderr.write err assert_match '', err @@ -96,7 +96,7 @@ class TestArvKeepPut < Minitest::Test def test_progress out, err = capture_subprocess_io do - assert arv_put('--manifest', '--progress', './tmp/foo') + assert arv_put('--no-cache', '--manifest', '--progress', './tmp/foo') end assert_match /%/, err assert match_collection_uuid(out) @@ -104,7 +104,7 @@ class TestArvKeepPut < Minitest::Test def test_batch_progress out, err = capture_subprocess_io do - assert arv_put('--manifest', '--batch-progress', './tmp/foo') + assert arv_put('--no-cache', '--manifest', '--batch-progress', './tmp/foo') end assert_match /: 0 written 3 total/, err assert_match /: 3 written 3 total/, err diff --git a/sdk/cwl/tests/test_container.py b/sdk/cwl/tests/test_container.py index bb661550da..0394988ccd 100644 --- a/sdk/cwl/tests/test_container.py +++ b/sdk/cwl/tests/test_container.py @@ -8,7 +8,7 @@ import functools import cwltool.process from schema_salad.ref_resolver import Loader -from schema_salad.ref_resolver import Loader +from .matcher import JsonDiffMatcher if not os.getenv('ARVADOS_DEBUG'): logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN) @@ -48,7 +48,7 @@ class TestContainer(unittest.TestCase): make_fs_access=make_fs_access, tmpdir="/tmp"): j.run(enable_reuse=enable_reuse) 
runner.api.container_requests().create.assert_called_with( - body={ + body=JsonDiffMatcher({ 'environment': { 'HOME': '/var/spool/cwl', 'TMPDIR': '/tmp' @@ -69,8 +69,9 @@ class TestContainer(unittest.TestCase): 'container_image': '99999999999999999999999999999993+99', 'command': ['ls', '/var/spool/cwl'], 'cwd': '/var/spool/cwl', - 'scheduling_parameters': {} - }) + 'scheduling_parameters': {}, + 'properties': {}, + })) # The test passes some fields in builder.resources # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024} @@ -141,7 +142,8 @@ class TestContainer(unittest.TestCase): 'cwd': '/var/spool/cwl', 'scheduling_parameters': { 'partitions': ['blurb'] - } + }, + 'properties': {} } call_body = call_kwargs.get('body', None) diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go index 36f4eb52ae..fc937494e5 100644 --- a/sdk/go/arvados/client.go +++ b/sdk/go/arvados/client.go @@ -41,6 +41,8 @@ type Client struct { // callers who use a Client to initialize an // arvadosclient.ArvadosClient.) KeepServiceURIs []string `json:",omitempty"` + + dd *DiscoveryDocument } // The default http.Client used by a Client with Insecure==true and @@ -198,14 +200,103 @@ func (c *Client) apiURL(path string) string { // DiscoveryDocument is the Arvados server's description of itself. 
type DiscoveryDocument struct { - DefaultCollectionReplication int `json:"defaultCollectionReplication"` - BlobSignatureTTL int64 `json:"blobSignatureTtl"` + BasePath string `json:"basePath"` + DefaultCollectionReplication int `json:"defaultCollectionReplication"` + BlobSignatureTTL int64 `json:"blobSignatureTtl"` + Schemas map[string]Schema `json:"schemas"` + Resources map[string]Resource `json:"resources"` +} + +type Resource struct { + Methods map[string]ResourceMethod `json:"methods"` +} + +type ResourceMethod struct { + HTTPMethod string `json:"httpMethod"` + Path string `json:"path"` + Response MethodResponse `json:"response"` +} + +type MethodResponse struct { + Ref string `json:"$ref"` +} + +type Schema struct { + UUIDPrefix string `json:"uuidPrefix"` } // DiscoveryDocument returns a *DiscoveryDocument. The returned object // should not be modified: the same object may be returned by // subsequent calls. func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) { + if c.dd != nil { + return c.dd, nil + } var dd DiscoveryDocument - return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil) + err := c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil) + if err != nil { + return nil, err + } + c.dd = &dd + return c.dd, nil +} + +func (c *Client) modelForUUID(dd *DiscoveryDocument, uuid string) (string, error) { + if len(uuid) != 27 { + return "", fmt.Errorf("invalid UUID: %q", uuid) + } + infix := uuid[6:11] + var model string + for m, s := range dd.Schemas { + if s.UUIDPrefix == infix { + model = m + break + } + } + if model == "" { + return "", fmt.Errorf("unrecognized type portion %q in UUID %q", infix, uuid) + } + return model, nil +} + +func (c *Client) KindForUUID(uuid string) (string, error) { + dd, err := c.DiscoveryDocument() + if err != nil { + return "", err + } + model, err := c.modelForUUID(dd, uuid) + if err != nil { + return "", err + } + return "arvados#" + 
strings.ToLower(model[:1]) + model[1:], nil +} + +func (c *Client) PathForUUID(method, uuid string) (string, error) { + dd, err := c.DiscoveryDocument() + if err != nil { + return "", err + } + model, err := c.modelForUUID(dd, uuid) + if err != nil { + return "", err + } + var resource string + for r, rsc := range dd.Resources { + if rsc.Methods["get"].Response.Ref == model { + resource = r + break + } + } + if resource == "" { + return "", fmt.Errorf("no resource for model: %q", model) + } + m, ok := dd.Resources[resource].Methods[method] + if !ok { + return "", fmt.Errorf("no method %q for resource %q", method, resource) + } + path := dd.BasePath + strings.Replace(m.Path, "{uuid}", uuid, -1) + if path[0] == '/' { + path = path[1:] + } + return path, nil } diff --git a/sdk/go/arvados/log.go b/sdk/go/arvados/log.go new file mode 100644 index 0000000000..a48f1c6b5c --- /dev/null +++ b/sdk/go/arvados/log.go @@ -0,0 +1,25 @@ +package arvados + +import ( + "time" +) + +// Log is an arvados#log record +type Log struct { + ID uint64 `json:"id"` + UUID string `json:"uuid"` + ObjectUUID string `json:"object_uuid"` + ObjectOwnerUUID string `json:"object_owner_uuid"` + EventType string `json:"event_type"` + EventAt *time.Time `json:"event,omitempty"` + Properties map[string]interface{} `json:"properties"` + CreatedAt *time.Time `json:"created_at,omitempty"` +} + +// LogList is an arvados#logList resource. +type LogList struct { + Items []Log `json:"items"` + ItemsAvailable int `json:"items_available"` + Offset int `json:"offset"` + Limit int `json:"limit"` +} diff --git a/sdk/go/config/load.go b/sdk/go/config/load.go index 9c65d65e84..2bbb440fb3 100644 --- a/sdk/go/config/load.go +++ b/sdk/go/config/load.go @@ -22,3 +22,8 @@ func LoadFile(cfg interface{}, configPath string) error { } return nil } + +// Dump returns a YAML representation of cfg. 
+func Dump(cfg interface{}) ([]byte, error) { + return yaml.Marshal(cfg) +} diff --git a/sdk/go/ctxlog/log.go b/sdk/go/ctxlog/log.go new file mode 100644 index 0000000000..6565c88f07 --- /dev/null +++ b/sdk/go/ctxlog/log.go @@ -0,0 +1,59 @@ +package ctxlog + +import ( + "context" + + "github.com/Sirupsen/logrus" +) + +var ( + loggerCtxKey = new(int) + rootLogger = logrus.New() +) + +const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" + +// Context returns a new child context such that FromContext(child) +// returns the given logger. +func Context(ctx context.Context, logger *logrus.Entry) context.Context { + return context.WithValue(ctx, loggerCtxKey, logger) +} + +// FromContext returns the logger suitable for the given context -- the one +// attached by contextWithLogger() if applicable, otherwise the +// top-level logger with no fields/values. +func FromContext(ctx context.Context) *logrus.Entry { + if ctx != nil { + if logger, ok := ctx.Value(loggerCtxKey).(*logrus.Entry); ok { + return logger + } + } + return rootLogger.WithFields(nil) +} + +// SetLevel sets the current logging level. See logrus for level +// names. +func SetLevel(level string) { + lvl, err := logrus.ParseLevel(level) + if err != nil { + logrus.Fatal(err) + } + rootLogger.Level = lvl +} + +// SetFormat sets the current logging format to "json" or "text". 
+func SetFormat(format string) { + switch format { + case "text": + rootLogger.Formatter = &logrus.TextFormatter{ + FullTimestamp: true, + TimestampFormat: rfc3339NanoFixed, + } + case "json": + rootLogger.Formatter = &logrus.JSONFormatter{ + TimestampFormat: rfc3339NanoFixed, + } + default: + logrus.WithField("LogFormat", format).Fatal("unknown log format") + } +} diff --git a/sdk/go/stats/duration.go b/sdk/go/stats/duration.go new file mode 100644 index 0000000000..103dea0055 --- /dev/null +++ b/sdk/go/stats/duration.go @@ -0,0 +1,35 @@ +package stats + +import ( + "fmt" + "strconv" + "time" +) + +// Duration is a duration that is displayed as a number of seconds in +// fixed-point notation. +type Duration time.Duration + +// MarshalJSON implements json.Marshaler. +func (d Duration) MarshalJSON() ([]byte, error) { + return []byte(d.String()), nil +} + +// String implements fmt.Stringer. +func (d Duration) String() string { + return fmt.Sprintf("%.6f", time.Duration(d).Seconds()) +} + +// UnmarshalJSON implements json.Unmarshaler +func (d *Duration) UnmarshalJSON(data []byte) error { + return d.Set(string(data)) +} + +// Value implements flag.Value +func (d *Duration) Set(s string) error { + sec, err := strconv.ParseFloat(s, 64) + if err == nil { + *d = Duration(sec * float64(time.Second)) + } + return err +} diff --git a/sdk/go/stats/duration_test.go b/sdk/go/stats/duration_test.go new file mode 100644 index 0000000000..730e646999 --- /dev/null +++ b/sdk/go/stats/duration_test.go @@ -0,0 +1,23 @@ +package stats + +import ( + "testing" + "time" +) + +func TestString(t *testing.T) { + d := Duration(123123123123 * time.Nanosecond) + if s, expect := d.String(), "123.123123"; s != expect { + t.Errorf("got %s, expect %s", s, expect) + } +} + +func TestSet(t *testing.T) { + var d Duration + if err := d.Set("123.456"); err != nil { + t.Fatal(err) + } + if got, expect := time.Duration(d).Nanoseconds(), int64(123456000000); got != expect { + t.Errorf("got %d, expect %d", 
got, expect) + } +} diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 517d617d8c..4cc2591ebb 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -759,6 +759,14 @@ class ArvadosFile(object): def writable(self): return self.parent.writable() + @synchronized + def permission_expired(self, as_of_dt=None): + """Returns True if any of the segment's locators is expired""" + for r in self._segments: + if KeepLocator(r.locator).permission_expired(as_of_dt): + return True + return False + @synchronized def segments(self): return copy.copy(self._segments) diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index 27aad033ae..812438e2cc 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -565,16 +565,23 @@ class RichCollectionBase(CollectionBase): def find(self, path): """Recursively search the specified file path. - May return either a Collection or ArvadosFile. Return None if not + May return either a Collection or ArvadosFile. Return None if not found. + If path is invalid (ex: starts with '/'), an IOError exception will be + raised. 
""" if not path: raise errors.ArgumentError("Parameter 'path' is empty.") pathcomponents = path.split("/", 1) + if pathcomponents[0] == '': + raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) + item = self._items.get(pathcomponents[0]) - if len(pathcomponents) == 1: + if item is None: + return None + elif len(pathcomponents) == 1: return item else: if isinstance(item, RichCollectionBase): @@ -829,7 +836,7 @@ class RichCollectionBase(CollectionBase): if target_dir is None: raise IOError(errno.ENOENT, "Target directory not found", target_name) - if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents: + if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents: target_dir = target_dir[target_name] target_name = sourcecomponents[-1] diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py index e3b41b26d3..38e4349614 100644 --- a/sdk/python/arvados/commands/put.py +++ b/sdk/python/arvados/commands/put.py @@ -7,21 +7,22 @@ import argparse import arvados import arvados.collection import base64 +import copy import datetime import errno import fcntl import hashlib import json +import logging import os import pwd -import time +import re import signal import socket import sys import tempfile import threading -import copy -import logging +import time from apiclient import errors as apiclient_errors from arvados._version import __version__ @@ -43,13 +44,7 @@ Local file or directory. Default: read from standard input. _group = upload_opts.add_mutually_exclusive_group() _group.add_argument('--max-manifest-depth', type=int, metavar='N', - default=-1, help=""" -Maximum depth of directory tree to represent in the manifest -structure. A directory structure deeper than this will be represented -as a single stream in the manifest. If N=0, the manifest will contain -a single stream. 
Default: -1 (unlimited), i.e., exactly one manifest -stream per filesystem directory that contains files. -""") + default=-1, help=argparse.SUPPRESS) _group.add_argument('--normalize', action='store_true', help=""" @@ -57,6 +52,12 @@ Normalize the manifest by re-ordering files and streams after writing data. """) +_group.add_argument('--dry-run', action='store_true', default=False, + help=""" +Don't actually upload files, but only check if any file should be +uploaded. Exit with code=2 when files are pending for upload. +""") + _group = upload_opts.add_mutually_exclusive_group() _group.add_argument('--as-stream', action='store_true', dest='stream', @@ -100,6 +101,12 @@ separated by commas, with a trailing newline. Do not store a manifest. """) +upload_opts.add_argument('--update-collection', type=str, default=None, + dest='update_collection', metavar="UUID", help=""" +Update an existing collection identified by the given Arvados collection +UUID. All new local files will be uploaded. +""") + upload_opts.add_argument('--use-filename', type=str, default=None, dest='filename', help=""" Synonym for --filename. @@ -167,6 +174,16 @@ _group.add_argument('--no-resume', action='store_false', dest='resume', Do not continue interrupted uploads from cached state. """) +_group = run_opts.add_mutually_exclusive_group() +_group.add_argument('--cache', action='store_true', dest='use_cache', default=True, + help=""" +Save upload state in a cache file for resuming (default). +""") +_group.add_argument('--no-cache', action='store_false', dest='use_cache', + help=""" +Do not save upload state in a cache file for resuming. +""") + arg_parser = argparse.ArgumentParser( description='Copy data from the local filesystem to Keep.', parents=[upload_opts, run_opts, arv_cmd.retry_opt]) @@ -191,17 +208,54 @@ def parse_arguments(arguments): and os.isatty(sys.stderr.fileno())): args.progress = True + # Turn off --resume (default) if --no-cache is used. 
+ if not args.use_cache: + args.resume = False + if args.paths == ['-']: + if args.update_collection: + arg_parser.error(""" + --update-collection cannot be used when reading from stdin. + """) args.resume = False + args.use_cache = False if not args.filename: args.filename = 'stdin' return args + +class CollectionUpdateError(Exception): + pass + + class ResumeCacheConflict(Exception): pass +class ArvPutArgumentConflict(Exception): + pass + + +class ArvPutUploadIsPending(Exception): + pass + + +class ArvPutUploadNotPending(Exception): + pass + + +class FileUploadList(list): + def __init__(self, dry_run=False): + list.__init__(self) + self.dry_run = dry_run + + def append(self, other): + if self.dry_run: + raise ArvPutUploadIsPending() + super(FileUploadList, self).append(other) + + class ResumeCache(object): CACHE_DIR = '.cache/arvados/arv-put' @@ -217,7 +271,7 @@ class ResumeCache(object): realpaths = sorted(os.path.realpath(path) for path in args.paths) md5.update('\0'.join(realpaths)) if any(os.path.isdir(path) for path in realpaths): - md5.update(str(max(args.max_manifest_depth, -1))) + md5.update("-1") elif args.filename: md5.update(args.filename) return os.path.join( @@ -291,12 +345,15 @@ class ArvPutUploadJob(object): 'files' : {} # Previous run file list: {path : {size, mtime}} } - def __init__(self, paths, resume=True, reporter=None, bytes_expected=None, - name=None, owner_uuid=None, ensure_unique_name=False, - num_retries=None, replication_desired=None, - filename=None, update_time=1.0): + def __init__(self, paths, resume=True, use_cache=True, reporter=None, + bytes_expected=None, name=None, owner_uuid=None, + ensure_unique_name=False, num_retries=None, replication_desired=None, + filename=None, update_time=20.0, update_collection=None, + logger=logging.getLogger('arvados.arv_put'), dry_run=False): self.paths = paths self.resume = resume + self.use_cache = use_cache + self.update = False self.reporter = reporter self.bytes_expected = bytes_expected 
self.bytes_written = 0 @@ -311,51 +368,108 @@ class ArvPutUploadJob(object): self._state = None # Previous run state (file list & manifest) self._current_files = [] # Current run file list self._cache_file = None - self._collection = None self._collection_lock = threading.Lock() + self._remote_collection = None # Collection being updated (if asked) + self._local_collection = None # Collection from previous run manifest + self._file_paths = [] # Files to be updated in remote collection self._stop_checkpointer = threading.Event() self._checkpointer = threading.Thread(target=self._update_task) + self._checkpointer.daemon = True self._update_task_time = update_time # How many seconds wait between update runs - self.logger = logging.getLogger('arvados.arv_put') + self._files_to_upload = FileUploadList(dry_run=dry_run) + self.logger = logger + self.dry_run = dry_run + + if not self.use_cache and self.resume: + raise ArvPutArgumentConflict('resume cannot be True when use_cache is False') + + # Check for obvious dry-run responses + if self.dry_run and (not self.use_cache or not self.resume): + raise ArvPutUploadIsPending() + # Load cached data if any and if needed - self._setup_state() + self._setup_state(update_collection) - def start(self): + def start(self, save_collection): """ Start supporting thread & file uploading """ - self._checkpointer.daemon = True - self._checkpointer.start() + if not self.dry_run: + self._checkpointer.start() try: for path in self.paths: # Test for stdin first, in case some file named '-' exist if path == '-': + if self.dry_run: + raise ArvPutUploadIsPending() self._write_stdin(self.filename or 'stdin') elif os.path.isdir(path): - self._write_directory_tree(path) + # Use absolute paths on cache index so CWD doesn't interfere + # with the caching logic. 
+ prefixdir = path = os.path.abspath(path) + if prefixdir != '/': + prefixdir += '/' + for root, dirs, files in os.walk(path): + # Make os.walk()'s dir traversing order deterministic + dirs.sort() + files.sort() + for f in files: + self._check_file(os.path.join(root, f), + os.path.join(root[len(prefixdir):], f)) else: - self._write_file(path, self.filename or os.path.basename(path)) - finally: - # Stop the thread before doing anything else - self._stop_checkpointer.set() - self._checkpointer.join() - # Commit all & one last _update() - self.manifest_text() + self._check_file(os.path.abspath(path), + self.filename or os.path.basename(path)) + # If dry-mode is on, and got up to this point, then we should notify that + # there aren't any file to upload. + if self.dry_run: + raise ArvPutUploadNotPending() + # Remove local_collection's files that don't exist locally anymore, so the + # bytes_written count is correct. + for f in self.collection_file_paths(self._local_collection, + path_prefix=""): + if f != 'stdin' and not f in self._file_paths: + self._local_collection.remove(f) + # Update bytes_written from current local collection and + # report initial progress. self._update() - if self.resume: + # Actual file upload + self._upload_files() + finally: + if not self.dry_run: + # Stop the thread before doing anything else + self._stop_checkpointer.set() + self._checkpointer.join() + # Commit all pending blocks & one last _update() + self._local_collection.manifest_text() + self._update(final=True) + if save_collection: + self.save_collection() + if self.use_cache: self._cache_file.close() - # Correct the final written bytes count - self.bytes_written -= self.bytes_skipped def save_collection(self): - with self._collection_lock: - self._my_collection().save_new( + if self.update: + # Check if files should be updated on the remote collection. 
+ for fp in self._file_paths: + remote_file = self._remote_collection.find(fp) + if not remote_file: + # File don't exist on remote collection, copy it. + self._remote_collection.copy(fp, fp, self._local_collection) + elif remote_file != self._local_collection.find(fp): + # A different file exist on remote collection, overwrite it. + self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True) + else: + # The file already exist on remote collection, skip it. + pass + self._remote_collection.save(num_retries=self.num_retries) + else: + self._local_collection.save_new( name=self.name, owner_uuid=self.owner_uuid, ensure_unique_name=self.ensure_unique_name, num_retries=self.num_retries) def destroy_cache(self): - if self.resume: + if self.use_cache: try: os.unlink(self._cache_filename) except OSError as error: @@ -384,17 +498,20 @@ class ArvPutUploadJob(object): while not self._stop_checkpointer.wait(self._update_task_time): self._update() - def _update(self): + def _update(self, final=False): """ Update cached manifest text and report progress. 
""" with self._collection_lock: - self.bytes_written = self._collection_size(self._my_collection()) - # Update cache, if resume enabled - if self.resume: + self.bytes_written = self._collection_size(self._local_collection) + if self.use_cache: + # Update cache with self._state_lock: - # Get the manifest text without comitting pending blocks - self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True) + if final: + self._state['manifest'] = self._local_collection.manifest_text() + else: + # Get the manifest text without comitting pending blocks + self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True) self._save_state() # Call the reporter, if any self.report_progress() @@ -403,114 +520,116 @@ class ArvPutUploadJob(object): if self.reporter is not None: self.reporter(self.bytes_written, self.bytes_expected) - def _write_directory_tree(self, path, stream_name="."): - # TODO: Check what happens when multiple directories are passed as - # arguments. - # If the code below is uncommented, integration test - # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest) - # fails, I suppose it is because the manifest_uuid changes because - # of the dir addition to stream_name. 
- - # if stream_name == '.': - # stream_name = os.path.join('.', os.path.basename(path)) - for item in os.listdir(path): - if os.path.isdir(os.path.join(path, item)): - self._write_directory_tree(os.path.join(path, item), - os.path.join(stream_name, item)) - else: - self._write_file(os.path.join(path, item), - os.path.join(stream_name, item)) - def _write_stdin(self, filename): - with self._collection_lock: - output = self._my_collection().open(filename, 'w') + output = self._local_collection.open(filename, 'w') self._write(sys.stdin, output) output.close() - def _write_file(self, source, filename): + def _check_file(self, source, filename): + """Check if this file needs to be uploaded""" resume_offset = 0 - if self.resume: - # Check if file was already uploaded (at least partially) - with self._collection_lock: - try: - file_in_collection = self._my_collection().find(filename) - except IOError: - # Not found - file_in_collection = None + should_upload = False + new_file_in_cache = False + # Record file path for updating the remote collection before exiting + self._file_paths.append(filename) + + with self._state_lock: # If no previous cached data on this file, store it for an eventual # repeated run. if source not in self._state['files']: - with self._state_lock: - self._state['files'][source] = { - 'mtime': os.path.getmtime(source), - 'size' : os.path.getsize(source) - } - with self._state_lock: - cached_file_data = self._state['files'][source] - # See if this file was already uploaded at least partially - if file_in_collection: - if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source): - if cached_file_data['size'] == file_in_collection.size(): - # File already there, skip it. - self.bytes_skipped += cached_file_data['size'] - return - elif cached_file_data['size'] > file_in_collection.size(): - # File partially uploaded, resume! 
- resume_offset = file_in_collection.size() - else: - # Inconsistent cache, re-upload the file - self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source)) - else: - # Local file differs from cached data, re-upload it - pass - with open(source, 'r') as source_fd: - if resume_offset > 0: - # Start upload where we left off - with self._collection_lock: - output = self._my_collection().open(filename, 'a') - source_fd.seek(resume_offset) + self._state['files'][source] = { + 'mtime': os.path.getmtime(source), + 'size' : os.path.getsize(source) + } + new_file_in_cache = True + cached_file_data = self._state['files'][source] + + # Check if file was already uploaded (at least partially) + file_in_local_collection = self._local_collection.find(filename) + + # If not resuming, upload the full file. + if not self.resume: + should_upload = True + # New file detected from last run, upload it. + elif new_file_in_cache: + should_upload = True + # Local file didn't change from last run. + elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source): + if not file_in_local_collection: + # File not uploaded yet, upload it completely + should_upload = True + elif file_in_local_collection.permission_expired(): + # Permission token expired, re-upload file. This will change whenever + # we have a API for refreshing tokens. + should_upload = True + self._local_collection.remove(filename) + elif cached_file_data['size'] == file_in_local_collection.size(): + # File already there, skip it. + self.bytes_skipped += cached_file_data['size'] + elif cached_file_data['size'] > file_in_local_collection.size(): + # File partially uploaded, resume! 
+ resume_offset = file_in_local_collection.size() self.bytes_skipped += resume_offset + should_upload = True else: - # Start from scratch - with self._collection_lock: - output = self._my_collection().open(filename, 'w') - self._write(source_fd, output) - output.close(flush=False) + # Inconsistent cache, re-upload the file + should_upload = True + self._local_collection.remove(filename) + self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source)) + # Local file differs from cached data, re-upload it. + else: + if file_in_local_collection: + self._local_collection.remove(filename) + should_upload = True + + if should_upload: + self._files_to_upload.append((source, resume_offset, filename)) + + def _upload_files(self): + for source, resume_offset, filename in self._files_to_upload: + with open(source, 'r') as source_fd: + with self._state_lock: + self._state['files'][source]['mtime'] = os.path.getmtime(source) + self._state['files'][source]['size'] = os.path.getsize(source) + if resume_offset > 0: + # Start upload where we left off + output = self._local_collection.open(filename, 'a') + source_fd.seek(resume_offset) + else: + # Start from scratch + output = self._local_collection.open(filename, 'w') + self._write(source_fd, output) + output.close(flush=False) def _write(self, source_fd, output): - first_read = True while True: data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE) - # Allow an empty file to be written - if not data and not first_read: + if not data: break - if first_read: - first_read = False output.write(data) def _my_collection(self): - """ - Create a new collection if none cached. Load it from cache otherwise. 
- """ - if self._collection is None: - with self._state_lock: - manifest = self._state['manifest'] - if self.resume and manifest is not None: - # Create collection from saved state - self._collection = arvados.collection.Collection( - manifest, - replication_desired=self.replication_desired) - else: - # Create new collection - self._collection = arvados.collection.Collection( - replication_desired=self.replication_desired) - return self._collection + return self._remote_collection if self.update else self._local_collection - def _setup_state(self): + def _setup_state(self, update_collection): """ Create a new cache file or load a previously existing one. """ - if self.resume: + # Load an already existing collection for update + if update_collection and re.match(arvados.util.collection_uuid_pattern, + update_collection): + try: + self._remote_collection = arvados.collection.Collection(update_collection) + except arvados.errors.ApiError as error: + raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error)) + else: + self.update = True + elif update_collection: + # Collection locator provided, but unknown format + raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection)) + + if self.use_cache: + # Set up cache file name from input paths. md5 = hashlib.md5() md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost')) realpaths = sorted(os.path.realpath(path) for path in self.paths) @@ -518,13 +637,20 @@ class ArvPutUploadJob(object): if self.filename: md5.update(self.filename) cache_filename = md5.hexdigest() - self._cache_file = open(os.path.join( + cache_filepath = os.path.join( arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'), - cache_filename), 'a+') + cache_filename) + if self.resume: + self._cache_file = open(cache_filepath, 'a+') + else: + # --no-resume means start with a empty cache file. 
+ self._cache_file = open(cache_filepath, 'w+') self._cache_filename = self._cache_file.name self._lock_file(self._cache_file) self._cache_file.seek(0) - with self._state_lock: + + with self._state_lock: + if self.use_cache: try: self._state = json.load(self._cache_file) if not set(['manifest', 'files']).issubset(set(self._state.keys())): @@ -533,13 +659,22 @@ class ArvPutUploadJob(object): except ValueError: # Cache file empty, set up new cache self._state = copy.deepcopy(self.EMPTY_STATE) - # Load how many bytes were uploaded on previous run - with self._collection_lock: - self.bytes_written = self._collection_size(self._my_collection()) - # No resume required - else: - with self._state_lock: + else: + # No cache file, set empty state self._state = copy.deepcopy(self.EMPTY_STATE) + # Load the previous manifest so we can check if files were modified remotely. + self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired) + + def collection_file_paths(self, col, path_prefix='.'): + """Return a list of file paths by recursively go through the entire collection `col`""" + file_paths = [] + for name, item in col.items(): + if isinstance(item, arvados.arvfile.ArvadosFile): + file_paths.append(os.path.join(path_prefix, name)) + elif isinstance(item, arvados.collection.Subcollection): + new_prefix = os.path.join(path_prefix, name) + file_paths += self.collection_file_paths(item, path_prefix=new_prefix) + return file_paths def _lock_file(self, fileobj): try: @@ -553,7 +688,7 @@ class ArvPutUploadJob(object): """ try: with self._state_lock: - state = self._state + state = copy.deepcopy(self._state) new_cache_fd, new_cache_name = tempfile.mkstemp( dir=os.path.dirname(self._cache_filename)) self._lock_file(new_cache_fd) @@ -573,24 +708,16 @@ class ArvPutUploadJob(object): self._cache_file = new_cache def collection_name(self): - with self._collection_lock: - name = self._my_collection().api_response()['name'] 
if self._my_collection().api_response() else None - return name + return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None def manifest_locator(self): - with self._collection_lock: - locator = self._my_collection().manifest_locator() - return locator + return self._my_collection().manifest_locator() def portable_data_hash(self): - with self._collection_lock: - datahash = self._my_collection().portable_data_hash() - return datahash + return self._my_collection().portable_data_hash() def manifest_text(self, stream_name=".", strip=False, normalize=False): - with self._collection_lock: - manifest = self._my_collection().manifest_text(stream_name, strip, normalize) - return manifest + return self._my_collection().manifest_text(stream_name, strip, normalize) def _datablocks_on_item(self, item): """ @@ -673,6 +800,7 @@ def desired_project_uuid(api_client, project_uuid, num_retries): def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): global api_client + logger = logging.getLogger('arvados.arv_put') args = parse_arguments(arguments) status = 0 if api_client is None: @@ -681,7 +809,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): # Determine the name to use if args.name: if args.stream or args.raw: - print >>stderr, "Cannot use --name with --stream or --raw" + logger.error("Cannot use --name with --stream or --raw") + sys.exit(1) + elif args.update_collection: + logger.error("Cannot use --name with --update-collection") sys.exit(1) collection_name = args.name else: @@ -691,7 +822,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): socket.gethostname()) if args.project_uuid and (args.stream or args.raw): - print >>stderr, "Cannot use --project-uuid with --stream or --raw" + logger.error("Cannot use --project-uuid with --stream or --raw") sys.exit(1) # Determine the parent project @@ -699,7 +830,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): project_uuid = 
desired_project_uuid(api_client, args.project_uuid, args.retries) except (apiclient_errors.Error, ValueError) as error: - print >>stderr, error + logger.error(error) sys.exit(1) if args.progress: @@ -710,9 +841,11 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): reporter = None bytes_expected = expected_bytes_for(args.paths) + try: writer = ArvPutUploadJob(paths = args.paths, resume = args.resume, + use_cache = args.use_cache, filename = args.filename, reporter = reporter, bytes_expected = bytes_expected, @@ -720,28 +853,54 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): replication_desired = args.replication, name = collection_name, owner_uuid = project_uuid, - ensure_unique_name = True) + ensure_unique_name = True, + update_collection = args.update_collection, + logger=logger, + dry_run=args.dry_run) except ResumeCacheConflict: - print >>stderr, "\n".join([ + logger.error("\n".join([ "arv-put: Another process is already uploading this data.", - " Use --no-resume if this is really what you want."]) + " Use --no-cache if this is really what you want."])) sys.exit(1) + except CollectionUpdateError as error: + logger.error("\n".join([ + "arv-put: %s" % str(error)])) + sys.exit(1) + except ArvPutUploadIsPending: + # Dry run check successful, return proper exit code. + sys.exit(2) + except ArvPutUploadNotPending: + # No files pending for upload + sys.exit(0) # Install our signal handler for each code in CAUGHT_SIGNALS, and save # the originals. 
orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler) for sigcode in CAUGHT_SIGNALS} - if args.resume and writer.bytes_written > 0: - print >>stderr, "\n".join([ - "arv-put: Resuming previous upload from last checkpoint.", - " Use the --no-resume option to start over."]) + if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0: + logger.warning("\n".join([ + "arv-put: Resuming previous upload from last checkpoint.", + " Use the --no-resume option to start over."])) - writer.report_progress() + if not args.dry_run: + writer.report_progress() output = None - writer.start() + try: + writer.start(save_collection=not(args.stream or args.raw)) + except arvados.errors.ApiError as error: + logger.error("\n".join([ + "arv-put: %s" % str(error)])) + sys.exit(1) + except ArvPutUploadIsPending: + # Dry run check successful, return proper exit code. + sys.exit(2) + except ArvPutUploadNotPending: + # No files pending for upload + sys.exit(0) + if args.progress: # Print newline to split stderr from stdout for humans. - print >>stderr + logger.info("\n") if args.stream: if args.normalize: @@ -752,14 +911,16 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): output = ','.join(writer.data_locators()) else: try: - writer.save_collection() - print >>stderr, "Collection saved as '%s'" % writer.collection_name() + if args.update_collection: + logger.info("Collection updated: '{}'".format(writer.collection_name())) + else: + logger.info("Collection saved as '{}'".format(writer.collection_name())) if args.portable_data_hash: output = writer.portable_data_hash() else: output = writer.manifest_locator() except apiclient_errors.Error as error: - print >>stderr, ( + logger.error( "arv-put: Error creating Collection on project: {}.".format( error)) status = 1 @@ -779,7 +940,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): sys.exit(status) # Success! 
- writer.destroy_cache() return output diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index c989479456..38f332b38e 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -296,14 +296,14 @@ class KeepClient(object): def _get_user_agent(self): try: - return self._user_agent_pool.get(False) + return self._user_agent_pool.get(block=False) except Queue.Empty: return pycurl.Curl() def _put_user_agent(self, ua): try: ua.reset() - self._user_agent_pool.put(ua, False) + self._user_agent_pool.put(ua, block=False) except: ua.close() diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 9d7d2481fd..d470ab4d00 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -46,7 +46,6 @@ setup(name='arvados-python-client', install_requires=[ 'google-api-python-client==1.4.2', 'oauth2client >=1.4.6, <2', - 'pyasn1-modules==0.0.5', 'ciso8601', 'httplib2', 'pycurl >=7.19.5.1, <7.21.5', diff --git a/sdk/python/tests/nginx.conf b/sdk/python/tests/nginx.conf index 2b8b6ca1c4..006604077d 100644 --- a/sdk/python/tests/nginx.conf +++ b/sdk/python/tests/nginx.conf @@ -54,4 +54,20 @@ http { proxy_redirect //download:{{KEEPWEBPORT}}/ https://$host:{{KEEPWEBDLSSLPORT}}/; } } + upstream ws { + server localhost:{{WSPORT}}; + } + server { + listen *:{{WSSPORT}} ssl default_server; + server_name ~^(?.*)$; + ssl_certificate {{SSLCERT}}; + ssl_certificate_key {{SSLKEY}}; + location / { + proxy_pass http://ws; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_set_header Host $request_host:{{WSPORT}}; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + } + } } diff --git a/sdk/python/tests/run_test_server.py b/sdk/python/tests/run_test_server.py index 642b7ccbad..b969b12a7a 100644 --- a/sdk/python/tests/run_test_server.py +++ b/sdk/python/tests/run_test_server.py @@ -44,6 +44,7 @@ if not os.path.exists(TEST_TMPDIR): my_api_host = None _cached_config = {} +_cached_db_config = {} def 
find_server_pid(PID_PATH, wait=10): now = time.time() @@ -284,10 +285,19 @@ def run(leave_running_atexit=False): os.makedirs(gitdir) subprocess.check_output(['tar', '-xC', gitdir, '-f', gittarball]) + # The nginx proxy isn't listening here yet, but we need to choose + # the wss:// port now so we can write the API server config file. + wss_port = find_available_port() + _setport('wss', wss_port) + port = find_available_port() env = os.environ.copy() env['RAILS_ENV'] = 'test' - env['ARVADOS_WEBSOCKETS'] = 'yes' + env['ARVADOS_TEST_WSS_PORT'] = str(wss_port) + if env.get('ARVADOS_TEST_EXPERIMENTAL_WS'): + env.pop('ARVADOS_WEBSOCKETS', None) + else: + env['ARVADOS_WEBSOCKETS'] = 'yes' env.pop('ARVADOS_TEST_API_HOST', None) env.pop('ARVADOS_API_HOST', None) env.pop('ARVADOS_API_HOST_INSECURE', None) @@ -360,6 +370,47 @@ def stop(force=False): kill_server_pid(_pidfile('api')) my_api_host = None +def run_ws(): + if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ: + return + stop_ws() + port = find_available_port() + conf = os.path.join(TEST_TMPDIR, 'ws.yml') + with open(conf, 'w') as f: + f.write(""" +Client: + APIHost: {} + Insecure: true +Listen: :{} +LogLevel: {} +Postgres: + host: {} + dbname: {} + user: {} + password: {} + sslmode: require + """.format(os.environ['ARVADOS_API_HOST'], + port, + ('info' if os.environ.get('ARVADOS_DEBUG', '') in ['','0'] else 'debug'), + _dbconfig('host'), + _dbconfig('database'), + _dbconfig('username'), + _dbconfig('password'))) + logf = open(_fifo2stderr('ws'), 'w') + ws = subprocess.Popen( + ["ws", "-config", conf], + stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True) + with open(_pidfile('ws'), 'w') as f: + f.write(str(ws.pid)) + _wait_until_port_listens(port) + _setport('ws', port) + return port + +def stop_ws(): + if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ: + return + kill_server_pid(_pidfile('ws')) + def _start_keep(n, keep_args): keep0 = tempfile.mkdtemp() port = find_available_port() @@ -537,6 +588,7 @@ def 
stop_keep_web(): def run_nginx(): if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ: return + stop_nginx() nginxconf = {} nginxconf['KEEPWEBPORT'] = _getport('keep-web') nginxconf['KEEPWEBDLSSLPORT'] = find_available_port() @@ -545,6 +597,8 @@ def run_nginx(): nginxconf['KEEPPROXYSSLPORT'] = find_available_port() nginxconf['GITPORT'] = _getport('arv-git-httpd') nginxconf['GITSSLPORT'] = find_available_port() + nginxconf['WSPORT'] = _getport('ws') + nginxconf['WSSPORT'] = _getport('wss') nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem') nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key') nginxconf['ACCESSLOG'] = _fifo2stderr('nginx_access_log') @@ -593,7 +647,15 @@ def _getport(program): except IOError: return 9 +def _dbconfig(key): + global _cached_db_config + if not _cached_db_config: + _cached_db_config = yaml.load(open(os.path.join( + SERVICES_SRC_DIR, 'api', 'config', 'database.yml'))) + return _cached_db_config['test'][key] + def _apiconfig(key): + global _cached_config if _cached_config: return _cached_config[key] def _load(f, required=True): @@ -647,6 +709,7 @@ class TestCaseWithServers(unittest.TestCase): original environment. 
""" MAIN_SERVER = None + WS_SERVER = None KEEP_SERVER = None KEEP_PROXY_SERVER = None KEEP_WEB_SERVER = None @@ -667,6 +730,7 @@ class TestCaseWithServers(unittest.TestCase): os.environ.pop('ARVADOS_EXTERNAL_CLIENT', None) for server_kwargs, start_func, stop_func in ( (cls.MAIN_SERVER, run, reset), + (cls.WS_SERVER, run_ws, stop_ws), (cls.KEEP_SERVER, run_keep, stop_keep), (cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy), (cls.KEEP_WEB_SERVER, run_keep_web, stop_keep_web)): @@ -693,6 +757,7 @@ class TestCaseWithServers(unittest.TestCase): if __name__ == "__main__": actions = [ 'start', 'stop', + 'start_ws', 'stop_ws', 'start_keep', 'stop_keep', 'start_keep_proxy', 'stop_keep_proxy', 'start_keep-web', 'stop_keep-web', @@ -725,6 +790,10 @@ if __name__ == "__main__": print(host) elif args.action == 'stop': stop(force=('ARVADOS_TEST_API_HOST' not in os.environ)) + elif args.action == 'start_ws': + run_ws() + elif args.action == 'stop_ws': + stop_ws() elif args.action == 'start_keep': run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers) elif args.action == 'stop_keep': diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py index f35e4c725c..f1dfd03def 100644 --- a/sdk/python/tests/test_arv_put.py +++ b/sdk/python/tests/test_arv_put.py @@ -32,9 +32,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase): [], ['/dev/null'], ['/dev/null', '--filename', 'empty'], - ['/tmp'], - ['/tmp', '--max-manifest-depth', '0'], - ['/tmp', '--max-manifest-depth', '1'] + ['/tmp'] ] def tearDown(self): @@ -241,6 +239,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase): class ArvPutUploadJobTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase): + def setUp(self): super(ArvPutUploadJobTest, self).setUp() run_test_server.authorize_with('active') @@ -271,7 +270,7 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers, def test_writer_works_without_cache(self): cwriter = 
arv_put.ArvPutUploadJob(['/dev/null'], resume=False) - cwriter.start() + cwriter.start(save_collection=False) self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text()) def test_writer_works_with_cache(self): @@ -279,13 +278,13 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers, f.write('foo') f.flush() cwriter = arv_put.ArvPutUploadJob([f.name]) - cwriter.start() - self.assertEqual(3, cwriter.bytes_written) + cwriter.start(save_collection=False) + self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped) # Don't destroy the cache, and start another upload cwriter_new = arv_put.ArvPutUploadJob([f.name]) - cwriter_new.start() + cwriter_new.start(save_collection=False) cwriter_new.destroy_cache() - self.assertEqual(0, cwriter_new.bytes_written) + self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped) def make_progress_tester(self): progression = [] @@ -301,13 +300,13 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers, progression, reporter = self.make_progress_tester() cwriter = arv_put.ArvPutUploadJob([f.name], reporter=reporter, bytes_expected=expect_count) - cwriter.start() + cwriter.start(save_collection=False) cwriter.destroy_cache() self.assertIn((3, expect_count), progression) def test_writer_upload_directory(self): cwriter = arv_put.ArvPutUploadJob([self.tempdir]) - cwriter.start() + cwriter.start(save_collection=False) cwriter.destroy_cache() self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written) @@ -325,17 +324,128 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers, writer = arv_put.ArvPutUploadJob([self.large_file_name], replication_desired=1) with self.assertRaises(SystemExit): - writer.start() - self.assertLess(writer.bytes_written, - os.path.getsize(self.large_file_name)) + writer.start(save_collection=False) + # Confirm that the file was partially uploaded + self.assertGreater(writer.bytes_written, 0) + self.assertLess(writer.bytes_written, + 
os.path.getsize(self.large_file_name)) # Retry the upload writer2 = arv_put.ArvPutUploadJob([self.large_file_name], replication_desired=1) - writer2.start() - self.assertEqual(writer.bytes_written + writer2.bytes_written, + writer2.start(save_collection=False) + self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped, os.path.getsize(self.large_file_name)) writer2.destroy_cache() + def test_no_resume_when_asked(self): + def wrapped_write(*args, **kwargs): + data = args[1] + # Exit only on last block + if len(data) < arvados.config.KEEP_BLOCK_SIZE: + raise SystemExit("Simulated error") + return self.arvfile_write(*args, **kwargs) + + with mock.patch('arvados.arvfile.ArvadosFileWriter.write', + autospec=True) as mocked_write: + mocked_write.side_effect = wrapped_write + writer = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1) + with self.assertRaises(SystemExit): + writer.start(save_collection=False) + # Confirm that the file was partially uploaded + self.assertGreater(writer.bytes_written, 0) + self.assertLess(writer.bytes_written, + os.path.getsize(self.large_file_name)) + # Retry the upload, this time without resume + writer2 = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1, + resume=False) + writer2.start(save_collection=False) + self.assertEqual(writer2.bytes_skipped, 0) + self.assertEqual(writer2.bytes_written, + os.path.getsize(self.large_file_name)) + writer2.destroy_cache() + + def test_no_resume_when_no_cache(self): + def wrapped_write(*args, **kwargs): + data = args[1] + # Exit only on last block + if len(data) < arvados.config.KEEP_BLOCK_SIZE: + raise SystemExit("Simulated error") + return self.arvfile_write(*args, **kwargs) + + with mock.patch('arvados.arvfile.ArvadosFileWriter.write', + autospec=True) as mocked_write: + mocked_write.side_effect = wrapped_write + writer = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1) + with 
self.assertRaises(SystemExit): + writer.start(save_collection=False) + # Confirm that the file was partially uploaded + self.assertGreater(writer.bytes_written, 0) + self.assertLess(writer.bytes_written, + os.path.getsize(self.large_file_name)) + # Retry the upload, this time without cache usage + writer2 = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1, + resume=False, + use_cache=False) + writer2.start(save_collection=False) + self.assertEqual(writer2.bytes_skipped, 0) + self.assertEqual(writer2.bytes_written, + os.path.getsize(self.large_file_name)) + writer2.destroy_cache() + + + def test_dry_run_feature(self): + def wrapped_write(*args, **kwargs): + data = args[1] + # Exit only on last block + if len(data) < arvados.config.KEEP_BLOCK_SIZE: + raise SystemExit("Simulated error") + return self.arvfile_write(*args, **kwargs) + + with mock.patch('arvados.arvfile.ArvadosFileWriter.write', + autospec=True) as mocked_write: + mocked_write.side_effect = wrapped_write + writer = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1) + with self.assertRaises(SystemExit): + writer.start(save_collection=False) + # Confirm that the file was partially uploaded + self.assertGreater(writer.bytes_written, 0) + self.assertLess(writer.bytes_written, + os.path.getsize(self.large_file_name)) + # Retry the upload using dry_run to check if there is a pending upload + writer2 = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1, + dry_run=True) + with self.assertRaises(arv_put.ArvPutUploadIsPending): + writer2.start(save_collection=False) + # Complete the pending upload + writer3 = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1) + writer3.start(save_collection=False) + # Confirm there's no pending upload with dry_run=True + writer4 = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1, + dry_run=True) + with self.assertRaises(arv_put.ArvPutUploadNotPending): + 
writer4.start(save_collection=False) + writer4.destroy_cache() + # Test obvious cases + with self.assertRaises(arv_put.ArvPutUploadIsPending): + arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1, + dry_run=True, + resume=False, + use_cache=False) + with self.assertRaises(arv_put.ArvPutUploadIsPending): + arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1, + dry_run=True, + resume=False) + class ArvadosExpectedBytesTest(ArvadosBaseTestCase): TEST_SIZE = os.path.getsize(__file__) @@ -634,6 +744,21 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers, self.assertEqual(1, len(collection_list)) return collection_list[0] + def test_put_collection_with_later_update(self): + tmpdir = self.make_tmpdir() + with open(os.path.join(tmpdir, 'file1'), 'w') as f: + f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box') + col = self.run_and_find_collection("", ['--no-progress', tmpdir]) + self.assertNotEqual(None, col['uuid']) + # Add a new file to the directory + with open(os.path.join(tmpdir, 'file2'), 'w') as f: + f.write('The quick brown fox jumped over the lazy dog') + updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir]) + self.assertEqual(col['uuid'], updated_col['uuid']) + # Get the manifest and check that the new file is being included + c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute() + self.assertRegexpMatches(c['manifest_text'], r'^\. 
.*:44:file2\n') + def test_put_collection_with_high_redundancy(self): # Write empty data: we're not testing CollectionWriter, just # making sure collections.create tells the API server what our diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py index 6b3562602a..8f02d517fc 100644 --- a/sdk/python/tests/test_arvfile.py +++ b/sdk/python/tests/test_arvfile.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import bz2 +import datetime import gzip import io import mock @@ -570,6 +571,26 @@ class ArvadosFileReadlinesTestCase(ArvadosFileReadTestCase): def read_for_test(self, reader, byte_count, **kwargs): return ''.join(reader.readlines(**kwargs)) + +class ArvadosFileTestCase(unittest.TestCase): + def datetime_to_hex(self, dt): + return hex(int(time.mktime(dt.timetuple())))[2:] + + def test_permission_expired(self): + base_manifest = ". 781e5e245d69b566979b86e28d23f2c7+10+A715fd31f8111894f717eb1003c1b0216799dd9ec@{} 0:10:count.txt\n" + now = datetime.datetime.now() + a_week_ago = now - datetime.timedelta(days=7) + a_month_ago = now - datetime.timedelta(days=30) + a_week_from_now = now + datetime.timedelta(days=7) + with Collection(base_manifest.format(self.datetime_to_hex(a_week_from_now))) as c: + self.assertFalse(c.find('count.txt').permission_expired()) + with Collection(base_manifest.format(self.datetime_to_hex(a_week_ago))) as c: + f = c.find('count.txt') + self.assertTrue(f.permission_expired()) + self.assertTrue(f.permission_expired(a_week_from_now)) + self.assertFalse(f.permission_expired(a_month_ago)) + + class BlockManagerTest(unittest.TestCase): def test_bufferblock_append(self): keep = ArvadosFileWriterTestCase.MockKeep({}) diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py index fc30a242eb..0e3d5e13f1 100644 --- a/sdk/python/tests/test_collections.py +++ b/sdk/python/tests/test_collections.py @@ -861,6 +861,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin): c.find("/.") with 
self.assertRaises(arvados.errors.ArgumentError): c.find("") + self.assertIs(c.find("./nonexistant.txt"), None) + self.assertIs(c.find("./nonexistantsubdir/nonexistant.txt"), None) def test_remove_in_subdir(self): c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n') diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py index 0199724339..7e8c84ec11 100644 --- a/sdk/python/tests/test_events.py +++ b/sdk/python/tests/test_events.py @@ -53,21 +53,22 @@ class WebsocketTest(run_test_server.TestCaseWithServers): self.assertEqual(200, events.get(True, 5)['status']) human = arvados.api('v1').humans().create(body={}).execute() - log_object_uuids = [] - for i in range(0, expected): - log_object_uuids.append(events.get(True, 5)['object_uuid']) - + want_uuids = [] if expected > 0: - self.assertIn(human['uuid'], log_object_uuids) - + want_uuids.append(human['uuid']) if expected > 1: - self.assertIn(ancestor['uuid'], log_object_uuids) + want_uuids.append(ancestor['uuid']) + log_object_uuids = [] + while set(want_uuids) - set(log_object_uuids): + log_object_uuids.append(events.get(True, 5)['object_uuid']) - with self.assertRaises(Queue.Empty): - # assertEqual just serves to show us what unexpected thing - # comes out of the queue when the assertRaises fails; when - # the test passes, this assertEqual doesn't get called. - self.assertEqual(events.get(True, 2), None) + if expected < 2: + with self.assertRaises(Queue.Empty): + # assertEqual just serves to show us what unexpected + # thing comes out of the queue when the assertRaises + # fails; when the test passes, this assertEqual + # doesn't get called. 
+ self.assertEqual(events.get(True, 2), None) def test_subscribe_websocket(self): self._test_subscribe( @@ -145,8 +146,8 @@ class WebsocketTest(run_test_server.TestCaseWithServers): return time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(t)) + self.isotz(-time.timezone/60) def isotz(self, offset): - """Convert minutes-east-of-UTC to ISO8601 time zone designator""" - return '{:+03d}{:02d}'.format(offset/60, offset%60) + """Convert minutes-east-of-UTC to RFC3339- and ISO-compatible time zone designator""" + return '{:+03d}:{:02d}'.format(offset/60, offset%60) # Test websocket reconnection on (un)execpted close def _test_websocket_reconnect(self, close_unexpected): diff --git a/services/api/app/models/arvados_model.rb b/services/api/app/models/arvados_model.rb index aed0309591..910db7e0c7 100644 --- a/services/api/app/models/arvados_model.rb +++ b/services/api/app/models/arvados_model.rb @@ -252,12 +252,7 @@ class ArvadosModel < ActiveRecord::Base parts = full_text_searchable_columns.collect do |column| "coalesce(#{column},'')" end - # We prepend a space to the tsvector() argument here. Otherwise, - # it might start with a column that has its own (non-full-text) - # index, which causes Postgres to use the column index instead of - # the tsvector index, which causes full text queries to be just as - # slow as if we had no index at all. 
- "to_tsvector('english', ' ' || #{parts.join(" || ' ' || ")})" + "to_tsvector('english', #{parts.join(" || ' ' || ")})" end def self.apply_filters query, filters diff --git a/services/api/app/models/job.rb b/services/api/app/models/job.rb index ef3d0b5e10..9556799aba 100644 --- a/services/api/app/models/job.rb +++ b/services/api/app/models/job.rb @@ -119,6 +119,10 @@ class Job < ArvadosModel super - ["script_parameters_digest"] end + def self.full_text_searchable_columns + super - ["script_parameters_digest"] + end + def self.load_job_specific_filters attrs, orig_filters, read_users # Convert Job-specific @filters entries into general SQL filters. script_info = {"repository" => nil, "script" => nil} diff --git a/services/api/config/application.default.yml b/services/api/config/application.default.yml index a9aa953f9f..c8d2d1d6f1 100644 --- a/services/api/config/application.default.yml +++ b/services/api/config/application.default.yml @@ -444,3 +444,4 @@ test: workbench_address: https://localhost:3001/ git_repositories_dir: <%= Rails.root.join 'tmp', 'git', 'test' %> git_internal_dir: <%= Rails.root.join 'tmp', 'internal.git' %> + websocket_address: <% if ENV['ARVADOS_TEST_EXPERIMENTAL_WS'] %>"wss://0.0.0.0:<%= ENV['ARVADOS_TEST_WSS_PORT'] %>/websocket"<% else %>false<% end %> diff --git a/services/api/db/migrate/20161213172944_full_text_search_indexes.rb b/services/api/db/migrate/20161213172944_full_text_search_indexes.rb new file mode 100644 index 0000000000..4cb4d978a1 --- /dev/null +++ b/services/api/db/migrate/20161213172944_full_text_search_indexes.rb @@ -0,0 +1,32 @@ +class FullTextSearchIndexes < ActiveRecord::Migration + def fts_indexes + { + "collections" => "collections_full_text_search_idx", + "container_requests" => "container_requests_full_text_search_idx", + "groups" => "groups_full_text_search_idx", + "jobs" => "jobs_full_text_search_idx", + "pipeline_instances" => "pipeline_instances_full_text_search_idx", + "pipeline_templates" => 
"pipeline_templates_full_text_search_idx", + "workflows" => "workflows_full_text_search_idx", + } + end + + def up + # remove existing fts indexes and create up to date ones with no leading space + fts_indexes.each do |t, i| + ActiveRecord::Base.connection.indexes(t).each do |idx| + if idx.name == i + remove_index t.to_sym, :name => i + break + end + end + execute "CREATE INDEX #{i} ON #{t} USING gin(#{t.classify.constantize.full_text_tsvector});" + end + end + + def down + fts_indexes.each do |t, i| + remove_index t.to_sym, :name => i + end + end +end diff --git a/services/api/db/structure.sql b/services/api/db/structure.sql index e715cd60c4..7ee7ea6ff9 100644 --- a/services/api/db/structure.sql +++ b/services/api/db/structure.sql @@ -1506,7 +1506,7 @@ CREATE UNIQUE INDEX collection_owner_uuid_name_unique ON collections USING btree -- Name: collections_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace: -- -CREATE INDEX collections_full_text_search_idx ON collections USING gin (to_tsvector('english'::regconfig, (((((((((((((((((' '::text || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(portable_data_hash, ''::character varying))::text) || ' '::text) || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(file_names, ''::character varying))::text))); +CREATE INDEX collections_full_text_search_idx ON collections USING gin (to_tsvector('english'::regconfig, (((((((((((((((((COALESCE(owner_uuid, ''::character varying))::text || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || 
(COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(portable_data_hash, ''::character varying))::text) || ' '::text) || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(file_names, ''::character varying))::text))); -- @@ -1520,7 +1520,7 @@ CREATE INDEX collections_search_index ON collections USING btree (owner_uuid, mo -- Name: container_requests_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace: -- -CREATE INDEX container_requests_full_text_search_idx ON container_requests USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((((((((((((((((' '::text || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(requesting_container_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(container_uuid, ''::character varying))::text) || ' '::text) || COALESCE(mounts, ''::text)) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(container_image, ''::character varying))::text) || ' '::text) || COALESCE(environment, ''::text)) || ' '::text) || (COALESCE(cwd, ''::character varying))::text) || ' '::text) || COALESCE(command, ''::text)) || ' '::text) || (COALESCE(output_path, ''::character varying))::text) || ' '::text) || COALESCE(filters, ''::text)))); +CREATE INDEX 
container_requests_full_text_search_idx ON container_requests USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((((((((((((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(requesting_container_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(container_uuid, ''::character varying))::text) || ' '::text) || COALESCE(mounts, ''::text)) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(container_image, ''::character varying))::text) || ' '::text) || COALESCE(environment, ''::text)) || ' '::text) || (COALESCE(cwd, ''::character varying))::text) || ' '::text) || COALESCE(command, ''::text)) || ' '::text) || (COALESCE(output_path, ''::character varying))::text) || ' '::text) || COALESCE(filters, ''::text)) || ' '::text) || COALESCE(scheduling_parameters, ''::text)) || ' '::text) || (COALESCE(output_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(log_uuid, ''::character varying))::text))); -- @@ -1541,7 +1541,7 @@ CREATE INDEX containers_search_index ON containers USING btree (uuid, owner_uuid -- Name: groups_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace: -- -CREATE INDEX groups_full_text_search_idx ON groups USING gin (to_tsvector('english'::regconfig, (((((((((((((' '::text || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' 
'::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || (COALESCE(group_class, ''::character varying))::text))); +CREATE INDEX groups_full_text_search_idx ON groups USING gin (to_tsvector('english'::regconfig, (((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || (COALESCE(group_class, ''::character varying))::text))); -- @@ -2283,7 +2283,7 @@ CREATE INDEX job_tasks_search_index ON job_tasks USING btree (uuid, owner_uuid, -- Name: jobs_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace: -- -CREATE INDEX jobs_full_text_search_idx ON jobs USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((((((((((((((((((((((' '::text || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(submit_id, ''::character varying))::text) || ' '::text) || (COALESCE(script, ''::character varying))::text) || ' '::text) || (COALESCE(script_version, ''::character varying))::text) || ' '::text) || COALESCE(script_parameters, ''::text)) || ' '::text) || (COALESCE(cancelled_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(cancelled_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(output, ''::character 
varying))::text) || ' '::text) || (COALESCE(is_locked_by_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(log, ''::character varying))::text) || ' '::text) || COALESCE(tasks_summary, ''::text)) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(repository, ''::character varying))::text) || ' '::text) || (COALESCE(supplied_script_version, ''::character varying))::text) || ' '::text) || (COALESCE(docker_image_locator, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(arvados_sdk_version, ''::character varying))::text))); +CREATE INDEX jobs_full_text_search_idx ON jobs USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((((((((((((((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(submit_id, ''::character varying))::text) || ' '::text) || (COALESCE(script, ''::character varying))::text) || ' '::text) || (COALESCE(script_version, ''::character varying))::text) || ' '::text) || COALESCE(script_parameters, ''::text)) || ' '::text) || (COALESCE(cancelled_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(cancelled_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(output, ''::character varying))::text) || ' '::text) || (COALESCE(is_locked_by_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(log, ''::character varying))::text) || ' '::text) || COALESCE(tasks_summary, ''::text)) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(repository, ''::character varying))::text) || ' '::text) || 
(COALESCE(supplied_script_version, ''::character varying))::text) || ' '::text) || (COALESCE(docker_image_locator, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(arvados_sdk_version, ''::character varying))::text) || ' '::text) || COALESCE(components, ''::text)))); -- @@ -2339,7 +2339,7 @@ CREATE INDEX nodes_search_index ON nodes USING btree (uuid, owner_uuid, modified -- Name: pipeline_instances_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace: -- -CREATE INDEX pipeline_instances_full_text_search_idx ON pipeline_instances USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((' '::text || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(pipeline_template_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(components, ''::text)) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || COALESCE(components_summary, ''::text)) || ' '::text) || (COALESCE(description, ''::character varying))::text))); +CREATE INDEX pipeline_instances_full_text_search_idx ON pipeline_instances USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(pipeline_template_uuid, ''::character 
varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(components, ''::text)) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || COALESCE(components_summary, ''::text)) || ' '::text) || (COALESCE(description, ''::character varying))::text))); -- @@ -2360,7 +2360,7 @@ CREATE UNIQUE INDEX pipeline_template_owner_uuid_name_unique ON pipeline_templat -- Name: pipeline_templates_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace: -- -CREATE INDEX pipeline_templates_full_text_search_idx ON pipeline_templates USING gin (to_tsvector('english'::regconfig, (((((((((((((' '::text || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(components, ''::text)) || ' '::text) || (COALESCE(description, ''::character varying))::text))); +CREATE INDEX pipeline_templates_full_text_search_idx ON pipeline_templates USING gin (to_tsvector('english'::regconfig, (((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(components, ''::text)) || ' '::text) || (COALESCE(description, ''::character varying))::text))); -- @@ -2416,7 +2416,7 @@ CREATE INDEX virtual_machines_search_index ON virtual_machines USING btree (uuid -- Name: workflows_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace: 
-- -CREATE INDEX workflows_full_text_search_idx ON workflows USING gin (to_tsvector('english'::regconfig, (((((((((((((' '::text || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE(definition, ''::text)))); +CREATE INDEX workflows_full_text_search_idx ON workflows USING gin (to_tsvector('english'::regconfig, (((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE(definition, ''::text)))); -- @@ -2704,4 +2704,6 @@ INSERT INTO schema_migrations (version) VALUES ('20161111143147'); INSERT INTO schema_migrations (version) VALUES ('20161115171221'); -INSERT INTO schema_migrations (version) VALUES ('20161115174218'); \ No newline at end of file +INSERT INTO schema_migrations (version) VALUES ('20161115174218'); + +INSERT INTO schema_migrations (version) VALUES ('20161213172944'); \ No newline at end of file diff --git a/services/api/lib/tasks/config_dump.rake b/services/api/lib/tasks/config_dump.rake new file mode 100644 index 0000000000..c7e021488a --- /dev/null +++ b/services/api/lib/tasks/config_dump.rake @@ -0,0 +1,6 @@ +namespace :config do + desc 'Show site configuration' + task dump: :environment do + puts $application_config.to_yaml + end +end diff --git a/services/fuse/arvados_fuse/__init__.py 
b/services/fuse/arvados_fuse/__init__.py index 1828e150bb..51b0a57a7f 100644 --- a/services/fuse/arvados_fuse/__init__.py +++ b/services/fuse/arvados_fuse/__init__.py @@ -377,36 +377,28 @@ class Operations(llfuse.Operations): if 'event_type' not in ev: return with llfuse.lock: + new_attrs = (ev.get("properties") or {}).get("new_attributes") or {} + pdh = new_attrs.get("portable_data_hash") + # new_attributes.modified_at currently lacks + # subsecond precision (see #6347) so use event_at + # which should always be the same. + stamp = ev.get("event_at") + for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]): item.invalidate() - if ev["object_kind"] == "arvados#collection": - new_attr = (ev.get("properties") and - ev["properties"].get("new_attributes") and - ev["properties"]["new_attributes"]) - - # new_attributes.modified_at currently lacks - # subsecond precision (see #6347) so use event_at - # which should always be the same. - record_version = ( - (ev["event_at"], new_attr["portable_data_hash"]) - if new_attr else None) - - item.update(to_record_version=record_version) + if stamp and pdh and ev.get("object_kind") == "arvados#collection": + item.update(to_record_version=(stamp, pdh)) else: item.update() - oldowner = ( - ev.get("properties") and - ev["properties"].get("old_attributes") and - ev["properties"]["old_attributes"].get("owner_uuid")) - newowner = ev["object_owner_uuid"] + oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid") + newowner = ev.get("object_owner_uuid") for parent in ( self.inodes.inode_cache.find_by_uuid(oldowner) + self.inodes.inode_cache.find_by_uuid(newowner)): parent.invalidate() parent.update() - @catch_exceptions def getattr(self, inode): if inode not in self.inodes: diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py index f2948f9e45..ffcfc6500f 100644 --- a/services/fuse/arvados_fuse/command.py +++ b/services/fuse/arvados_fuse/command.py @@ -126,6 
+126,8 @@ class Mount(object): return self def __exit__(self, exc_type, exc_value, traceback): + if self.operations.events: + self.operations.events.close(timeout=self.args.unmount_timeout) subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint]) self.llfuse_thread.join(timeout=self.args.unmount_timeout) if self.llfuse_thread.is_alive(): diff --git a/services/fuse/tests/mount_test_base.py b/services/fuse/tests/mount_test_base.py index 20192f9d84..1319aebdcc 100644 --- a/services/fuse/tests/mount_test_base.py +++ b/services/fuse/tests/mount_test_base.py @@ -66,6 +66,8 @@ class MountTestBase(unittest.TestCase): def tearDown(self): if self.llfuse_thread: + if self.operations.events: + self.operations.events.close(timeout=10) subprocess.call(["fusermount", "-u", "-z", self.mounttmp]) t0 = time.time() self.llfuse_thread.join(timeout=10) diff --git a/services/fuse/tests/test_retry.py b/services/fuse/tests/test_retry.py index b46ba7839f..81d5c86072 100644 --- a/services/fuse/tests/test_retry.py +++ b/services/fuse/tests/test_retry.py @@ -50,7 +50,7 @@ class RetryPUT(IntegrationTest): q.put(mockedCurl) q.put(pycurl.Curl()) q.put(pycurl.Curl()) - with mock.patch('arvados.keep.KeepClient.KeepService._get_user_agent', side_effect=lambda: q.get(block=None)): + with mock.patch('arvados.keep.KeepClient.KeepService._get_user_agent', side_effect=q.get_nowait): self.pool_test(os.path.join(self.mnt, 'zzz')) self.assertTrue(mockedCurl.perform.called) @staticmethod diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go index bfd006ee8d..e34f8581fd 100644 --- a/services/keepstore/logging_router.go +++ b/services/keepstore/logging_router.go @@ -5,12 +5,12 @@ package main import ( "context" - "fmt" "net/http" "strings" "time" "git.curoverse.com/arvados.git/sdk/go/httpserver" + "git.curoverse.com/arvados.git/sdk/go/stats" log "github.com/Sirupsen/logrus" ) @@ -97,25 +97,11 @@ func (loggingRouter *LoggingRESTRouter) ServeHTTP(wrappedResp 
http.ResponseWrite } lgr.WithFields(log.Fields{ - "timeTotal": loggedDuration(tDone.Sub(tStart)), - "timeToStatus": loggedDuration(resp.sentHdr.Sub(tStart)), - "timeWriteBody": loggedDuration(tDone.Sub(resp.sentHdr)), + "timeTotal": stats.Duration(tDone.Sub(tStart)), + "timeToStatus": stats.Duration(resp.sentHdr.Sub(tStart)), + "timeWriteBody": stats.Duration(tDone.Sub(resp.sentHdr)), "respStatusCode": resp.Status, "respStatus": statusText, "respBytes": resp.Length, }).Info("response") } - -type loggedDuration time.Duration - -// MarshalJSON formats a duration as a number of seconds, using -// fixed-point notation with no more than 6 decimal places. -func (d loggedDuration) MarshalJSON() ([]byte, error) { - return []byte(d.String()), nil -} - -// String formats a duration as a number of seconds, using -// fixed-point notation with no more than 6 decimal places. -func (d loggedDuration) String() string { - return fmt.Sprintf("%.6f", time.Duration(d).Seconds()) -} diff --git a/services/ws/arvados-ws.service b/services/ws/arvados-ws.service new file mode 100644 index 0000000000..ebccf0c89d --- /dev/null +++ b/services/ws/arvados-ws.service @@ -0,0 +1,13 @@ +[Unit] +Description=Arvados websocket server +Documentation=https://doc.arvados.org/ +After=network.target +AssertPathExists=/etc/arvados/ws/ws.yml + +[Service] +Type=notify +ExecStart=/usr/bin/arvados-ws +Restart=always + +[Install] +WantedBy=multi-user.target diff --git a/services/ws/config.go b/services/ws/config.go new file mode 100644 index 0000000000..0faa863d82 --- /dev/null +++ b/services/ws/config.go @@ -0,0 +1,40 @@ +package main + +import ( + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +type wsConfig struct { + Client arvados.Client + Postgres pgConfig + Listen string + LogLevel string + LogFormat string + + PingTimeout arvados.Duration + ClientEventQueue int + ServerEventQueue int +} + +func defaultConfig() wsConfig { + return wsConfig{ + Client: arvados.Client{ + APIHost: 
"localhost:443", + }, + Postgres: pgConfig{ + "dbname": "arvados_production", + "user": "arvados", + "password": "xyzzy", + "host": "localhost", + "connect_timeout": "30", + "sslmode": "require", + }, + LogLevel: "info", + LogFormat: "json", + PingTimeout: arvados.Duration(time.Minute), + ClientEventQueue: 64, + ServerEventQueue: 4, + } +} diff --git a/services/ws/doc.go b/services/ws/doc.go new file mode 100644 index 0000000000..7ccb588d85 --- /dev/null +++ b/services/ws/doc.go @@ -0,0 +1,55 @@ +// Arvados-ws exposes Arvados APIs (currently just one, the +// cache-invalidation event feed at "ws://.../websocket") to +// websocket clients. +// +// Installation +// +// See https://doc.arvados.org/install/install-ws.html. +// +// Developer info +// +// See https://dev.arvados.org/projects/arvados/wiki/Hacking_websocket_server. +// +// Usage +// +// arvados-ws [-config /etc/arvados/ws/ws.yml] [-dump-config] +// +// Minimal configuration +// +// Client: +// APIHost: localhost:443 +// Listen: ":1234" +// Postgres: +// dbname: arvados_production +// host: localhost +// password: xyzzy +// user: arvados +// +// Options +// +// -config path +// +// Load configuration from the given file instead of the default +// /etc/arvados/ws/ws.yml +// +// -dump-config +// +// Print the loaded configuration to stdout and exit. +// +// Logs +// +// Logs are printed to stderr, formatted as JSON. +// +// A log is printed each time a client connects or disconnects. +// +// Enable additional logs by configuring: +// +// LogLevel: debug +// +// Runtime status +// +// GET /debug.json responds with debug stats. +// +// GET /status.json responds with health check results and +// activity/usage metrics. 
+package main diff --git a/services/ws/event.go b/services/ws/event.go new file mode 100644 index 0000000000..304f86bbd0 --- /dev/null +++ b/services/ws/event.go @@ -0,0 +1,65 @@ +package main + +import ( + "database/sql" + "sync" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" + "github.com/ghodss/yaml" +) + +type eventSink interface { + Channel() <-chan *event + Stop() +} + +type eventSource interface { + NewSink() eventSink + DB() *sql.DB +} + +type event struct { + LogID uint64 + Received time.Time + Ready time.Time + Serial uint64 + + db *sql.DB + logRow *arvados.Log + err error + mtx sync.Mutex +} + +// Detail returns the database row corresponding to the event. It can +// be called safely from multiple goroutines. Only one attempt will be +// made. If the database row cannot be retrieved, Detail returns nil. +func (e *event) Detail() *arvados.Log { + e.mtx.Lock() + defer e.mtx.Unlock() + if e.logRow != nil || e.err != nil { + return e.logRow + } + var logRow arvados.Log + var propYAML []byte + e.err = e.db.QueryRow(`SELECT id, uuid, object_uuid, COALESCE(object_owner_uuid,''), COALESCE(event_type,''), event_at, created_at, properties FROM logs WHERE id = $1`, e.LogID).Scan( + &logRow.ID, + &logRow.UUID, + &logRow.ObjectUUID, + &logRow.ObjectOwnerUUID, + &logRow.EventType, + &logRow.EventAt, + &logRow.CreatedAt, + &propYAML) + if e.err != nil { + logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("QueryRow failed") + return nil + } + e.err = yaml.Unmarshal(propYAML, &logRow.Properties) + if e.err != nil { + logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("yaml decode failed") + return nil + } + e.logRow = &logRow + return e.logRow +} diff --git a/services/ws/event_source.go b/services/ws/event_source.go new file mode 100644 index 0000000000..ea90ec7242 --- /dev/null +++ b/services/ws/event_source.go @@ -0,0 +1,216 @@ +package main + +import ( + "database/sql" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + 
"git.curoverse.com/arvados.git/sdk/go/stats" + "github.com/lib/pq" +) + +type pgConfig map[string]string + +func (c pgConfig) ConnectionString() string { + s := "" + for k, v := range c { + s += k + s += "='" + s += strings.Replace( + strings.Replace(v, `\`, `\\`, -1), + `'`, `\'`, -1) + s += "' " + } + return s +} + +type pgEventSource struct { + DataSource string + QueueSize int + + db *sql.DB + pqListener *pq.Listener + queue chan *event + sinks map[*pgEventSink]bool + setupOnce sync.Once + mtx sync.Mutex + shutdown chan error + + lastQDelay time.Duration + eventsIn uint64 + eventsOut uint64 +} + +var _ debugStatuser = (*pgEventSource)(nil) + +func (ps *pgEventSource) setup() { + ps.shutdown = make(chan error, 1) + ps.sinks = make(map[*pgEventSink]bool) + + db, err := sql.Open("postgres", ps.DataSource) + if err != nil { + logger(nil).WithError(err).Fatal("sql.Open failed") + } + if err = db.Ping(); err != nil { + logger(nil).WithError(err).Fatal("db.Ping failed") + } + ps.db = db + + ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) { + if err != nil { + // Until we have a mechanism for catching up + // on missed events, we cannot recover from a + // dropped connection without breaking our + // promises to clients. + logger(nil).WithError(err).Error("listener problem") + ps.shutdown <- err + } + }) + err = ps.pqListener.Listen("logs") + if err != nil { + logger(nil).WithError(err).Fatal("pq Listen failed") + } + logger(nil).Debug("pgEventSource listening") + + go ps.run() +} + +func (ps *pgEventSource) run() { + ps.queue = make(chan *event, ps.QueueSize) + + go func() { + for e := range ps.queue { + // Wait for the "select ... from logs" call to + // finish. This limits max concurrent queries + // to ps.QueueSize. Without this, max + // concurrent queries would be bounded by + // client_count X client_queue_size. + e.Detail() + + logger(nil). + WithField("serial", e.Serial). 
+ WithField("detail", e.Detail()). + Debug("event ready") + e.Ready = time.Now() + ps.lastQDelay = e.Ready.Sub(e.Received) + + ps.mtx.Lock() + atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks))) + for sink := range ps.sinks { + sink.channel <- e + } + ps.mtx.Unlock() + } + }() + + var serial uint64 + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for { + select { + case err, ok := <-ps.shutdown: + if ok { + logger(nil).WithError(err).Info("shutdown") + } + close(ps.queue) + return + + case <-ticker.C: + logger(nil).Debug("listener ping") + ps.pqListener.Ping() + + case pqEvent, ok := <-ps.pqListener.Notify: + if !ok { + close(ps.queue) + return + } + if pqEvent.Channel != "logs" { + continue + } + logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64) + if err != nil { + logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload") + continue + } + serial++ + e := &event{ + LogID: logID, + Received: time.Now(), + Serial: serial, + db: ps.db, + } + logger(nil).WithField("event", e).Debug("incoming") + atomic.AddUint64(&ps.eventsIn, 1) + ps.queue <- e + go e.Detail() + } + } +} + +// NewSink subscribes to the event source. NewSink returns an +// eventSink, whose Channel() method returns a channel: a pointer to +// each subsequent event will be sent to that channel. +// +// The caller must ensure events are received from the sink channel as +// quickly as possible because when one sink stops being ready, all +// other sinks block. 
+func (ps *pgEventSource) NewSink() eventSink { + ps.setupOnce.Do(ps.setup) + sink := &pgEventSink{ + channel: make(chan *event, 1), + source: ps, + } + ps.mtx.Lock() + ps.sinks[sink] = true + ps.mtx.Unlock() + return sink +} + +func (ps *pgEventSource) DB() *sql.DB { + ps.setupOnce.Do(ps.setup) + return ps.db +} + +func (ps *pgEventSource) DebugStatus() interface{} { + ps.mtx.Lock() + defer ps.mtx.Unlock() + blocked := 0 + for sink := range ps.sinks { + blocked += len(sink.channel) + } + return map[string]interface{}{ + "EventsIn": atomic.LoadUint64(&ps.eventsIn), + "EventsOut": atomic.LoadUint64(&ps.eventsOut), + "Queue": len(ps.queue), + "QueueLimit": cap(ps.queue), + "QueueDelay": stats.Duration(ps.lastQDelay), + "Sinks": len(ps.sinks), + "SinksBlocked": blocked, + } +} + +type pgEventSink struct { + channel chan *event + source *pgEventSource +} + +func (sink *pgEventSink) Channel() <-chan *event { + return sink.channel +} + +func (sink *pgEventSink) Stop() { + go func() { + // Ensure this sink cannot fill up and block the + // server-side queue (which otherwise could in turn + // block our mtx.Lock() here) + for _ = range sink.channel { + } + }() + sink.source.mtx.Lock() + delete(sink.source.sinks, sink) + sink.source.mtx.Unlock() + close(sink.channel) +} diff --git a/services/ws/handler.go b/services/ws/handler.go new file mode 100644 index 0000000000..72291900fa --- /dev/null +++ b/services/ws/handler.go @@ -0,0 +1,235 @@ +package main + +import ( + "context" + "io" + "sync" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.curoverse.com/arvados.git/sdk/go/stats" +) + +type handler struct { + Client arvados.Client + PingTimeout time.Duration + QueueSize int + + mtx sync.Mutex + lastDelay map[chan interface{}]stats.Duration + setupOnce sync.Once +} + +type handlerStats struct { + QueueDelayNs time.Duration + WriteDelayNs time.Duration + EventBytes uint64 + EventCount uint64 +} + +func (h *handler) Handle(ws wsConn, eventSource eventSource, 
newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) { + h.setupOnce.Do(h.setup) + + ctx, cancel := context.WithCancel(ws.Request().Context()) + defer cancel() + log := logger(ctx) + + incoming := eventSource.NewSink() + defer incoming.Stop() + + queue := make(chan interface{}, h.QueueSize) + h.mtx.Lock() + h.lastDelay[queue] = 0 + h.mtx.Unlock() + defer func() { + h.mtx.Lock() + delete(h.lastDelay, queue) + h.mtx.Unlock() + }() + + sess, err := newSession(ws, queue) + if err != nil { + log.WithError(err).Error("newSession failed") + return + } + + // Receive websocket frames from the client and pass them to + // sess.Receive(). + go func() { + buf := make([]byte, 2<<20) + for { + select { + case <-ctx.Done(): + return + default: + } + ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour)) + n, err := ws.Read(buf) + buf := buf[:n] + log.WithField("frame", string(buf[:n])).Debug("received frame") + if err == nil && n == cap(buf) { + err = errFrameTooBig + } + if err != nil { + if err != io.EOF { + log.WithError(err).Info("read error") + } + cancel() + return + } + err = sess.Receive(buf) + if err != nil { + log.WithError(err).Error("sess.Receive() failed") + cancel() + return + } + } + }() + + // Take items from the outgoing queue, serialize them using + // sess.EventMessage() as needed, and send them to the client + // as websocket frames. 
+ go func() { + for { + var ok bool + var data interface{} + select { + case <-ctx.Done(): + return + case data, ok = <-queue: + if !ok { + return + } + } + var e *event + var buf []byte + var err error + log := log + + switch data := data.(type) { + case []byte: + buf = data + case *event: + e = data + log = log.WithField("serial", e.Serial) + buf, err = sess.EventMessage(e) + if err != nil { + log.WithError(err).Error("EventMessage failed") + cancel() + break + } else if len(buf) == 0 { + log.Debug("skip") + continue + } + default: + log.WithField("data", data).Error("bad object in client queue") + continue + } + + log.WithField("frame", string(buf)).Debug("send event") + ws.SetWriteDeadline(time.Now().Add(h.PingTimeout)) + t0 := time.Now() + _, err = ws.Write(buf) + if err != nil { + log.WithError(err).Error("write failed") + cancel() + break + } + log.Debug("sent") + + if e != nil { + hStats.QueueDelayNs += t0.Sub(e.Ready) + h.mtx.Lock() + h.lastDelay[queue] = stats.Duration(time.Since(e.Ready)) + h.mtx.Unlock() + } + hStats.WriteDelayNs += time.Since(t0) + hStats.EventBytes += uint64(len(buf)) + hStats.EventCount++ + } + }() + + // Filter incoming events against the current subscription + // list, and forward matching events to the outgoing message + // queue. Close the queue and return when the request context + // is done/cancelled or the incoming event stream ends. Shut + // down the handler if the outgoing queue fills up. + go func() { + ticker := time.NewTicker(h.PingTimeout) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + // If the outgoing queue is empty, + // send an empty message. This can + // help detect a disconnected network + // socket, and prevent an idle socket + // from being closed. 
+ if len(queue) == 0 { + select { + case queue <- []byte(`{}`): + default: + } + } + continue + case e, ok := <-incoming.Channel(): + if !ok { + cancel() + return + } + if !sess.Filter(e) { + continue + } + select { + case queue <- e: + default: + log.WithError(errQueueFull).Error("terminate") + cancel() + return + } + } + } + }() + + <-ctx.Done() + return +} + +func (h *handler) DebugStatus() interface{} { + h.mtx.Lock() + defer h.mtx.Unlock() + + var s struct { + QueueCount int + QueueMin int + QueueMax int + QueueTotal uint64 + QueueDelayMin stats.Duration + QueueDelayMax stats.Duration + } + for q, lastDelay := range h.lastDelay { + s.QueueCount++ + n := len(q) + s.QueueTotal += uint64(n) + if s.QueueMax < n { + s.QueueMax = n + } + if s.QueueMin > n || s.QueueCount == 1 { + s.QueueMin = n + } + if (s.QueueDelayMin > lastDelay || s.QueueDelayMin == 0) && lastDelay > 0 { + s.QueueDelayMin = lastDelay + } + if s.QueueDelayMax < lastDelay { + s.QueueDelayMax = lastDelay + } + } + return &s +} + +func (h *handler) setup() { + h.lastDelay = make(map[chan interface{}]stats.Duration) +} diff --git a/services/ws/main.go b/services/ws/main.go new file mode 100644 index 0000000000..7c3625bdad --- /dev/null +++ b/services/ws/main.go @@ -0,0 +1,67 @@ +package main + +import ( + "flag" + "fmt" + "net/http" + "time" + + "git.curoverse.com/arvados.git/sdk/go/config" + "git.curoverse.com/arvados.git/sdk/go/ctxlog" + "github.com/coreos/go-systemd/daemon" +) + +var logger = ctxlog.FromContext + +func main() { + log := logger(nil) + + configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file") + dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit") + cfg := defaultConfig() + flag.Parse() + + err := config.LoadFile(&cfg, *configPath) + if err != nil { + log.Fatal(err) + } + + ctxlog.SetLevel(cfg.LogLevel) + ctxlog.SetFormat(cfg.LogFormat) + + if *dumpConfig { + txt, err := config.Dump(&cfg) + if err != nil { + 
log.Fatal(err) + } + fmt.Print(string(txt)) + return + } + + log.Info("started") + eventSource := &pgEventSource{ + DataSource: cfg.Postgres.ConnectionString(), + QueueSize: cfg.ServerEventQueue, + } + srv := &http.Server{ + Addr: cfg.Listen, + ReadTimeout: time.Minute, + WriteTimeout: time.Minute, + MaxHeaderBytes: 1 << 20, + Handler: &router{ + Config: &cfg, + eventSource: eventSource, + newPermChecker: func() permChecker { return newPermChecker(cfg.Client) }, + }, + } + // Bootstrap the eventSource by attaching a dummy subscriber + // and hanging up. + eventSource.NewSink().Stop() + + if _, err := daemon.SdNotify(false, "READY=1"); err != nil { + log.WithError(err).Warn("error notifying init daemon") + } + + log.WithField("Listen", srv.Addr).Info("listening") + log.Fatal(srv.ListenAndServe()) +} diff --git a/services/ws/permission.go b/services/ws/permission.go new file mode 100644 index 0000000000..e467e06720 --- /dev/null +++ b/services/ws/permission.go @@ -0,0 +1,94 @@ +package main + +import ( + "net/http" + "net/url" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +const ( + maxPermCacheAge = time.Hour + minPermCacheAge = 5 * time.Minute +) + +type permChecker interface { + SetToken(token string) + Check(uuid string) (bool, error) +} + +func newPermChecker(ac arvados.Client) permChecker { + ac.AuthToken = "" + return &cachingPermChecker{ + Client: &ac, + cache: make(map[string]cacheEnt), + maxCurrent: 16, + } +} + +type cacheEnt struct { + time.Time + allowed bool +} + +type cachingPermChecker struct { + *arvados.Client + cache map[string]cacheEnt + maxCurrent int +} + +func (pc *cachingPermChecker) SetToken(token string) { + pc.Client.AuthToken = token +} + +func (pc *cachingPermChecker) Check(uuid string) (bool, error) { + logger := logger(nil). + WithField("token", pc.Client.AuthToken). 
+ WithField("uuid", uuid) + pc.tidy() + now := time.Now() + if perm, ok := pc.cache[uuid]; ok && now.Sub(perm.Time) < maxPermCacheAge { + logger.WithField("allowed", perm.allowed).Debug("cache hit") + return perm.allowed, nil + } + var buf map[string]interface{} + path, err := pc.PathForUUID("get", uuid) + if err != nil { + return false, err + } + err = pc.RequestAndDecode(&buf, "GET", path, nil, url.Values{ + "select": {`["uuid"]`}, + }) + + var allowed bool + if err == nil { + allowed = true + } else if txErr, ok := err.(*arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound { + allowed = false + } else if txErr.StatusCode == http.StatusForbidden { + // Some requests are expressly forbidden for reasons + // other than "you aren't allowed to know whether this + // UUID exists" (404). + allowed = false + } else { + logger.WithError(err).Error("lookup error") + return false, err + } + logger.WithField("allowed", allowed).Debug("cache miss") + pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed} + return allowed, nil +} + +func (pc *cachingPermChecker) tidy() { + if len(pc.cache) <= pc.maxCurrent*2 { + return + } + tooOld := time.Now().Add(-minPermCacheAge) + for uuid, t := range pc.cache { + if t.Before(tooOld) { + delete(pc.cache, uuid) + } + } + pc.maxCurrent = len(pc.cache) +} diff --git a/services/ws/router.go b/services/ws/router.go new file mode 100644 index 0000000000..15b825f2ab --- /dev/null +++ b/services/ws/router.go @@ -0,0 +1,140 @@ +package main + +import ( + "encoding/json" + "io" + "net/http" + "strconv" + "sync" + "sync/atomic" + "time" + + "git.curoverse.com/arvados.git/sdk/go/ctxlog" + "github.com/Sirupsen/logrus" + "golang.org/x/net/websocket" +) + +type wsConn interface { + io.ReadWriter + Request() *http.Request + SetReadDeadline(time.Time) error + SetWriteDeadline(time.Time) error +} + +type router struct { + Config *wsConfig + eventSource eventSource + newPermChecker func() permChecker + + handler *handler + mux 
*http.ServeMux + setupOnce sync.Once + + lastReqID int64 + lastReqMtx sync.Mutex + + status routerDebugStatus +} + +type routerDebugStatus struct { + ReqsReceived int64 + ReqsActive int64 +} + +type debugStatuser interface { + DebugStatus() interface{} +} + +func (rtr *router) setup() { + rtr.handler = &handler{ + PingTimeout: rtr.Config.PingTimeout.Duration(), + QueueSize: rtr.Config.ClientEventQueue, + } + rtr.mux = http.NewServeMux() + rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0)) + rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1)) + rtr.mux.HandleFunc("/debug.json", jsonHandler(rtr.DebugStatus)) + rtr.mux.HandleFunc("/status.json", jsonHandler(rtr.Status)) +} + +func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server { + return &websocket.Server{ + Handshake: func(c *websocket.Config, r *http.Request) error { + return nil + }, + Handler: websocket.Handler(func(ws *websocket.Conn) { + t0 := time.Now() + log := logger(ws.Request().Context()) + log.Info("connected") + + stats := rtr.handler.Handle(ws, rtr.eventSource, + func(ws wsConn, sendq chan<- interface{}) (session, error) { + return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker(), &rtr.Config.Client) + }) + + log.WithFields(logrus.Fields{ + "elapsed": time.Now().Sub(t0).Seconds(), + "stats": stats, + }).Info("disconnect") + ws.Close() + }), + } +} + +func (rtr *router) newReqID() string { + rtr.lastReqMtx.Lock() + defer rtr.lastReqMtx.Unlock() + id := time.Now().UnixNano() + if id <= rtr.lastReqID { + id = rtr.lastReqID + 1 + } + return strconv.FormatInt(id, 36) +} + +func (rtr *router) DebugStatus() interface{} { + s := map[string]interface{}{ + "HTTP": rtr.status, + "Outgoing": rtr.handler.DebugStatus(), + } + if es, ok := rtr.eventSource.(debugStatuser); ok { + s["EventSource"] = es.DebugStatus() + } + return s +} + +func (rtr *router) Status() interface{} { + return map[string]interface{}{ + "Clients": 
atomic.LoadInt64(&rtr.status.ReqsActive), + } +} + +func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + rtr.setupOnce.Do(rtr.setup) + atomic.AddInt64(&rtr.status.ReqsReceived, 1) + atomic.AddInt64(&rtr.status.ReqsActive, 1) + defer atomic.AddInt64(&rtr.status.ReqsActive, -1) + + logger := logger(req.Context()). + WithField("RequestID", rtr.newReqID()) + ctx := ctxlog.Context(req.Context(), logger) + req = req.WithContext(ctx) + logger.WithFields(logrus.Fields{ + "remoteAddr": req.RemoteAddr, + "reqForwardedFor": req.Header.Get("X-Forwarded-For"), + }).Info("accept request") + rtr.mux.ServeHTTP(resp, req) +} + +func jsonHandler(fn func() interface{}) http.HandlerFunc { + return func(resp http.ResponseWriter, req *http.Request) { + logger := logger(req.Context()) + resp.Header().Set("Content-Type", "application/json") + enc := json.NewEncoder(resp) + err := enc.Encode(fn()) + if err != nil { + msg := "encode failed" + logger.WithError(err).Error(msg) + http.Error(resp, msg, http.StatusInternalServerError) + } + } +} diff --git a/services/ws/session.go b/services/ws/session.go new file mode 100644 index 0000000000..67f460865c --- /dev/null +++ b/services/ws/session.go @@ -0,0 +1,33 @@ +package main + +import ( + "database/sql" + + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +type session interface { + // Receive processes a message received from the client. If a + // non-nil error is returned, the connection will be + // terminated. + Receive([]byte) error + + // Filter returns true if the event should be queued for + // sending to the client. It should return as fast as + // possible, and must not block. + Filter(*event) bool + + // EventMessage encodes the given event (from the front of the + // queue) into a form suitable to send to the client. If a + // non-nil error is returned, the connection is terminated. If + // the returned buffer is empty, nothing is sent to the client + // and the event is not counted in statistics. 
+ // + // Unlike Filter, EventMessage can block without affecting + // other connections. If EventMessage is slow, additional + // incoming events will be queued. If the event queue fills + // up, the connection will be dropped. + EventMessage(*event) ([]byte, error) +} + +type sessionFactory func(wsConn, chan<- interface{}, *sql.DB, permChecker, *arvados.Client) (session, error) diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go new file mode 100644 index 0000000000..44e2a1deb5 --- /dev/null +++ b/services/ws/session_v0.go @@ -0,0 +1,295 @@ +package main + +import ( + "database/sql" + "encoding/json" + "errors" + "sync" + "sync/atomic" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" + "github.com/Sirupsen/logrus" +) + +var ( + errQueueFull = errors.New("client queue full") + errFrameTooBig = errors.New("frame too big") + + sendObjectAttributes = []string{"state", "name", "owner_uuid", "portable_data_hash"} + + v0subscribeOK = []byte(`{"status":200}`) + v0subscribeFail = []byte(`{"status":400}`) +) + +type v0session struct { + ac *arvados.Client + ws wsConn + sendq chan<- interface{} + db *sql.DB + permChecker permChecker + subscriptions []v0subscribe + lastMsgID uint64 + log *logrus.Entry + mtx sync.Mutex + setupOnce sync.Once +} + +// newSessionV0 returns a v0 session: a partial port of the Rails/puma +// implementation, with just enough functionality to support Workbench +// and arv-mount. 
+func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) { + sess := &v0session{ + sendq: sendq, + ws: ws, + db: db, + ac: ac, + permChecker: pc, + log: logger(ws.Request().Context()), + } + + err := ws.Request().ParseForm() + if err != nil { + sess.log.WithError(err).Error("ParseForm failed") + return nil, err + } + token := ws.Request().Form.Get("api_token") + sess.permChecker.SetToken(token) + sess.log.WithField("token", token).Debug("set token") + + return sess, nil +} + +func (sess *v0session) Receive(buf []byte) error { + var sub v0subscribe + if err := json.Unmarshal(buf, &sub); err != nil { + sess.log.WithError(err).Info("invalid message from client") + } else if sub.Method == "subscribe" { + sub.prepare(sess) + sess.log.WithField("sub", sub).Debug("sub prepared") + sess.sendq <- v0subscribeOK + sess.mtx.Lock() + sess.subscriptions = append(sess.subscriptions, sub) + sess.mtx.Unlock() + sub.sendOldEvents(sess) + return nil + } else { + sess.log.WithField("Method", sub.Method).Info("unknown method") + } + sess.sendq <- v0subscribeFail + return nil +} + +func (sess *v0session) EventMessage(e *event) ([]byte, error) { + detail := e.Detail() + if detail == nil { + return nil, nil + } + + ok, err := sess.permChecker.Check(detail.ObjectUUID) + if err != nil || !ok { + return nil, err + } + + kind, _ := sess.ac.KindForUUID(detail.ObjectUUID) + msg := map[string]interface{}{ + "msgID": atomic.AddUint64(&sess.lastMsgID, 1), + "id": detail.ID, + "uuid": detail.UUID, + "object_uuid": detail.ObjectUUID, + "object_owner_uuid": detail.ObjectOwnerUUID, + "object_kind": kind, + "event_type": detail.EventType, + "event_at": detail.EventAt, + } + if detail.Properties != nil && detail.Properties["text"] != nil { + msg["properties"] = detail.Properties + } else { + msgProps := map[string]map[string]interface{}{} + for _, ak := range []string{"old_attributes", "new_attributes"} { + eventAttrs, ok := 
detail.Properties[ak].(map[string]interface{}) + if !ok { + continue + } + msgAttrs := map[string]interface{}{} + for _, k := range sendObjectAttributes { + if v, ok := eventAttrs[k]; ok { + msgAttrs[k] = v + } + } + msgProps[ak] = msgAttrs + } + msg["properties"] = msgProps + } + return json.Marshal(msg) +} + +func (sess *v0session) Filter(e *event) bool { + sess.mtx.Lock() + defer sess.mtx.Unlock() + for _, sub := range sess.subscriptions { + if sub.match(sess, e) { + return true + } + } + return false +} + +func (sub *v0subscribe) sendOldEvents(sess *v0session) { + if sub.LastLogID == 0 { + return + } + sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents") + // Here we do a "select id" query and queue an event for every + // log since the given ID, then use (*event)Detail() to + // retrieve the whole row and decide whether to send it. This + // approach is very inefficient if the subscriber asks for + // last_log_id==1, even if the filters end up matching very + // few events. + // + // To mitigate this, filter on "created > 10 minutes ago" when + // retrieving the list of old event IDs to consider. + rows, err := sess.db.Query( + `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`, + sub.LastLogID, + time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano)) + if err != nil { + sess.log.WithError(err).Error("db.Query failed") + return + } + for rows.Next() { + var id uint64 + err := rows.Scan(&id) + if err != nil { + sess.log.WithError(err).Error("row Scan failed") + continue + } + for len(sess.sendq)*2 > cap(sess.sendq) { + // Ugly... but if we fill up the whole client + // queue with a backlog of old events, a + // single new event will overflow it and + // terminate the connection, and then the + // client will probably reconnect and do the + // same thing all over again. 
+ time.Sleep(100 * time.Millisecond) + } + now := time.Now() + e := &event{ + LogID: id, + Received: now, + Ready: now, + db: sess.db, + } + if sub.match(sess, e) { + select { + case sess.sendq <- e: + case <-sess.ws.Request().Context().Done(): + return + } + } + } + if err := rows.Err(); err != nil { + sess.log.WithError(err).Error("db.Query failed") + } +} + +type v0subscribe struct { + Method string + Filters []v0filter + LastLogID int64 `json:"last_log_id"` + + funcs []func(*event) bool +} + +type v0filter [3]interface{} + +func (sub *v0subscribe) match(sess *v0session, e *event) bool { + log := sess.log.WithField("LogID", e.LogID) + detail := e.Detail() + if detail == nil { + log.Error("match failed, no detail") + return false + } + log = log.WithField("funcs", len(sub.funcs)) + for i, f := range sub.funcs { + if !f(e) { + log.WithField("func", i).Debug("match failed") + return false + } + } + log.Debug("match passed") + return true +} + +func (sub *v0subscribe) prepare(sess *v0session) { + for _, f := range sub.Filters { + if len(f) != 3 { + continue + } + if col, ok := f[0].(string); ok && col == "event_type" { + op, ok := f[1].(string) + if !ok || op != "in" { + continue + } + arr, ok := f[2].([]interface{}) + if !ok { + continue + } + var strs []string + for _, s := range arr { + if s, ok := s.(string); ok { + strs = append(strs, s) + } + } + sub.funcs = append(sub.funcs, func(e *event) bool { + for _, s := range strs { + if s == e.Detail().EventType { + return true + } + } + return false + }) + } else if ok && col == "created_at" { + op, ok := f[1].(string) + if !ok { + continue + } + tstr, ok := f[2].(string) + if !ok { + continue + } + t, err := time.Parse(time.RFC3339Nano, tstr) + if err != nil { + sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed") + continue + } + var fn func(*event) bool + switch op { + case ">=": + fn = func(e *event) bool { + return !e.Detail().CreatedAt.Before(t) + } + case "<=": + fn = func(e *event) bool { 
+ return !e.Detail().CreatedAt.After(t) + } + case ">": + fn = func(e *event) bool { + return e.Detail().CreatedAt.After(t) + } + case "<": + fn = func(e *event) bool { + return e.Detail().CreatedAt.Before(t) + } + case "=": + fn = func(e *event) bool { + return e.Detail().CreatedAt.Equal(t) + } + default: + sess.log.WithField("operator", op).Info("bogus operator") + continue + } + sub.funcs = append(sub.funcs, fn) + } + } +} diff --git a/services/ws/session_v1.go b/services/ws/session_v1.go new file mode 100644 index 0000000000..71a130346a --- /dev/null +++ b/services/ws/session_v1.go @@ -0,0 +1,14 @@ +package main + +import ( + "database/sql" + "errors" + + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +// newSessionV1 returns a v1 session -- see +// https://dev.arvados.org/projects/arvados/wiki/Websocket_server +func newSessionV1(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) { + return nil, errors.New("Not implemented") +}