Merge branch '10684-crunch-run-ca-certs' closes #10684
author Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 16 Dec 2016 19:55:54 +0000 (14:55 -0500)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 16 Dec 2016 19:55:54 +0000 (14:55 -0500)
57 files changed:
apps/workbench/app/controllers/container_requests_controller.rb
apps/workbench/app/controllers/pipeline_instances_controller.rb
apps/workbench/app/views/application/_extra_tab_line_buttons.html.erb [new file with mode: 0644]
apps/workbench/app/views/application/_title_and_buttons.html.erb
apps/workbench/app/views/container_requests/_extra_tab_line_buttons.html.erb [new file with mode: 0644]
apps/workbench/config/application.default.yml
apps/workbench/config/routes.rb
apps/workbench/lib/tasks/config_dump.rake [new file with mode: 0644]
apps/workbench/test/controllers/container_requests_controller_test.rb
apps/workbench/test/diagnostics/container_request_test.rb [new file with mode: 0644]
apps/workbench/test/diagnostics/pipeline_test.rb
apps/workbench/test/diagnostics_test_helper.rb
build/run-build-packages.sh
build/run-tests.sh
doc/install/install-ws.html.textile.liquid [new file with mode: 0644]
sdk/cli/test/test_arv-keep-put.rb
sdk/cwl/tests/test_container.py
sdk/go/arvados/client.go
sdk/go/arvados/log.go [new file with mode: 0644]
sdk/go/config/load.go
sdk/go/ctxlog/log.go [new file with mode: 0644]
sdk/go/stats/duration.go [new file with mode: 0644]
sdk/go/stats/duration_test.go [new file with mode: 0644]
sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
sdk/python/arvados/commands/put.py
sdk/python/arvados/keep.py
sdk/python/setup.py
sdk/python/tests/nginx.conf
sdk/python/tests/run_test_server.py
sdk/python/tests/test_arv_put.py
sdk/python/tests/test_arvfile.py
sdk/python/tests/test_collections.py
sdk/python/tests/test_events.py
services/api/app/models/arvados_model.rb
services/api/app/models/job.rb
services/api/config/application.default.yml
services/api/db/migrate/20161213172944_full_text_search_indexes.rb [new file with mode: 0644]
services/api/db/structure.sql
services/api/lib/tasks/config_dump.rake [new file with mode: 0644]
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/command.py
services/fuse/tests/mount_test_base.py
services/fuse/tests/test_retry.py
services/keepstore/logging_router.go
services/ws/arvados-ws.service [new file with mode: 0644]
services/ws/config.go [new file with mode: 0644]
services/ws/doc.go [new file with mode: 0644]
services/ws/event.go [new file with mode: 0644]
services/ws/event_source.go [new file with mode: 0644]
services/ws/handler.go [new file with mode: 0644]
services/ws/main.go [new file with mode: 0644]
services/ws/permission.go [new file with mode: 0644]
services/ws/router.go [new file with mode: 0644]
services/ws/session.go [new file with mode: 0644]
services/ws/session_v0.go [new file with mode: 0644]
services/ws/session_v1.go [new file with mode: 0644]

index b67d100887c838917ee4a2fc6ba8ac2871893cd3..b286a9456e14e3a2538df122c31e6071fab33558 100644 (file)
@@ -59,4 +59,32 @@ class ContainerRequestsController < ApplicationController
     end
   end
 
+  def copy
+    src = @object
+
+    @object = ContainerRequest.new
+
+    @object.command = src.command
+    @object.container_image = src.container_image
+    @object.cwd = src.cwd
+    @object.description = src.description
+    @object.environment = src.environment
+    @object.mounts = src.mounts
+    @object.name = src.name
+    @object.output_path = src.output_path
+    @object.priority = 1
+    @object.properties[:template_uuid] = src.properties[:template_uuid]
+    @object.runtime_constraints = src.runtime_constraints
+    @object.scheduling_parameters = src.scheduling_parameters
+    @object.state = 'Uncommitted'
+    @object.use_existing = false
+
+    # set owner_uuid to that of source, provided it is a project and writable by current user
+    current_project = Group.find(src.owner_uuid) rescue nil
+    if (current_project && current_project.writable_by.andand.include?(current_user.uuid))
+      @object.owner_uuid = src.owner_uuid
+    end
+
+    super
+  end
 end
index c5fbda0cf349177801a0bcbbd75c7c95634b56ef..83fe0dda4645a0437a962aa95e9572c9c897afe2 100644 (file)
@@ -53,7 +53,7 @@ class PipelineInstancesController < ApplicationController
     end
     @object.state = 'New'
 
-    # set owner_uuid to that of source, provided it is a project and wriable by current user
+    # set owner_uuid to that of source, provided it is a project and writable by current user
     current_project = Group.find(source.owner_uuid) rescue nil
     if (current_project && current_project.writable_by.andand.include?(current_user.uuid))
       @object.owner_uuid = source.owner_uuid
diff --git a/apps/workbench/app/views/application/_extra_tab_line_buttons.html.erb b/apps/workbench/app/views/application/_extra_tab_line_buttons.html.erb
new file mode 100644 (file)
index 0000000..e69de29
index 398f248a39bc478b016b64537a310a6b578bb9b8..46a949aa12d083ac1baeb1138e5395030889dd77 100644 (file)
@@ -13,6 +13,9 @@
 
 <% if @object.class.goes_in_projects? && @object.uuid != current_user.andand.uuid # Not the "Home" project %>
   <% content_for :tab_line_buttons do %>
+    <% if current_user.andand.is_active %>
+      <%= render partial: 'extra_tab_line_buttons' %>
+    <% end %>
     <% if current_user.andand.is_active && @object.class.copies_to_projects? %>
       <%= link_to(
           choose_projects_path(
diff --git a/apps/workbench/app/views/container_requests/_extra_tab_line_buttons.html.erb b/apps/workbench/app/views/container_requests/_extra_tab_line_buttons.html.erb
new file mode 100644 (file)
index 0000000..662309f
--- /dev/null
@@ -0,0 +1,10 @@
+<% if @object.state == 'Final' %>
+  <%= link_to(copy_container_request_path('id' => @object.uuid),
+      class: 'btn btn-primary',
+      title: 'Re-run',
+      data: {toggle: :tooltip, placement: :top}, title: 'This will make a copy and take you there. You can then make any needed changes and run it',
+      method: :post,
+      ) do %>
+    <i class="fa fa-fw fa-play"></i> Re-run
+  <% end %>
+<% end %>
index e4e27829866a0748558efdd639496b4a420fb510..c2dcba80942a68d5107918a81aa723de5d24db9e 100644 (file)
@@ -26,6 +26,11 @@ diagnostics:
     pipeline_2:
       template_uuid: zzzzz-p5p6p-1xbobfobk94ppbv
       input_paths: [zzzzz-4zz18-nz98douzhaa3jh2, zzzzz-4zz18-gpw9o5wpcti3nib]
+  container_requests_to_test:
+    container_request_1:
+      workflow_uuid: zzzzz-7fd4e-60e96shgwspt4mw
+      input_paths: []
+      max_wait_seconds: 10
 
 # Below is a sample setting for performance testing.
 # Configure workbench URL as "arvados_workbench_url"
index 7c2312c1cee57c7ffef86ad300539d8c575db5bf..21cb7c40bc7755c22dc026a9f23c917369614149 100644 (file)
@@ -26,6 +26,7 @@ ArvadosWorkbench::Application.routes.draw do
   resources :containers
   resources :container_requests do
     post 'cancel', :on => :member
+    post 'copy', on: :member
   end
   get '/virtual_machines/:id/webshell/:login' => 'virtual_machines#webshell', :as => :webshell_virtual_machine
   resources :authorized_keys
diff --git a/apps/workbench/lib/tasks/config_dump.rake b/apps/workbench/lib/tasks/config_dump.rake
new file mode 100644 (file)
index 0000000..c7e0214
--- /dev/null
@@ -0,0 +1,6 @@
+namespace :config do
+  desc 'Show site configuration'
+  task dump: :environment do
+    puts $application_config.to_yaml
+  end
+end
index 8dbbbd07c11d5add20dc91f2402ed015e8d43d2a..70e042cd3dcad21a38de4b639343bef6ab7c5098 100644 (file)
@@ -29,4 +29,31 @@ class ContainerRequestsControllerTest < ActionController::TestCase
     assert_includes @response.body, '<div id="event_log_div"'
     assert_select 'Download the log', false
   end
+
+  test "completed container request offers re-run option" do
+    use_token 'active'
+
+    uuid = api_fixture('container_requests')['completed']['uuid']
+
+    get :show, {id: uuid}, session_for(:active)
+    assert_response :success
+
+   assert_includes @response.body, "href=\"/container_requests/#{uuid}/copy\""
+  end
+
+  test "container request copy" do
+    completed_cr = api_fixture('container_requests')['completed']
+    post(:copy,
+         {
+           id: completed_cr['uuid']
+         },
+         session_for(:active))
+    assert_response 302
+    copied_cr = assigns(:object)
+    assert_not_nil copied_cr
+    assert_equal 'Uncommitted', copied_cr[:state]
+    assert_equal "Copy of #{completed_cr['name']}", copied_cr['name']
+    assert_equal completed_cr['cmd'], copied_cr['cmd']
+    assert_equal completed_cr['runtime_constraints']['ram'], copied_cr['runtime_constraints'][:ram]
+  end
 end
diff --git a/apps/workbench/test/diagnostics/container_request_test.rb b/apps/workbench/test/diagnostics/container_request_test.rb
new file mode 100644 (file)
index 0000000..257159a
--- /dev/null
@@ -0,0 +1,49 @@
+require 'diagnostics_test_helper'
+
+# This test assumes that the configured workflow_uuid corresponds to a cwl workflow.
+# Ex: configure a workflow using the steps below and use the resulting workflow uuid:
+#   > cd arvados/doc/user/cwl/bwa-mem
+#   > arvados-cwl-runner --create-workflow bwa-mem.cwl bwa-mem-input.yml
+
+class ContainerRequestTest < DiagnosticsTest
+  crs_to_test = Rails.configuration.container_requests_to_test.andand.keys
+
+  setup do
+    need_selenium 'to make websockets work'
+  end
+
+  crs_to_test.andand.each do |cr_to_test|
+    test "run container_request: #{cr_to_test}" do
+      cr_config = Rails.configuration.container_requests_to_test[cr_to_test]
+
+      visit_page_with_token 'active'
+
+      find('.btn', text: 'Run a process').click
+
+      within('.modal-dialog') do
+        page.find_field('Search').set cr_config['workflow_uuid']
+        wait_for_ajax
+        find('.selectable', text: 'bwa-mem.cwl').click
+        find('.btn', text: 'Next: choose inputs').click
+      end
+
+      page.assert_selector('a.disabled,button.disabled', text: 'Run') if cr_config['input_paths'].any?
+
+      # Choose input for the workflow
+      cr_config['input_paths'].each do |look_for|
+        select_input look_for
+      end
+      wait_for_ajax
+
+      # All needed input are already filled in. Run this workflow now
+      page.assert_no_selector('a.disabled,button.disabled', text: 'Run')
+      find('a,button', text: 'Run').click
+
+      # container_request is running. Run button is no longer available.
+      page.assert_no_selector('a', text: 'Run')
+
+      # Wait for container_request run to complete
+      wait_until_page_has 'completed', cr_config['max_wait_seconds']
+    end
+  end
+end
index d038222cf0cf58278818bd087e288a4e1c11b52c..11d0e42629af4593d59c41ffddba31535fdce6f2 100644 (file)
@@ -42,54 +42,11 @@ class PipelineTest < DiagnosticsTest
       find('a,button', text: 'Components').click
       find('a,button', text: 'Run').click
 
-      # Pipeline is running. We have a "Stop" button instead now.
+      # Pipeline is running. We have a "Pause" button instead now.
       page.assert_selector 'a,button', text: 'Pause'
 
       # Wait for pipeline run to complete
       wait_until_page_has 'completed', pipeline_config['max_wait_seconds']
     end
   end
-
-  def select_input look_for
-    inputs_needed = page.all('.btn', text: 'Choose')
-    return if (!inputs_needed || !inputs_needed.any?)
-
-    look_for_uuid = nil
-    look_for_file = nil
-    if look_for.andand.index('/').andand.>0
-      partitions = look_for.partition('/')
-      look_for_uuid = partitions[0]
-      look_for_file = partitions[2]
-    else
-      look_for_uuid = look_for
-      look_for_file = nil
-    end
-
-    assert_triggers_dom_event 'shown.bs.modal' do
-      inputs_needed[0].click
-    end
-
-    within('.modal-dialog') do
-      if look_for_uuid
-        fill_in('Search', with: look_for_uuid, exact: true)
-        wait_for_ajax
-      end
-             
-      page.all('.selectable').first.click
-      wait_for_ajax
-      # ajax reload is wiping out input selection after search results; so, select again.
-      page.all('.selectable').first.click
-      wait_for_ajax
-
-      if look_for_file
-        wait_for_ajax
-        within('.collection_files_name', text: look_for_file) do
-          find('.fa-file').click
-        end
-      end
-      
-      find('button', text: 'OK').click
-      wait_for_ajax
-    end
-  end
 end
index 3587721edae7bc6e96778efceaaedfadddbeacd3..46b961ae11923e7db4b94e4e78215c407390099c 100644 (file)
@@ -21,6 +21,49 @@ class DiagnosticsTest < ActionDispatch::IntegrationTest
     visit page_with_token(tokens[token_name], (workbench_url + path))
   end
 
+  def select_input look_for
+    inputs_needed = page.all('.btn', text: 'Choose')
+    return if (!inputs_needed || !inputs_needed.any?)
+
+    look_for_uuid = nil
+    look_for_file = nil
+    if look_for.andand.index('/').andand.>0
+      partitions = look_for.partition('/')
+      look_for_uuid = partitions[0]
+      look_for_file = partitions[2]
+    else
+      look_for_uuid = look_for
+      look_for_file = nil
+    end
+
+    assert_triggers_dom_event 'shown.bs.modal' do
+      inputs_needed[0].click
+    end
+
+    within('.modal-dialog') do
+      if look_for_uuid
+        fill_in('Search', with: look_for_uuid, exact: true)
+        wait_for_ajax
+      end
+
+      page.all('.selectable').first.click
+      wait_for_ajax
+      # ajax reload is wiping out input selection after search results; so, select again.
+      page.all('.selectable').first.click
+      wait_for_ajax
+
+      if look_for_file
+        wait_for_ajax
+        within('.collection_files_name', text: look_for_file) do
+          find('.fa-file').click
+        end
+      end
+
+      find('button', text: 'OK').click
+      wait_for_ajax
+    end
+  end
+
   # Looks for the text_to_look_for for up to the max_time provided
   def wait_until_page_has text_to_look_for, max_time=30
     max_time = 30 if (!max_time || (max_time.to_s != max_time.to_i.to_s))
index 6f9aca98b424e1c975ff244e48b2904c2f783df1..ddd0124c70abf913db63ffde06d22b3b07bfeda7 100755 (executable)
@@ -432,6 +432,8 @@ package_go_binary services/keepstore keepstore \
     "Keep storage daemon, accessible to clients on the LAN"
 package_go_binary services/keep-web keep-web \
     "Static web hosting service for user data stored in Arvados Keep"
+package_go_binary services/ws arvados-ws \
+    "Arvados Websocket server"
 package_go_binary tools/keep-block-check keep-block-check \
     "Verify that all data from one set of Keep servers to another was copied"
 package_go_binary tools/keep-rsync keep-rsync \
index 560a6933e8afe63e48f963dfa8c990a2da2b9897..e0e1ce28524327851e6925d26284530ced1dd5a0 100755 (executable)
@@ -79,6 +79,7 @@ services/nodemanager
 services/crunch-run
 services/crunch-dispatch-local
 services/crunch-dispatch-slurm
+services/ws
 sdk/cli
 sdk/pam
 sdk/python
@@ -90,6 +91,7 @@ sdk/go/httpserver
 sdk/go/manifest
 sdk/go/blockdigest
 sdk/go/streamer
+sdk/go/stats
 sdk/go/crunchrunner
 sdk/cwl
 tools/crunchstat-summary
@@ -270,15 +272,18 @@ start_api() {
         && eval $(python sdk/python/tests/run_test_server.py start --auth admin) \
         && export ARVADOS_TEST_API_HOST="$ARVADOS_API_HOST" \
         && export ARVADOS_TEST_API_INSTALLED="$$" \
+        && python sdk/python/tests/run_test_server.py start_ws \
+        && python sdk/python/tests/run_test_server.py start_nginx \
         && (env | egrep ^ARVADOS)
 }
 
 start_nginx_proxy_services() {
-    echo 'Starting keepproxy, keep-web, arv-git-httpd, and nginx ssl proxy...'
+    echo 'Starting keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...'
     cd "$WORKSPACE" \
         && python sdk/python/tests/run_test_server.py start_keep_proxy \
         && python sdk/python/tests/run_test_server.py start_keep-web \
         && python sdk/python/tests/run_test_server.py start_arv-git-httpd \
+        && python sdk/python/tests/run_test_server.py start_ws \
         && python sdk/python/tests/run_test_server.py start_nginx \
         && export ARVADOS_TEST_PROXY_SERVICES=1
 }
@@ -289,12 +294,15 @@ stop_services() {
         cd "$WORKSPACE" \
             && python sdk/python/tests/run_test_server.py stop_nginx \
             && python sdk/python/tests/run_test_server.py stop_arv-git-httpd \
+            && python sdk/python/tests/run_test_server.py stop_ws \
             && python sdk/python/tests/run_test_server.py stop_keep-web \
             && python sdk/python/tests/run_test_server.py stop_keep_proxy
     fi
     if [[ -n "$ARVADOS_TEST_API_HOST" ]]; then
         unset ARVADOS_TEST_API_HOST
         cd "$WORKSPACE" \
+            && python sdk/python/tests/run_test_server.py stop_nginx \
+            && python sdk/python/tests/run_test_server.py stop_ws \
             && python sdk/python/tests/run_test_server.py stop
     fi
 }
@@ -756,6 +764,7 @@ gostuff=(
     sdk/go/manifest
     sdk/go/streamer
     sdk/go/crunchrunner
+    sdk/go/stats
     lib/crunchstat
     services/arv-git-httpd
     services/crunchstat
@@ -767,6 +776,7 @@ gostuff=(
     services/crunch-dispatch-local
     services/crunch-dispatch-slurm
     services/crunch-run
+    services/ws
     tools/keep-block-check
     tools/keep-exercise
     tools/keep-rsync
diff --git a/doc/install/install-ws.html.textile.liquid b/doc/install/install-ws.html.textile.liquid
new file mode 100644 (file)
index 0000000..a36a59a
--- /dev/null
@@ -0,0 +1,204 @@
+---
+layout: default
+navsection: installguide
+title: Install the websocket server
+...
+
+{% include 'notebox_begin_warning' %}
+
+This websocket server is an alternative to the puma server that comes with the API server. It is available as an *experimental pre-release* and is not recommended for production sites.
+
+{% include 'notebox_end' %}
+
+The arvados-ws server provides event notifications to websocket clients. It can be installed anywhere with access to the PostgreSQL database and the Arvados API server, typically behind a web proxy that provides SSL support. See the "godoc page":http://godoc.org/github.com/curoverse/arvados/services/ws for additional information.
+
+By convention, we use the following hostname for the websocket service.
+
+<notextile>
+<pre><code>ws.<span class="userinput">uuid_prefix.your.domain</span></code></pre>
+</notextile>
+
+The above hostname should resolve from anywhere on the internet.
+
+h2. Install arvados-ws
+
+Typically arvados-ws runs on the same host as the API server.
+
+On Debian-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install arvados-ws</span>
+</code></pre>
+</notextile>
+
+On Red Hat-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install arvados-ws</span>
+</code></pre>
+</notextile>
+
+Verify that @arvados-ws@ is functional:
+
+<notextile>
+<pre><code>~$ <span class="userinput">arvados-ws -h</span>
+Usage of arvados-ws:
+  -config path
+        path to config file (default "/etc/arvados/ws/ws.yml")
+  -dump-config
+        show current configuration and exit
+</code></pre>
+</notextile>
+
+h3. Create a configuration file
+
+Create @/etc/arvados/ws/ws.yml@ using the following template. Replace @xxxxxxxx@ with the "password you generated during database setup":install-postgresql.html#api.
+
+<notextile>
+<pre><code>Client:
+  APIHost: <span class="userinput">uuid_prefix.your.domain</span>:443
+Listen: ":<span class="userinput">9003</span>"
+Postgres:
+  dbname: arvados_production
+  host: localhost
+  password: <span class="userinput">xxxxxxxx</span>
+  user: arvados
+</code></pre>
+</notextile>
+
+h3. Start the service (option 1: systemd)
+
+If your system does not use systemd, skip this section and follow the "runit instructions":#runit instead.
+
+If your system uses systemd, the arvados-ws service should already be set up. Start it and check its status:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo systemctl restart arvados-ws</span>
+~$ <span class="userinput">sudo systemctl status arvados-ws</span>
+&#x25cf; arvados-ws.service - Arvados websocket server
+   Loaded: loaded (/lib/systemd/system/arvados-ws.service; enabled)
+   Active: active (running) since Tue 2016-12-06 11:20:48 EST; 10s ago
+     Docs: https://doc.arvados.org/
+ Main PID: 9421 (arvados-ws)
+   CGroup: /system.slice/arvados-ws.service
+           └─9421 /usr/bin/arvados-ws
+
+Dec 06 11:20:48 zzzzz arvados-ws[9421]: {"level":"info","msg":"started","time":"2016-12-06T11:20:48.207617188-05:00"}
+Dec 06 11:20:48 zzzzz arvados-ws[9421]: {"Listen":":9003","level":"info","msg":"listening","time":"2016-12-06T11:20:48.244956506-05:00"}
+Dec 06 11:20:48 zzzzz systemd[1]: Started Arvados websocket server.
+</code></pre>
+</notextile>
+
+If it is not running, use @journalctl@ to check logs for errors:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo journalctl -n10 -u arvados-ws</span>
+...
+Dec 06 11:12:48 zzzzz systemd[1]: Starting Arvados websocket server...
+Dec 06 11:12:48 zzzzz arvados-ws[8918]: {"level":"info","msg":"started","time":"2016-12-06T11:12:48.030496636-05:00"}
+Dec 06 11:12:48 zzzzz arvados-ws[8918]: {"error":"pq: password authentication failed for user \"arvados\"","level":"fatal","msg":"db.Ping failed","time":"2016-12-06T11:12:48.058206400-05:00"}
+</code></pre>
+</notextile>
+
+Skip ahead to "confirm the service is working":#confirm.
+
+h3(#runit). Start the service (option 2: runit)
+
+Install runit to supervise the arvados-ws daemon.  {% include 'install_runit' %}
+
+Create a supervised service.
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo mkdir /etc/service/arvados-ws</span>
+~$ <span class="userinput">cd /etc/service/arvados-ws</span>
+~$ <span class="userinput">sudo mkdir log log/main</span>
+~$ <span class="userinput">printf '#!/bin/sh\nexec arvados-ws 2>&1\n' | sudo tee run</span>
+~$ <span class="userinput">printf '#!/bin/sh\nexec svlogd main\n' | sudo tee log/run</span>
+~$ <span class="userinput">sudo chmod +x run log/run</span>
+~$ <span class="userinput">sudo sv exit .</span>
+~$ <span class="userinput">cd -</span>
+</code></pre>
+</notextile>
+
+Use @sv stat@ and check the log file to verify the service is running.
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo sv stat /etc/service/arvados-ws</span>
+run: /etc/service/arvados-ws: (pid 12520) 2s; run: log: (pid 12519) 2s
+~$ <span class="userinput">tail /etc/service/arvados-ws/log/main/current</span>
+{"level":"info","msg":"started","time":"2016-12-06T11:56:20.669171449-05:00"}
+{"Listen":":9003","level":"info","msg":"listening","time":"2016-12-06T11:56:20.708847627-05:00"}
+</code></pre>
+</notextile>
+
+h3(#confirm). Confirm the service is working
+
+Confirm the service is listening on its assigned port and responding to requests.
+
+<notextile>
+<pre><code>~$ <span class="userinput">curl http://0.0.0.0:<b>9003</b>/status.json</span>
+{"Clients":1}
+</code></pre>
+</notextile>
+
+h3. Set up a reverse proxy with SSL support
+
+The arvados-ws service will be accessible from anywhere on the internet, so we recommend using SSL for transport encryption.
+
+This is best achieved by putting a reverse proxy with SSL support in front of arvados-ws, running on port 443 and passing requests to arvados-ws on port 9003 (or whatever port you chose in your configuration file).
+
+For example, using Nginx:
+
+<notextile><pre>
+upstream arvados-ws {
+  server                127.0.0.1:<span class="userinput">9003</span>;
+}
+
+server {
+  listen                <span class="userinput">[your public IP address]</span>:443 ssl;
+  server_name           ws.<span class="userinput">uuid_prefix.your.domain</span>;
+
+  proxy_connect_timeout 90s;
+  proxy_read_timeout    300s;
+
+  ssl                   on;
+  ssl_certificate       <span class="userinput">/YOUR/PATH/TO/cert.pem</span>;
+  ssl_certificate_key   <span class="userinput">/YOUR/PATH/TO/cert.key</span>;
+
+  location / {
+    proxy_pass          http://arvados-ws;
+    proxy_set_header    Upgrade         $http_upgrade;
+    proxy_set_header    Connection      "upgrade";
+    proxy_set_header    Host            $host;
+    proxy_set_header    X-Forwarded-For $proxy_add_x_forwarded_for;
+  }
+}
+</pre></notextile>
+
+If Nginx is already configured to proxy @ws@ requests to puma, move that configuration out of the way or change its @server_name@ so it doesn't conflict.
+
+h3. Update API server configuration
+
+Ensure the websocket server address is correct in the API server configuration file @/etc/arvados/api/application.yml@.
+
+<notextile>
+<pre><code>websocket_address: wss://ws.<span class="userinput">uuid_prefix.your.domain</span>/websocket
+</code></pre>
+</notextile>
+
+Restart Nginx to reload the API server configuration.
+
+<notextile>
+<pre><code>$ <span class="userinput">sudo nginx -s reload</span>
+</code></pre>
+</notextile>
+
+h3. Verify DNS and proxy setup
+
+Use a host elsewhere on the Internet to confirm that your DNS, proxy, and SSL are configured correctly.
+
+<notextile>
+<pre><code>$ <span class="userinput">curl https://ws.<b>uuid_prefix.your.domain</b>/status.json</span>
+{"Clients":1}
+</code></pre>
+</notextile>
index fefbc2729875e70cb890f69d56fe1d7f1c614b8d..e6ead25b807e70eb03d293bc9d3a92aad2a78c7b 100644 (file)
@@ -40,7 +40,7 @@ class TestArvKeepPut < Minitest::Test
 
   def test_raw_file
     out, err = capture_subprocess_io do
-      assert arv_put('--raw', './tmp/foo')
+      assert arv_put('--no-cache', '--raw', './tmp/foo')
     end
     $stderr.write err
     assert_match '', err
@@ -87,7 +87,7 @@ class TestArvKeepPut < Minitest::Test
 
   def test_as_stream
     out, err = capture_subprocess_io do
-      assert arv_put('--as-stream', './tmp/foo')
+      assert arv_put('--no-cache', '--as-stream', './tmp/foo')
     end
     $stderr.write err
     assert_match '', err
@@ -96,7 +96,7 @@ class TestArvKeepPut < Minitest::Test
 
   def test_progress
     out, err = capture_subprocess_io do
-      assert arv_put('--manifest', '--progress', './tmp/foo')
+      assert arv_put('--no-cache', '--manifest', '--progress', './tmp/foo')
     end
     assert_match /%/, err
     assert match_collection_uuid(out)
@@ -104,7 +104,7 @@ class TestArvKeepPut < Minitest::Test
 
   def test_batch_progress
     out, err = capture_subprocess_io do
-      assert arv_put('--manifest', '--batch-progress', './tmp/foo')
+      assert arv_put('--no-cache', '--manifest', '--batch-progress', './tmp/foo')
     end
     assert_match /: 0 written 3 total/, err
     assert_match /: 3 written 3 total/, err
index bb661550da45ae3b8fe0be1c3d5f78e69bdaabe6..0394988ccd166df7ddb00e36a04f9bbc4d7b4a1f 100644 (file)
@@ -8,7 +8,7 @@ import functools
 import cwltool.process
 from schema_salad.ref_resolver import Loader
 
-from schema_salad.ref_resolver import Loader
+from .matcher import JsonDiffMatcher
 
 if not os.getenv('ARVADOS_DEBUG'):
     logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
@@ -48,7 +48,7 @@ class TestContainer(unittest.TestCase):
                                  make_fs_access=make_fs_access, tmpdir="/tmp"):
                 j.run(enable_reuse=enable_reuse)
                 runner.api.container_requests().create.assert_called_with(
-                    body={
+                    body=JsonDiffMatcher({
                         'environment': {
                             'HOME': '/var/spool/cwl',
                             'TMPDIR': '/tmp'
@@ -69,8 +69,9 @@ class TestContainer(unittest.TestCase):
                         'container_image': '99999999999999999999999999999993+99',
                         'command': ['ls', '/var/spool/cwl'],
                         'cwd': '/var/spool/cwl',
-                        'scheduling_parameters': {}
-                    })
+                        'scheduling_parameters': {},
+                        'properties': {},
+                    }))
 
     # The test passes some fields in builder.resources
     # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@@ -141,7 +142,8 @@ class TestContainer(unittest.TestCase):
                 'cwd': '/var/spool/cwl',
                 'scheduling_parameters': {
                     'partitions': ['blurb']
-                }
+                },
+                'properties': {}
         }
 
         call_body = call_kwargs.get('body', None)
index 36f4eb52ae298982dfa09ddf82b0cea08c2604f7..fc937494e5a679728aea021dbe80a95231582d72 100644 (file)
@@ -41,6 +41,8 @@ type Client struct {
        // callers who use a Client to initialize an
        // arvadosclient.ArvadosClient.)
        KeepServiceURIs []string `json:",omitempty"`
+
+       dd *DiscoveryDocument
 }
 
 // The default http.Client used by a Client with Insecure==true and
@@ -198,14 +200,103 @@ func (c *Client) apiURL(path string) string {
 
 // DiscoveryDocument is the Arvados server's description of itself.
 type DiscoveryDocument struct {
-       DefaultCollectionReplication int   `json:"defaultCollectionReplication"`
-       BlobSignatureTTL             int64 `json:"blobSignatureTtl"`
+       BasePath                     string              `json:"basePath"`
+       DefaultCollectionReplication int                 `json:"defaultCollectionReplication"`
+       BlobSignatureTTL             int64               `json:"blobSignatureTtl"`
+       Schemas                      map[string]Schema   `json:"schemas"`
+       Resources                    map[string]Resource `json:"resources"`
+}
+
+type Resource struct {
+       Methods map[string]ResourceMethod `json:"methods"`
+}
+
+type ResourceMethod struct {
+       HTTPMethod string         `json:"httpMethod"`
+       Path       string         `json:"path"`
+       Response   MethodResponse `json:"response"`
+}
+
+type MethodResponse struct {
+       Ref string `json:"$ref"`
+}
+
+type Schema struct {
+       UUIDPrefix string `json:"uuidPrefix"`
 }
 
 // DiscoveryDocument returns a *DiscoveryDocument. The returned object
 // should not be modified: the same object may be returned by
 // subsequent calls.
 func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
+       if c.dd != nil {
+               return c.dd, nil
+       }
        var dd DiscoveryDocument
-       return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+       err := c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+       if err != nil {
+               return nil, err
+       }
+       c.dd = &dd
+       return c.dd, nil
+}
+
+func (c *Client) modelForUUID(dd *DiscoveryDocument, uuid string) (string, error) {
+       if len(uuid) != 27 {
+               return "", fmt.Errorf("invalid UUID: %q", uuid)
+       }
+       infix := uuid[6:11]
+       var model string
+       for m, s := range dd.Schemas {
+               if s.UUIDPrefix == infix {
+                       model = m
+                       break
+               }
+       }
+       if model == "" {
+               return "", fmt.Errorf("unrecognized type portion %q in UUID %q", infix, uuid)
+       }
+       return model, nil
+}
+
+func (c *Client) KindForUUID(uuid string) (string, error) {
+       dd, err := c.DiscoveryDocument()
+       if err != nil {
+               return "", err
+       }
+       model, err := c.modelForUUID(dd, uuid)
+       if err != nil {
+               return "", err
+       }
+       return "arvados#" + strings.ToLower(model[:1]) + model[1:], nil
+}
+
+func (c *Client) PathForUUID(method, uuid string) (string, error) {
+       dd, err := c.DiscoveryDocument()
+       if err != nil {
+               return "", err
+       }
+       model, err := c.modelForUUID(dd, uuid)
+       if err != nil {
+               return "", err
+       }
+       var resource string
+       for r, rsc := range dd.Resources {
+               if rsc.Methods["get"].Response.Ref == model {
+                       resource = r
+                       break
+               }
+       }
+       if resource == "" {
+               return "", fmt.Errorf("no resource for model: %q", model)
+       }
+       m, ok := dd.Resources[resource].Methods[method]
+       if !ok {
+               return "", fmt.Errorf("no method %q for resource %q", method, resource)
+       }
+       path := dd.BasePath + strings.Replace(m.Path, "{uuid}", uuid, -1)
+       if path[0] == '/' {
+               path = path[1:]
+       }
+       return path, nil
 }
diff --git a/sdk/go/arvados/log.go b/sdk/go/arvados/log.go
new file mode 100644 (file)
index 0000000..a48f1c6
--- /dev/null
@@ -0,0 +1,25 @@
+package arvados
+
+import (
+       "time"
+)
+
+// Log is an arvados#log record
+type Log struct {
+       ID              uint64                 `json:"id"`
+       UUID            string                 `json:"uuid"`
+       ObjectUUID      string                 `json:"object_uuid"`
+       ObjectOwnerUUID string                 `json:"object_owner_uuid"`
+       EventType       string                 `json:"event_type"`
+       EventAt         *time.Time             `json:"event,omitempty"`
+       Properties      map[string]interface{} `json:"properties"`
+       CreatedAt       *time.Time             `json:"created_at,omitempty"`
+}
+
// LogList is an arvados#logList resource: one page of log records,
// with the paging metadata returned by a list API call.
type LogList struct {
	Items          []Log `json:"items"`
	ItemsAvailable int   `json:"items_available"`
	Offset         int   `json:"offset"`
	Limit          int   `json:"limit"`
}
index 9c65d65e84a57d9120d69dd84912615ff3949e35..2bbb440fb31211241a78a6f15c788f7e4d706334 100644 (file)
@@ -22,3 +22,8 @@ func LoadFile(cfg interface{}, configPath string) error {
        }
        return nil
 }
+
// Dump returns a YAML representation of cfg.
// Marshal errors (e.g., unsupported value types in cfg) are returned
// to the caller.
func Dump(cfg interface{}) ([]byte, error) {
	return yaml.Marshal(cfg)
}
diff --git a/sdk/go/ctxlog/log.go b/sdk/go/ctxlog/log.go
new file mode 100644 (file)
index 0000000..6565c88
--- /dev/null
@@ -0,0 +1,59 @@
+package ctxlog
+
+import (
+       "context"
+
+       "github.com/Sirupsen/logrus"
+)
+
var (
	// loggerCtxKey is the private context key under which Context()
	// stores a *logrus.Entry.
	loggerCtxKey = new(int)
	// rootLogger is the fallback logger, configured via SetLevel and
	// SetFormat.
	rootLogger = logrus.New()
)

// rfc3339NanoFixed is the timestamp layout used by SetFormat:
// RFC3339 with fixed-width (zero-padded) nanoseconds.
const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
+
// Context returns a new child context such that FromContext(child)
// returns the given logger.
// The logger is stored under a private package-level key, so it can
// only be retrieved via FromContext.
func Context(ctx context.Context, logger *logrus.Entry) context.Context {
	return context.WithValue(ctx, loggerCtxKey, logger)
}
+
+// FromContext returns the logger suitable for the given context -- the one
+// attached by contextWithLogger() if applicable, otherwise the
+// top-level logger with no fields/values.
+func FromContext(ctx context.Context) *logrus.Entry {
+       if ctx != nil {
+               if logger, ok := ctx.Value(loggerCtxKey).(*logrus.Entry); ok {
+                       return logger
+               }
+       }
+       return rootLogger.WithFields(nil)
+}
+
// SetLevel sets the current logging level. See logrus for level
// names.
// An unrecognized level name is treated as a fatal configuration
// error: logrus.Fatal logs it and exits the process.
func SetLevel(level string) {
	lvl, err := logrus.ParseLevel(level)
	if err != nil {
		logrus.Fatal(err)
	}
	rootLogger.Level = lvl
}
+
+// SetFormat sets the current logging format to "json" or "text".
+func SetFormat(format string) {
+       switch format {
+       case "text":
+               rootLogger.Formatter = &logrus.TextFormatter{
+                       FullTimestamp:   true,
+                       TimestampFormat: rfc3339NanoFixed,
+               }
+       case "json":
+               rootLogger.Formatter = &logrus.JSONFormatter{
+                       TimestampFormat: rfc3339NanoFixed,
+               }
+       default:
+               logrus.WithField("LogFormat", format).Fatal("unknown log format")
+       }
+}
diff --git a/sdk/go/stats/duration.go b/sdk/go/stats/duration.go
new file mode 100644 (file)
index 0000000..103dea0
--- /dev/null
@@ -0,0 +1,35 @@
+package stats
+
+import (
+       "fmt"
+       "strconv"
+       "time"
+)
+
+// Duration is a duration that is displayed as a number of seconds in
+// fixed-point notation.
+type Duration time.Duration
+
+// MarshalJSON implements json.Marshaler.
+func (d Duration) MarshalJSON() ([]byte, error) {
+       return []byte(d.String()), nil
+}
+
+// String implements fmt.Stringer.
+func (d Duration) String() string {
+       return fmt.Sprintf("%.6f", time.Duration(d).Seconds())
+}
+
+// UnmarshalJSON implements json.Unmarshaler
+func (d *Duration) UnmarshalJSON(data []byte) error {
+       return d.Set(string(data))
+}
+
+// Value implements flag.Value
+func (d *Duration) Set(s string) error {
+       sec, err := strconv.ParseFloat(s, 64)
+       if err == nil {
+               *d = Duration(sec * float64(time.Second))
+       }
+       return err
+}
diff --git a/sdk/go/stats/duration_test.go b/sdk/go/stats/duration_test.go
new file mode 100644 (file)
index 0000000..730e646
--- /dev/null
@@ -0,0 +1,23 @@
+package stats
+
+import (
+       "testing"
+       "time"
+)
+
+func TestString(t *testing.T) {
+       d := Duration(123123123123 * time.Nanosecond)
+       if s, expect := d.String(), "123.123123"; s != expect {
+               t.Errorf("got %s, expect %s", s, expect)
+       }
+}
+
+func TestSet(t *testing.T) {
+       var d Duration
+       if err := d.Set("123.456"); err != nil {
+               t.Fatal(err)
+       }
+       if got, expect := time.Duration(d).Nanoseconds(), int64(123456000000); got != expect {
+               t.Errorf("got %d, expect %d", got, expect)
+       }
+}
index 517d617d8c4f8403953b5d0b105808e0bd18ac0d..4cc2591ebb25034d0145de40c11f6638e3973864 100644 (file)
@@ -759,6 +759,14 @@ class ArvadosFile(object):
     def writable(self):
         return self.parent.writable()
 
+    @synchronized
+    def permission_expired(self, as_of_dt=None):
+        """Returns True if any of the segment's locators is expired"""
+        for r in self._segments:
+            if KeepLocator(r.locator).permission_expired(as_of_dt):
+                return True
+        return False
+
     @synchronized
     def segments(self):
         return copy.copy(self._segments)
index 27aad033ae55523de4fd14ae7bf9cf8a7d2b654f..812438e2ccf493507d06ca9468e3c9418f9e0e69 100644 (file)
@@ -565,16 +565,23 @@ class RichCollectionBase(CollectionBase):
     def find(self, path):
         """Recursively search the specified file path.
 
-        May return either a Collection or ArvadosFile.  Return None if not
+        May return either a Collection or ArvadosFile. Return None if not
         found.
+        If path is invalid (ex: starts with '/'), an IOError exception will be
+        raised.
 
         """
         if not path:
             raise errors.ArgumentError("Parameter 'path' is empty.")
 
         pathcomponents = path.split("/", 1)
+        if pathcomponents[0] == '':
+            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+
         item = self._items.get(pathcomponents[0])
-        if len(pathcomponents) == 1:
+        if item is None:
+            return None
+        elif len(pathcomponents) == 1:
             return item
         else:
             if isinstance(item, RichCollectionBase):
@@ -829,7 +836,7 @@ class RichCollectionBase(CollectionBase):
         if target_dir is None:
             raise IOError(errno.ENOENT, "Target directory not found", target_name)
 
-        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
+        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
             target_dir = target_dir[target_name]
             target_name = sourcecomponents[-1]
 
index e3b41b26d370abbd37210ae477ea25002b66c781..38e43496149ca2786580c257f729242695071c36 100644 (file)
@@ -7,21 +7,22 @@ import argparse
 import arvados
 import arvados.collection
 import base64
+import copy
 import datetime
 import errno
 import fcntl
 import hashlib
 import json
+import logging
 import os
 import pwd
-import time
+import re
 import signal
 import socket
 import sys
 import tempfile
 import threading
-import copy
-import logging
+import time
 from apiclient import errors as apiclient_errors
 from arvados._version import __version__
 
@@ -43,13 +44,7 @@ Local file or directory. Default: read from standard input.
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
-                    default=-1, help="""
-Maximum depth of directory tree to represent in the manifest
-structure. A directory structure deeper than this will be represented
-as a single stream in the manifest. If N=0, the manifest will contain
-a single stream. Default: -1 (unlimited), i.e., exactly one manifest
-stream per filesystem directory that contains files.
-""")
+                    default=-1, help=argparse.SUPPRESS)
 
 _group.add_argument('--normalize', action='store_true',
                     help="""
@@ -57,6 +52,12 @@ Normalize the manifest by re-ordering files and streams after writing
 data.
 """)
 
+_group.add_argument('--dry-run', action='store_true', default=False,
+                    help="""
+Don't actually upload files, but only check if any file should be
+uploaded. Exit with code=2 when files are pending for upload.
+""")
+
 _group = upload_opts.add_mutually_exclusive_group()
 
 _group.add_argument('--as-stream', action='store_true', dest='stream',
@@ -100,6 +101,12 @@ separated by commas, with a trailing newline. Do not store a
 manifest.
 """)
 
+upload_opts.add_argument('--update-collection', type=str, default=None,
+                         dest='update_collection', metavar="UUID", help="""
+Update an existing collection identified by the given Arvados collection
+UUID. All new local files will be uploaded.
+""")
+
 upload_opts.add_argument('--use-filename', type=str, default=None,
                          dest='filename', help="""
 Synonym for --filename.
@@ -167,6 +174,16 @@ _group.add_argument('--no-resume', action='store_false', dest='resume',
 Do not continue interrupted uploads from cached state.
 """)
 
+_group = run_opts.add_mutually_exclusive_group()
+_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
+                    help="""
+Save upload state in a cache file for resuming (default).
+""")
+_group.add_argument('--no-cache', action='store_false', dest='use_cache',
+                    help="""
+Do not save upload state in a cache file for resuming.
+""")
+
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
@@ -191,17 +208,54 @@ def parse_arguments(arguments):
         and os.isatty(sys.stderr.fileno())):
         args.progress = True
 
+    # Turn off --resume (default) if --no-cache is used.
+    if not args.use_cache:
+        args.resume = False
+
     if args.paths == ['-']:
+        if args.update_collection:
+            arg_parser.error("""
+    --update-collection cannot be used when reading from stdin.
+    """)
         args.resume = False
+        args.use_cache = False
         if not args.filename:
             args.filename = 'stdin'
 
     return args
 
+
class CollectionUpdateError(Exception):
    """Raised when an existing collection can't be read or its locator
    is unrecognized (see ArvPutUploadJob._setup_state)."""
    pass


class ResumeCacheConflict(Exception):
    # NOTE(review): appears to signal contention on the resume cache
    # file (raised around cache-file locking) -- confirm against the
    # full source.
    pass
 
 
class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
    """List of files queued for upload.

    In dry-run mode, the first attempt to queue a file aborts the run
    by raising ArvPutUploadIsPending.
    """
    def __init__(self, dry_run=False):
        super(FileUploadList, self).__init__()
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        list.append(self, other)
+
+
 class ResumeCache(object):
     CACHE_DIR = '.cache/arvados/arv-put'
 
@@ -217,7 +271,7 @@ class ResumeCache(object):
         realpaths = sorted(os.path.realpath(path) for path in args.paths)
         md5.update('\0'.join(realpaths))
         if any(os.path.isdir(path) for path in realpaths):
-            md5.update(str(max(args.max_manifest_depth, -1)))
+            md5.update("-1")
         elif args.filename:
             md5.update(args.filename)
         return os.path.join(
@@ -291,12 +345,15 @@ class ArvPutUploadJob(object):
         'files' : {} # Previous run file list: {path : {size, mtime}}
     }
 
-    def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
-                 name=None, owner_uuid=None, ensure_unique_name=False,
-                 num_retries=None, replication_desired=None,
-                 filename=None, update_time=1.0):
+    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
+                 bytes_expected=None, name=None, owner_uuid=None,
+                 ensure_unique_name=False, num_retries=None, replication_desired=None,
+                 filename=None, update_time=20.0, update_collection=None,
+                 logger=logging.getLogger('arvados.arv_put'), dry_run=False):
         self.paths = paths
         self.resume = resume
+        self.use_cache = use_cache
+        self.update = False
         self.reporter = reporter
         self.bytes_expected = bytes_expected
         self.bytes_written = 0
@@ -311,51 +368,108 @@ class ArvPutUploadJob(object):
         self._state = None # Previous run state (file list & manifest)
         self._current_files = [] # Current run file list
         self._cache_file = None
-        self._collection = None
         self._collection_lock = threading.Lock()
+        self._remote_collection = None # Collection being updated (if asked)
+        self._local_collection = None # Collection from previous run manifest
+        self._file_paths = [] # Files to be updated in remote collection
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
+        self._checkpointer.daemon = True
         self._update_task_time = update_time  # How many seconds wait between update runs
-        self.logger = logging.getLogger('arvados.arv_put')
+        self._files_to_upload = FileUploadList(dry_run=dry_run)
+        self.logger = logger
+        self.dry_run = dry_run
+
+        if not self.use_cache and self.resume:
+            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
+
+        # Check for obvious dry-run responses
+        if self.dry_run and (not self.use_cache or not self.resume):
+            raise ArvPutUploadIsPending()
+
         # Load cached data if any and if needed
-        self._setup_state()
+        self._setup_state(update_collection)
 
-    def start(self):
+    def start(self, save_collection):
         """
         Start supporting thread & file uploading
         """
-        self._checkpointer.daemon = True
-        self._checkpointer.start()
+        if not self.dry_run:
+            self._checkpointer.start()
         try:
             for path in self.paths:
                 # Test for stdin first, in case some file named '-' exist
                 if path == '-':
+                    if self.dry_run:
+                        raise ArvPutUploadIsPending()
                     self._write_stdin(self.filename or 'stdin')
                 elif os.path.isdir(path):
-                    self._write_directory_tree(path)
+                    # Use absolute paths on cache index so CWD doesn't interfere
+                    # with the caching logic.
+                    prefixdir = path = os.path.abspath(path)
+                    if prefixdir != '/':
+                        prefixdir += '/'
+                    for root, dirs, files in os.walk(path):
+                        # Make os.walk()'s dir traversing order deterministic
+                        dirs.sort()
+                        files.sort()
+                        for f in files:
+                            self._check_file(os.path.join(root, f),
+                                             os.path.join(root[len(prefixdir):], f))
                 else:
-                    self._write_file(path, self.filename or os.path.basename(path))
-        finally:
-            # Stop the thread before doing anything else
-            self._stop_checkpointer.set()
-            self._checkpointer.join()
-            # Commit all & one last _update()
-            self.manifest_text()
+                    self._check_file(os.path.abspath(path),
+                                     self.filename or os.path.basename(path))
+            # If dry-mode is on, and got up to this point, then we should notify that
+            # there aren't any file to upload.
+            if self.dry_run:
+                raise ArvPutUploadNotPending()
+            # Remove local_collection's files that don't exist locally anymore, so the
+            # bytes_written count is correct.
+            for f in self.collection_file_paths(self._local_collection,
+                                                path_prefix=""):
+                if f != 'stdin' and not f in self._file_paths:
+                    self._local_collection.remove(f)
+            # Update bytes_written from current local collection and
+            # report initial progress.
             self._update()
-            if self.resume:
+            # Actual file upload
+            self._upload_files()
+        finally:
+            if not self.dry_run:
+                # Stop the thread before doing anything else
+                self._stop_checkpointer.set()
+                self._checkpointer.join()
+                # Commit all pending blocks & one last _update()
+                self._local_collection.manifest_text()
+                self._update(final=True)
+                if save_collection:
+                    self.save_collection()
+            if self.use_cache:
                 self._cache_file.close()
-                # Correct the final written bytes count
-                self.bytes_written -= self.bytes_skipped
 
     def save_collection(self):
-        with self._collection_lock:
-            self._my_collection().save_new(
+        if self.update:
+            # Check if files should be updated on the remote collection.
+            for fp in self._file_paths:
+                remote_file = self._remote_collection.find(fp)
+                if not remote_file:
+                    # File don't exist on remote collection, copy it.
+                    self._remote_collection.copy(fp, fp, self._local_collection)
+                elif remote_file != self._local_collection.find(fp):
+                    # A different file exist on remote collection, overwrite it.
+                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
+                else:
+                    # The file already exist on remote collection, skip it.
+                    pass
+            self._remote_collection.save(num_retries=self.num_retries)
+        else:
+            self._local_collection.save_new(
                 name=self.name, owner_uuid=self.owner_uuid,
                 ensure_unique_name=self.ensure_unique_name,
                 num_retries=self.num_retries)
 
     def destroy_cache(self):
-        if self.resume:
+        if self.use_cache:
             try:
                 os.unlink(self._cache_filename)
             except OSError as error:
@@ -384,17 +498,20 @@ class ArvPutUploadJob(object):
         while not self._stop_checkpointer.wait(self._update_task_time):
             self._update()
 
-    def _update(self):
+    def _update(self, final=False):
         """
         Update cached manifest text and report progress.
         """
         with self._collection_lock:
-            self.bytes_written = self._collection_size(self._my_collection())
-            # Update cache, if resume enabled
-            if self.resume:
+            self.bytes_written = self._collection_size(self._local_collection)
+            if self.use_cache:
+                # Update cache
                 with self._state_lock:
-                    # Get the manifest text without comitting pending blocks
-                    self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
+                    if final:
+                        self._state['manifest'] = self._local_collection.manifest_text()
+                    else:
+                        # Get the manifest text without comitting pending blocks
+                        self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
                 self._save_state()
         # Call the reporter, if any
         self.report_progress()
@@ -403,114 +520,116 @@ class ArvPutUploadJob(object):
         if self.reporter is not None:
             self.reporter(self.bytes_written, self.bytes_expected)
 
-    def _write_directory_tree(self, path, stream_name="."):
-        # TODO: Check what happens when multiple directories are passed as
-        # arguments.
-        # If the code below is uncommented, integration test
-        # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
-        # fails, I suppose it is because the manifest_uuid changes because
-        # of the dir addition to stream_name.
-
-        # if stream_name == '.':
-        #     stream_name = os.path.join('.', os.path.basename(path))
-        for item in os.listdir(path):
-            if os.path.isdir(os.path.join(path, item)):
-                self._write_directory_tree(os.path.join(path, item),
-                                os.path.join(stream_name, item))
-            else:
-                self._write_file(os.path.join(path, item),
-                                os.path.join(stream_name, item))
-
     def _write_stdin(self, filename):
-        with self._collection_lock:
-            output = self._my_collection().open(filename, 'w')
+        output = self._local_collection.open(filename, 'w')
         self._write(sys.stdin, output)
         output.close()
 
-    def _write_file(self, source, filename):
+    def _check_file(self, source, filename):
+        """Check if this file needs to be uploaded"""
         resume_offset = 0
-        if self.resume:
-            # Check if file was already uploaded (at least partially)
-            with self._collection_lock:
-                try:
-                    file_in_collection = self._my_collection().find(filename)
-                except IOError:
-                    # Not found
-                    file_in_collection = None
+        should_upload = False
+        new_file_in_cache = False
+        # Record file path for updating the remote collection before exiting
+        self._file_paths.append(filename)
+
+        with self._state_lock:
             # If no previous cached data on this file, store it for an eventual
             # repeated run.
             if source not in self._state['files']:
-                with self._state_lock:
-                    self._state['files'][source] = {
-                        'mtime': os.path.getmtime(source),
-                        'size' : os.path.getsize(source)
-                    }
-            with self._state_lock:
-                cached_file_data = self._state['files'][source]
-            # See if this file was already uploaded at least partially
-            if file_in_collection:
-                if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
-                    if cached_file_data['size'] == file_in_collection.size():
-                        # File already there, skip it.
-                        self.bytes_skipped += cached_file_data['size']
-                        return
-                    elif cached_file_data['size'] > file_in_collection.size():
-                        # File partially uploaded, resume!
-                        resume_offset = file_in_collection.size()
-                    else:
-                        # Inconsistent cache, re-upload the file
-                        self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
-                else:
-                    # Local file differs from cached data, re-upload it
-                    pass
-        with open(source, 'r') as source_fd:
-            if resume_offset > 0:
-                # Start upload where we left off
-                with self._collection_lock:
-                    output = self._my_collection().open(filename, 'a')
-                source_fd.seek(resume_offset)
+                self._state['files'][source] = {
+                    'mtime': os.path.getmtime(source),
+                    'size' : os.path.getsize(source)
+                }
+                new_file_in_cache = True
+            cached_file_data = self._state['files'][source]
+
+        # Check if file was already uploaded (at least partially)
+        file_in_local_collection = self._local_collection.find(filename)
+
+        # If not resuming, upload the full file.
+        if not self.resume:
+            should_upload = True
+        # New file detected from last run, upload it.
+        elif new_file_in_cache:
+            should_upload = True
+        # Local file didn't change from last run.
+        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
+            if not file_in_local_collection:
+                # File not uploaded yet, upload it completely
+                should_upload = True
+            elif file_in_local_collection.permission_expired():
+                # Permission token expired, re-upload file. This will change whenever
+                # we have a API for refreshing tokens.
+                should_upload = True
+                self._local_collection.remove(filename)
+            elif cached_file_data['size'] == file_in_local_collection.size():
+                # File already there, skip it.
+                self.bytes_skipped += cached_file_data['size']
+            elif cached_file_data['size'] > file_in_local_collection.size():
+                # File partially uploaded, resume!
+                resume_offset = file_in_local_collection.size()
                 self.bytes_skipped += resume_offset
+                should_upload = True
             else:
-                # Start from scratch
-                with self._collection_lock:
-                    output = self._my_collection().open(filename, 'w')
-            self._write(source_fd, output)
-            output.close(flush=False)
+                # Inconsistent cache, re-upload the file
+                should_upload = True
+                self._local_collection.remove(filename)
+                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+        # Local file differs from cached data, re-upload it.
+        else:
+            if file_in_local_collection:
+                self._local_collection.remove(filename)
+            should_upload = True
+
+        if should_upload:
+            self._files_to_upload.append((source, resume_offset, filename))
+
    def _upload_files(self):
        # Upload every (source, resume_offset, filename) tuple queued
        # by _check_file(), refreshing the cached size/mtime just
        # before reading so the saved state matches what is uploaded.
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'r') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off: append to the
                    # partially uploaded file, skipping the bytes
                    # already in the local collection.
                    output = self._local_collection.open(filename, 'a')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'w')
                self._write(source_fd, output)
                output.close(flush=False)
 
     def _write(self, source_fd, output):
-        first_read = True
         while True:
             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
-            # Allow an empty file to be written
-            if not data and not first_read:
+            if not data:
                 break
-            if first_read:
-                first_read = False
             output.write(data)
 
     def _my_collection(self):
-        """
-        Create a new collection if none cached. Load it from cache otherwise.
-        """
-        if self._collection is None:
-            with self._state_lock:
-                manifest = self._state['manifest']
-            if self.resume and manifest is not None:
-                # Create collection from saved state
-                self._collection = arvados.collection.Collection(
-                    manifest,
-                    replication_desired=self.replication_desired)
-            else:
-                # Create new collection
-                self._collection = arvados.collection.Collection(
-                    replication_desired=self.replication_desired)
-        return self._collection
+        return self._remote_collection if self.update else self._local_collection
 
-    def _setup_state(self):
+    def _setup_state(self, update_collection):
         """
         Create a new cache file or load a previously existing one.
         """
-        if self.resume:
+        # Load an already existing collection for update
+        if update_collection and re.match(arvados.util.collection_uuid_pattern,
+                                          update_collection):
+            try:
+                self._remote_collection = arvados.collection.Collection(update_collection)
+            except arvados.errors.ApiError as error:
+                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
+            else:
+                self.update = True
+        elif update_collection:
+            # Collection locator provided, but unknown format
+            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
+
+        if self.use_cache:
+            # Set up cache file name from input paths.
             md5 = hashlib.md5()
             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
             realpaths = sorted(os.path.realpath(path) for path in self.paths)
@@ -518,13 +637,20 @@ class ArvPutUploadJob(object):
             if self.filename:
                 md5.update(self.filename)
             cache_filename = md5.hexdigest()
-            self._cache_file = open(os.path.join(
+            cache_filepath = os.path.join(
                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
-                cache_filename), 'a+')
+                cache_filename)
+            if self.resume:
+                self._cache_file = open(cache_filepath, 'a+')
+            else:
+                # --no-resume means start with a empty cache file.
+                self._cache_file = open(cache_filepath, 'w+')
             self._cache_filename = self._cache_file.name
             self._lock_file(self._cache_file)
             self._cache_file.seek(0)
-            with self._state_lock:
+
+        with self._state_lock:
+            if self.use_cache:
                 try:
                     self._state = json.load(self._cache_file)
                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
@@ -533,13 +659,22 @@ class ArvPutUploadJob(object):
                 except ValueError:
                     # Cache file empty, set up new cache
                     self._state = copy.deepcopy(self.EMPTY_STATE)
-            # Load how many bytes were uploaded on previous run
-            with self._collection_lock:
-                self.bytes_written = self._collection_size(self._my_collection())
-        # No resume required
-        else:
-            with self._state_lock:
+            else:
+                # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
+            # Load the previous manifest so we can check if files were modified remotely.
+            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
+
    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`

        Each path is path_prefix joined with the file's name; entries
        that are subcollections are descended into, other item types
        are ignored.
        """
        file_paths = []
        for name, item in col.items():
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths
 
     def _lock_file(self, fileobj):
         try:
@@ -553,7 +688,7 @@ class ArvPutUploadJob(object):
         """
         try:
             with self._state_lock:
-                state = self._state
+                state = copy.deepcopy(self._state)
             new_cache_fd, new_cache_name = tempfile.mkstemp(
                 dir=os.path.dirname(self._cache_filename))
             self._lock_file(new_cache_fd)
@@ -573,24 +708,16 @@ class ArvPutUploadJob(object):
             self._cache_file = new_cache
 
     def collection_name(self):
-        with self._collection_lock:
-            name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
-        return name
+        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
 
     def manifest_locator(self):
-        with self._collection_lock:
-            locator = self._my_collection().manifest_locator()
-        return locator
+        return self._my_collection().manifest_locator()
 
     def portable_data_hash(self):
-        with self._collection_lock:
-            datahash = self._my_collection().portable_data_hash()
-        return datahash
+        return self._my_collection().portable_data_hash()
 
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
-        with self._collection_lock:
-            manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
-        return manifest
+        return self._my_collection().manifest_text(stream_name, strip, normalize)
 
     def _datablocks_on_item(self, item):
         """
@@ -673,6 +800,7 @@ def desired_project_uuid(api_client, project_uuid, num_retries):
 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
 
+    logger = logging.getLogger('arvados.arv_put')
     args = parse_arguments(arguments)
     status = 0
     if api_client is None:
@@ -681,7 +809,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     # Determine the name to use
     if args.name:
         if args.stream or args.raw:
-            print >>stderr, "Cannot use --name with --stream or --raw"
+            logger.error("Cannot use --name with --stream or --raw")
+            sys.exit(1)
+        elif args.update_collection:
+            logger.error("Cannot use --name with --update-collection")
             sys.exit(1)
         collection_name = args.name
     else:
@@ -691,7 +822,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
             socket.gethostname())
 
     if args.project_uuid and (args.stream or args.raw):
-        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
+        logger.error("Cannot use --project-uuid with --stream or --raw")
         sys.exit(1)
 
     # Determine the parent project
@@ -699,7 +830,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                             args.retries)
     except (apiclient_errors.Error, ValueError) as error:
-        print >>stderr, error
+        logger.error(error)
         sys.exit(1)
 
     if args.progress:
@@ -710,9 +841,11 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         reporter = None
 
     bytes_expected = expected_bytes_for(args.paths)
+
     try:
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
+                                 use_cache = args.use_cache,
                                  filename = args.filename,
                                  reporter = reporter,
                                  bytes_expected = bytes_expected,
@@ -720,28 +853,54 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                                  replication_desired = args.replication,
                                  name = collection_name,
                                  owner_uuid = project_uuid,
-                                 ensure_unique_name = True)
+                                 ensure_unique_name = True,
+                                 update_collection = args.update_collection,
+                                 logger=logger,
+                                 dry_run=args.dry_run)
     except ResumeCacheConflict:
-        print >>stderr, "\n".join([
+        logger.error("\n".join([
             "arv-put: Another process is already uploading this data.",
-            "         Use --no-resume if this is really what you want."])
+            "         Use --no-cache if this is really what you want."]))
         sys.exit(1)
+    except CollectionUpdateError as error:
+        logger.error("\n".join([
+            "arv-put: %s" % str(error)]))
+        sys.exit(1)
+    except ArvPutUploadIsPending:
+        # Dry run check successful, return proper exit code.
+        sys.exit(2)
+    except ArvPutUploadNotPending:
+        # No files pending for upload
+        sys.exit(0)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                             for sigcode in CAUGHT_SIGNALS}
 
-    if args.resume and writer.bytes_written > 0:
-        print >>stderr, "\n".join([
-                "arv-put: Resuming previous upload from last checkpoint.",
-                "         Use the --no-resume option to start over."])
+    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
+        logger.warning("\n".join([
+            "arv-put: Resuming previous upload from last checkpoint.",
+            "         Use the --no-resume option to start over."]))
 
-    writer.report_progress()
+    if not args.dry_run:
+        writer.report_progress()
     output = None
-    writer.start()
+    try:
+        writer.start(save_collection=not(args.stream or args.raw))
+    except arvados.errors.ApiError as error:
+        logger.error("\n".join([
+            "arv-put: %s" % str(error)]))
+        sys.exit(1)
+    except ArvPutUploadIsPending:
+        # Dry run check successful, return proper exit code.
+        sys.exit(2)
+    except ArvPutUploadNotPending:
+        # No files pending for upload
+        sys.exit(0)
+
     if args.progress:  # Print newline to split stderr from stdout for humans.
-        print >>stderr
+        logger.info("\n")
 
     if args.stream:
         if args.normalize:
@@ -752,14 +911,16 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         output = ','.join(writer.data_locators())
     else:
         try:
-            writer.save_collection()
-            print >>stderr, "Collection saved as '%s'" % writer.collection_name()
+            if args.update_collection:
+                logger.info("Collection updated: '{}'".format(writer.collection_name()))
+            else:
+                logger.info("Collection saved as '{}'".format(writer.collection_name()))
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else:
                 output = writer.manifest_locator()
         except apiclient_errors.Error as error:
-            print >>stderr, (
+            logger.error(
                 "arv-put: Error creating Collection on project: {}.".format(
                     error))
             status = 1
@@ -779,7 +940,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         sys.exit(status)
 
     # Success!
-    writer.destroy_cache()
     return output
 
 
index c98947945669338384147d9a8a0baf6917c43db9..38f332b38e2d51aae9b6a3fa5007a59aad6b006f 100644 (file)
@@ -296,14 +296,14 @@ class KeepClient(object):
 
         def _get_user_agent(self):
             try:
-                return self._user_agent_pool.get(False)
+                return self._user_agent_pool.get(block=False)
             except Queue.Empty:
                 return pycurl.Curl()
 
         def _put_user_agent(self, ua):
             try:
                 ua.reset()
-                self._user_agent_pool.put(ua, False)
+                self._user_agent_pool.put(ua, block=False)
             except:
                 ua.close()
 
index 9d7d2481fdbb69c1635f932e7171663be9739e14..d470ab4d0046f01a40017d9aaa58dc0e231b1c51 100644 (file)
@@ -46,7 +46,6 @@ setup(name='arvados-python-client',
       install_requires=[
           'google-api-python-client==1.4.2',
           'oauth2client >=1.4.6, <2',
-          'pyasn1-modules==0.0.5',
           'ciso8601',
           'httplib2',
           'pycurl >=7.19.5.1, <7.21.5',
index 2b8b6ca1c4ad531c29bfd1c9a149da7c9bdf3599..006604077d457d286cdb9148332e087a0204313c 100644 (file)
@@ -54,4 +54,20 @@ http {
       proxy_redirect //download:{{KEEPWEBPORT}}/ https://$host:{{KEEPWEBDLSSLPORT}}/;
     }
   }
+  upstream ws {
+    server localhost:{{WSPORT}};
+  }
+  server {
+    listen *:{{WSSPORT}} ssl default_server;
+    server_name ~^(?<request_host>.*)$;
+    ssl_certificate {{SSLCERT}};
+    ssl_certificate_key {{SSLKEY}};
+    location  / {
+      proxy_pass http://ws;
+      proxy_set_header Upgrade $http_upgrade;
+      proxy_set_header Connection "upgrade";
+      proxy_set_header Host $request_host:{{WSPORT}};
+      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+    }
+  }
 }
index 642b7ccbad51846a9f1c25acc86f6b0505897c62..b969b12a7abc6374ae0993d0d95c2f5cf85f784b 100644 (file)
@@ -44,6 +44,7 @@ if not os.path.exists(TEST_TMPDIR):
 
 my_api_host = None
 _cached_config = {}
+_cached_db_config = {}
 
 def find_server_pid(PID_PATH, wait=10):
     now = time.time()
@@ -284,10 +285,19 @@ def run(leave_running_atexit=False):
         os.makedirs(gitdir)
     subprocess.check_output(['tar', '-xC', gitdir, '-f', gittarball])
 
+    # The nginx proxy isn't listening here yet, but we need to choose
+    # the wss:// port now so we can write the API server config file.
+    wss_port = find_available_port()
+    _setport('wss', wss_port)
+
     port = find_available_port()
     env = os.environ.copy()
     env['RAILS_ENV'] = 'test'
-    env['ARVADOS_WEBSOCKETS'] = 'yes'
+    env['ARVADOS_TEST_WSS_PORT'] = str(wss_port)
+    if env.get('ARVADOS_TEST_EXPERIMENTAL_WS'):
+        env.pop('ARVADOS_WEBSOCKETS', None)
+    else:
+        env['ARVADOS_WEBSOCKETS'] = 'yes'
     env.pop('ARVADOS_TEST_API_HOST', None)
     env.pop('ARVADOS_API_HOST', None)
     env.pop('ARVADOS_API_HOST_INSECURE', None)
@@ -360,6 +370,47 @@ def stop(force=False):
         kill_server_pid(_pidfile('api'))
         my_api_host = None
 
+def run_ws():
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    stop_ws()
+    port = find_available_port()
+    conf = os.path.join(TEST_TMPDIR, 'ws.yml')
+    with open(conf, 'w') as f:
+        f.write("""
+Client:
+  APIHost: {}
+  Insecure: true
+Listen: :{}
+LogLevel: {}
+Postgres:
+  host: {}
+  dbname: {}
+  user: {}
+  password: {}
+  sslmode: require
+        """.format(os.environ['ARVADOS_API_HOST'],
+                   port,
+                   ('info' if os.environ.get('ARVADOS_DEBUG', '') in ['','0'] else 'debug'),
+                   _dbconfig('host'),
+                   _dbconfig('database'),
+                   _dbconfig('username'),
+                   _dbconfig('password')))
+    logf = open(_fifo2stderr('ws'), 'w')
+    ws = subprocess.Popen(
+        ["ws", "-config", conf],
+        stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
+    with open(_pidfile('ws'), 'w') as f:
+        f.write(str(ws.pid))
+    _wait_until_port_listens(port)
+    _setport('ws', port)
+    return port
+
+def stop_ws():
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    kill_server_pid(_pidfile('ws'))
+
 def _start_keep(n, keep_args):
     keep0 = tempfile.mkdtemp()
     port = find_available_port()
@@ -537,6 +588,7 @@ def stop_keep_web():
 def run_nginx():
     if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
         return
+    stop_nginx()
     nginxconf = {}
     nginxconf['KEEPWEBPORT'] = _getport('keep-web')
     nginxconf['KEEPWEBDLSSLPORT'] = find_available_port()
@@ -545,6 +597,8 @@ def run_nginx():
     nginxconf['KEEPPROXYSSLPORT'] = find_available_port()
     nginxconf['GITPORT'] = _getport('arv-git-httpd')
     nginxconf['GITSSLPORT'] = find_available_port()
+    nginxconf['WSPORT'] = _getport('ws')
+    nginxconf['WSSPORT'] = _getport('wss')
     nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem')
     nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key')
     nginxconf['ACCESSLOG'] = _fifo2stderr('nginx_access_log')
@@ -593,7 +647,15 @@ def _getport(program):
     except IOError:
         return 9
 
+def _dbconfig(key):
+    global _cached_db_config
+    if not _cached_db_config:
+        _cached_db_config = yaml.load(open(os.path.join(
+            SERVICES_SRC_DIR, 'api', 'config', 'database.yml')))
+    return _cached_db_config['test'][key]
+
 def _apiconfig(key):
+    global _cached_config
     if _cached_config:
         return _cached_config[key]
     def _load(f, required=True):
@@ -647,6 +709,7 @@ class TestCaseWithServers(unittest.TestCase):
     original environment.
     """
     MAIN_SERVER = None
+    WS_SERVER = None
     KEEP_SERVER = None
     KEEP_PROXY_SERVER = None
     KEEP_WEB_SERVER = None
@@ -667,6 +730,7 @@ class TestCaseWithServers(unittest.TestCase):
         os.environ.pop('ARVADOS_EXTERNAL_CLIENT', None)
         for server_kwargs, start_func, stop_func in (
                 (cls.MAIN_SERVER, run, reset),
+                (cls.WS_SERVER, run_ws, stop_ws),
                 (cls.KEEP_SERVER, run_keep, stop_keep),
                 (cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy),
                 (cls.KEEP_WEB_SERVER, run_keep_web, stop_keep_web)):
@@ -693,6 +757,7 @@ class TestCaseWithServers(unittest.TestCase):
 if __name__ == "__main__":
     actions = [
         'start', 'stop',
+        'start_ws', 'stop_ws',
         'start_keep', 'stop_keep',
         'start_keep_proxy', 'stop_keep_proxy',
         'start_keep-web', 'stop_keep-web',
@@ -725,6 +790,10 @@ if __name__ == "__main__":
             print(host)
     elif args.action == 'stop':
         stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
+    elif args.action == 'start_ws':
+        run_ws()
+    elif args.action == 'stop_ws':
+        stop_ws()
     elif args.action == 'start_keep':
         run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers)
     elif args.action == 'stop_keep':
index f35e4c725c1ebfb74062b44b4c4c4d5477684f9a..f1dfd03def33d09c1ede560f50c5a059020f9c0c 100644 (file)
@@ -32,9 +32,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
         [],
         ['/dev/null'],
         ['/dev/null', '--filename', 'empty'],
-        ['/tmp'],
-        ['/tmp', '--max-manifest-depth', '0'],
-        ['/tmp', '--max-manifest-depth', '1']
+        ['/tmp']
         ]
 
     def tearDown(self):
@@ -241,6 +239,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
 
 class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                           ArvadosBaseTestCase):
+
     def setUp(self):
         super(ArvPutUploadJobTest, self).setUp()
         run_test_server.authorize_with('active')
@@ -271,7 +270,7 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
 
     def test_writer_works_without_cache(self):
         cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
-        cwriter.start()
+        cwriter.start(save_collection=False)
         self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
 
     def test_writer_works_with_cache(self):
@@ -279,13 +278,13 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             f.write('foo')
             f.flush()
             cwriter = arv_put.ArvPutUploadJob([f.name])
-            cwriter.start()
-            self.assertEqual(3, cwriter.bytes_written)
+            cwriter.start(save_collection=False)
+            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
             # Don't destroy the cache, and start another upload
             cwriter_new = arv_put.ArvPutUploadJob([f.name])
-            cwriter_new.start()
+            cwriter_new.start(save_collection=False)
             cwriter_new.destroy_cache()
-            self.assertEqual(0, cwriter_new.bytes_written)
+            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)
 
     def make_progress_tester(self):
         progression = []
@@ -301,13 +300,13 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                 progression, reporter = self.make_progress_tester()
                 cwriter = arv_put.ArvPutUploadJob([f.name],
                     reporter=reporter, bytes_expected=expect_count)
-                cwriter.start()
+                cwriter.start(save_collection=False)
                 cwriter.destroy_cache()
                 self.assertIn((3, expect_count), progression)
 
     def test_writer_upload_directory(self):
         cwriter = arv_put.ArvPutUploadJob([self.tempdir])
-        cwriter.start()
+        cwriter.start(save_collection=False)
         cwriter.destroy_cache()
         self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
 
@@ -325,17 +324,128 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1)
             with self.assertRaises(SystemExit):
-                writer.start()
-                self.assertLess(writer.bytes_written,
-                                os.path.getsize(self.large_file_name))
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
         # Retry the upload
         writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                           replication_desired=1)
-        writer2.start()
-        self.assertEqual(writer.bytes_written + writer2.bytes_written,
+        writer2.start(save_collection=False)
+        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                          os.path.getsize(self.large_file_name))
         writer2.destroy_cache()
 
+    def test_no_resume_when_asked(self):
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload, this time without resume
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          resume=False)
+        writer2.start(save_collection=False)
+        self.assertEqual(writer2.bytes_skipped, 0)
+        self.assertEqual(writer2.bytes_written,
+                         os.path.getsize(self.large_file_name))
+        writer2.destroy_cache()
+
+    def test_no_resume_when_no_cache(self):
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload, this time without cache usage
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          resume=False,
+                                          use_cache=False)
+        writer2.start(save_collection=False)
+        self.assertEqual(writer2.bytes_skipped, 0)
+        self.assertEqual(writer2.bytes_written,
+                         os.path.getsize(self.large_file_name))
+        writer2.destroy_cache()
+
+
+    def test_dry_run_feature(self):
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            with self.assertRaises(SystemExit):
+                writer.start(save_collection=False)
+            # Confirm that the file was partially uploaded
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload using dry_run to check if there is a pending upload
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          dry_run=True)
+        with self.assertRaises(arv_put.ArvPutUploadIsPending):
+            writer2.start(save_collection=False)
+        # Complete the pending upload
+        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1)
+        writer3.start(save_collection=False)
+        # Confirm there's no pending upload with dry_run=True
+        writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1,
+                                          dry_run=True)
+        with self.assertRaises(arv_put.ArvPutUploadNotPending):
+            writer4.start(save_collection=False)
+        writer4.destroy_cache()
+        # Test obvious cases
+        with self.assertRaises(arv_put.ArvPutUploadIsPending):
+            arv_put.ArvPutUploadJob([self.large_file_name],
+                                    replication_desired=1,
+                                    dry_run=True,
+                                    resume=False,
+                                    use_cache=False)
+        with self.assertRaises(arv_put.ArvPutUploadIsPending):
+            arv_put.ArvPutUploadJob([self.large_file_name],
+                                    replication_desired=1,
+                                    dry_run=True,
+                                    resume=False)
+
 
 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
     TEST_SIZE = os.path.getsize(__file__)
@@ -634,6 +744,21 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         self.assertEqual(1, len(collection_list))
         return collection_list[0]
 
+    def test_put_collection_with_later_update(self):
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
+        self.assertNotEqual(None, col['uuid'])
+        # Add a new file to the directory
+        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
+            f.write('The quick brown fox jumped over the lazy dog')
+        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
+        self.assertEqual(col['uuid'], updated_col['uuid'])
+        # Get the manifest and check that the new file is being included
+        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
+        self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')
+
     def test_put_collection_with_high_redundancy(self):
         # Write empty data: we're not testing CollectionWriter, just
         # making sure collections.create tells the API server what our
index 6b3562602aa69601021b04d93a116c04972abab5..8f02d517fc54ff531755dbacdefdb48378ab13ea 100644 (file)
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 
 import bz2
+import datetime
 import gzip
 import io
 import mock
@@ -570,6 +571,26 @@ class ArvadosFileReadlinesTestCase(ArvadosFileReadTestCase):
     def read_for_test(self, reader, byte_count, **kwargs):
         return ''.join(reader.readlines(**kwargs))
 
+
+class ArvadosFileTestCase(unittest.TestCase):
+    def datetime_to_hex(self, dt):
+        return hex(int(time.mktime(dt.timetuple())))[2:]
+
+    def test_permission_expired(self):
+        base_manifest = ". 781e5e245d69b566979b86e28d23f2c7+10+A715fd31f8111894f717eb1003c1b0216799dd9ec@{} 0:10:count.txt\n"
+        now = datetime.datetime.now()
+        a_week_ago = now - datetime.timedelta(days=7)
+        a_month_ago = now - datetime.timedelta(days=30)
+        a_week_from_now = now + datetime.timedelta(days=7)
+        with Collection(base_manifest.format(self.datetime_to_hex(a_week_from_now))) as c:
+            self.assertFalse(c.find('count.txt').permission_expired())
+        with Collection(base_manifest.format(self.datetime_to_hex(a_week_ago))) as c:
+            f = c.find('count.txt')
+            self.assertTrue(f.permission_expired())
+            self.assertTrue(f.permission_expired(a_week_from_now))
+            self.assertFalse(f.permission_expired(a_month_ago))
+
+
 class BlockManagerTest(unittest.TestCase):
     def test_bufferblock_append(self):
         keep = ArvadosFileWriterTestCase.MockKeep({})
index fc30a242eba1bfc665a05747de66f999869ef8a4..0e3d5e13f135c84f2fde2f741bd554b0ccdf3a85 100644 (file)
@@ -861,6 +861,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
             c.find("/.")
         with self.assertRaises(arvados.errors.ArgumentError):
             c.find("")
+        self.assertIs(c.find("./nonexistant.txt"), None)
+        self.assertIs(c.find("./nonexistantsubdir/nonexistant.txt"), None)
 
     def test_remove_in_subdir(self):
         c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
index 0199724339b76f0fb37e5af89c8d62c53f332711..7e8c84ec11279495d55fd47770378847886ae76e 100644 (file)
@@ -53,21 +53,22 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         self.assertEqual(200, events.get(True, 5)['status'])
         human = arvados.api('v1').humans().create(body={}).execute()
 
-        log_object_uuids = []
-        for i in range(0, expected):
-            log_object_uuids.append(events.get(True, 5)['object_uuid'])
-
+        want_uuids = []
         if expected > 0:
-            self.assertIn(human['uuid'], log_object_uuids)
-
+            want_uuids.append(human['uuid'])
         if expected > 1:
-            self.assertIn(ancestor['uuid'], log_object_uuids)
+            want_uuids.append(ancestor['uuid'])
+        log_object_uuids = []
+        while set(want_uuids) - set(log_object_uuids):
+            log_object_uuids.append(events.get(True, 5)['object_uuid'])
 
-        with self.assertRaises(Queue.Empty):
-            # assertEqual just serves to show us what unexpected thing
-            # comes out of the queue when the assertRaises fails; when
-            # the test passes, this assertEqual doesn't get called.
-            self.assertEqual(events.get(True, 2), None)
+        if expected < 2:
+            with self.assertRaises(Queue.Empty):
+                # assertEqual just serves to show us what unexpected
+                # thing comes out of the queue when the assertRaises
+                # fails; when the test passes, this assertEqual
+                # doesn't get called.
+                self.assertEqual(events.get(True, 2), None)
 
     def test_subscribe_websocket(self):
         self._test_subscribe(
@@ -145,8 +146,8 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         return time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(t)) + self.isotz(-time.timezone/60)
 
     def isotz(self, offset):
-        """Convert minutes-east-of-UTC to ISO8601 time zone designator"""
-        return '{:+03d}{:02d}'.format(offset/60, offset%60)
+        """Convert minutes-east-of-UTC to RFC3339- and ISO-compatible time zone designator"""
+        return '{:+03d}:{:02d}'.format(offset/60, offset%60)
 
     # Test websocket reconnection on (un)execpted close
     def _test_websocket_reconnect(self, close_unexpected):
index aed0309591e4ecbfa4c309747daee695417b16c8..910db7e0c7bad2bb0a38ebbc8d0b02a40b63561f 100644 (file)
@@ -252,12 +252,7 @@ class ArvadosModel < ActiveRecord::Base
     parts = full_text_searchable_columns.collect do |column|
       "coalesce(#{column},'')"
     end
-    # We prepend a space to the tsvector() argument here. Otherwise,
-    # it might start with a column that has its own (non-full-text)
-    # index, which causes Postgres to use the column index instead of
-    # the tsvector index, which causes full text queries to be just as
-    # slow as if we had no index at all.
-    "to_tsvector('english', ' ' || #{parts.join(" || ' ' || ")})"
+    "to_tsvector('english', #{parts.join(" || ' ' || ")})"
   end
 
   def self.apply_filters query, filters
index ef3d0b5e1028b41a2bab4d63aa300efb4743bc0a..9556799aba9f74375d0419fae8e8d153bc97ffa1 100644 (file)
@@ -119,6 +119,10 @@ class Job < ArvadosModel
     super - ["script_parameters_digest"]
   end
 
+  def self.full_text_searchable_columns
+    super - ["script_parameters_digest"]
+  end
+
   def self.load_job_specific_filters attrs, orig_filters, read_users
     # Convert Job-specific @filters entries into general SQL filters.
     script_info = {"repository" => nil, "script" => nil}
index a9aa953f9f36e948dc57d34336c7d3f1cc1df43c..c8d2d1d6f1e6665b5b5b67dbd028a9be049248fc 100644 (file)
@@ -444,3 +444,4 @@ test:
   workbench_address: https://localhost:3001/
   git_repositories_dir: <%= Rails.root.join 'tmp', 'git', 'test' %>
   git_internal_dir: <%= Rails.root.join 'tmp', 'internal.git' %>
+  websocket_address: <% if ENV['ARVADOS_TEST_EXPERIMENTAL_WS'] %>"wss://0.0.0.0:<%= ENV['ARVADOS_TEST_WSS_PORT'] %>/websocket"<% else %>false<% end %>
diff --git a/services/api/db/migrate/20161213172944_full_text_search_indexes.rb b/services/api/db/migrate/20161213172944_full_text_search_indexes.rb
new file mode 100644 (file)
index 0000000..4cb4d97
--- /dev/null
@@ -0,0 +1,32 @@
+class FullTextSearchIndexes < ActiveRecord::Migration
+  def fts_indexes
+    {
+      "collections" => "collections_full_text_search_idx",
+      "container_requests" => "container_requests_full_text_search_idx",
+      "groups" => "groups_full_text_search_idx",
+      "jobs" => "jobs_full_text_search_idx",
+      "pipeline_instances" => "pipeline_instances_full_text_search_idx",
+      "pipeline_templates" => "pipeline_templates_full_text_search_idx",
+      "workflows" => "workflows_full_text_search_idx",
+    }
+  end
+
+  def up
+    # remove existing fts indexes and create up to date ones with no leading space
+    fts_indexes.each do |t, i|
+      ActiveRecord::Base.connection.indexes(t).each do |idx|
+        if idx.name == i
+          remove_index t.to_sym, :name => i
+          break
+        end
+      end
+      execute "CREATE INDEX #{i} ON #{t} USING gin(#{t.classify.constantize.full_text_tsvector});"
+    end
+  end
+
+  def down
+    fts_indexes.each do |t, i|
+      remove_index t.to_sym, :name => i
+    end
+  end
+end
index e715cd60c4fcc0bb56e7a3df0ca7bbf01362cf9a..7ee7ea6ff98fbd1845eed558737cf16c557343ce 100644 (file)
@@ -1506,7 +1506,7 @@ CREATE UNIQUE INDEX collection_owner_uuid_name_unique ON collections USING btree
 -- Name: collections_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace: 
 --
 
-CREATE INDEX collections_full_text_search_idx ON collections USING gin (to_tsvector('english'::regconfig, (((((((((((((((((' '::text || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(portable_data_hash, ''::character varying))::text) || ' '::text) || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(file_names, ''::character varying))::text)));
+CREATE INDEX collections_full_text_search_idx ON collections USING gin (to_tsvector('english'::regconfig, (((((((((((((((((COALESCE(owner_uuid, ''::character varying))::text || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(portable_data_hash, ''::character varying))::text) || ' '::text) || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(file_names, ''::character varying))::text)));
 
 
 --
@@ -1520,7 +1520,7 @@ CREATE INDEX collections_search_index ON collections USING btree (owner_uuid, mo
 -- Name: container_requests_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace: 
 --
 
-CREATE INDEX container_requests_full_text_search_idx ON container_requests USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((((((((((((((((' '::text || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(requesting_container_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(container_uuid, ''::character varying))::text) || ' '::text) || COALESCE(mounts, ''::text)) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(container_image, ''::character varying))::text) || ' '::text) || COALESCE(environment, ''::text)) || ' '::text) || (COALESCE(cwd, ''::character varying))::text) || ' '::text) || COALESCE(command, ''::text)) || ' '::text) || (COALESCE(output_path, ''::character varying))::text) || ' '::text) || COALESCE(filters, ''::text))));
+CREATE INDEX container_requests_full_text_search_idx ON container_requests USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((((((((((((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(requesting_container_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(container_uuid, ''::character varying))::text) || ' '::text) || COALESCE(mounts, ''::text)) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(container_image, ''::character varying))::text) || ' '::text) || COALESCE(environment, ''::text)) || ' '::text) || (COALESCE(cwd, ''::character varying))::text) || ' '::text) || COALESCE(command, ''::text)) || ' '::text) || (COALESCE(output_path, ''::character varying))::text) || ' '::text) || COALESCE(filters, ''::text)) || ' '::text) || COALESCE(scheduling_parameters, ''::text)) || ' '::text) || (COALESCE(output_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(log_uuid, ''::character varying))::text)));
 
 
 --
@@ -1541,7 +1541,7 @@ CREATE INDEX containers_search_index ON containers USING btree (uuid, owner_uuid
 -- Name: groups_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace: 
 --
 
-CREATE INDEX groups_full_text_search_idx ON groups USING gin (to_tsvector('english'::regconfig, (((((((((((((' '::text || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || (COALESCE(group_class, ''::character varying))::text)));
+CREATE INDEX groups_full_text_search_idx ON groups USING gin (to_tsvector('english'::regconfig, (((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || (COALESCE(group_class, ''::character varying))::text)));
 
 
 --
@@ -2283,7 +2283,7 @@ CREATE INDEX job_tasks_search_index ON job_tasks USING btree (uuid, owner_uuid,
 -- Name: jobs_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace: 
 --
 
-CREATE INDEX jobs_full_text_search_idx ON jobs USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((((((((((((((((((((((' '::text || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(submit_id, ''::character varying))::text) || ' '::text) || (COALESCE(script, ''::character varying))::text) || ' '::text) || (COALESCE(script_version, ''::character varying))::text) || ' '::text) || COALESCE(script_parameters, ''::text)) || ' '::text) || (COALESCE(cancelled_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(cancelled_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(output, ''::character varying))::text) || ' '::text) || (COALESCE(is_locked_by_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(log, ''::character varying))::text) || ' '::text) || COALESCE(tasks_summary, ''::text)) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(repository, ''::character varying))::text) || ' '::text) || (COALESCE(supplied_script_version, ''::character varying))::text) || ' '::text) || (COALESCE(docker_image_locator, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(arvados_sdk_version, ''::character varying))::text)));
+CREATE INDEX jobs_full_text_search_idx ON jobs USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((((((((((((((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(submit_id, ''::character varying))::text) || ' '::text) || (COALESCE(script, ''::character varying))::text) || ' '::text) || (COALESCE(script_version, ''::character varying))::text) || ' '::text) || COALESCE(script_parameters, ''::text)) || ' '::text) || (COALESCE(cancelled_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(cancelled_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(output, ''::character varying))::text) || ' '::text) || (COALESCE(is_locked_by_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(log, ''::character varying))::text) || ' '::text) || COALESCE(tasks_summary, ''::text)) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(repository, ''::character varying))::text) || ' '::text) || (COALESCE(supplied_script_version, ''::character varying))::text) || ' '::text) || (COALESCE(docker_image_locator, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(arvados_sdk_version, ''::character varying))::text) || ' '::text) || COALESCE(components, ''::text))));
 
 
 --
@@ -2339,7 +2339,7 @@ CREATE INDEX nodes_search_index ON nodes USING btree (uuid, owner_uuid, modified
 -- Name: pipeline_instances_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace: 
 --
 
-CREATE INDEX pipeline_instances_full_text_search_idx ON pipeline_instances USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((' '::text || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(pipeline_template_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(components, ''::text)) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || COALESCE(components_summary, ''::text)) || ' '::text) || (COALESCE(description, ''::character varying))::text)));
+CREATE INDEX pipeline_instances_full_text_search_idx ON pipeline_instances USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(pipeline_template_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(components, ''::text)) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || COALESCE(components_summary, ''::text)) || ' '::text) || (COALESCE(description, ''::character varying))::text)));
 
 
 --
@@ -2360,7 +2360,7 @@ CREATE UNIQUE INDEX pipeline_template_owner_uuid_name_unique ON pipeline_templat
 -- Name: pipeline_templates_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace: 
 --
 
-CREATE INDEX pipeline_templates_full_text_search_idx ON pipeline_templates USING gin (to_tsvector('english'::regconfig, (((((((((((((' '::text || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(components, ''::text)) || ' '::text) || (COALESCE(description, ''::character varying))::text)));
+CREATE INDEX pipeline_templates_full_text_search_idx ON pipeline_templates USING gin (to_tsvector('english'::regconfig, (((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(components, ''::text)) || ' '::text) || (COALESCE(description, ''::character varying))::text)));
 
 
 --
@@ -2416,7 +2416,7 @@ CREATE INDEX virtual_machines_search_index ON virtual_machines USING btree (uuid
 -- Name: workflows_full_text_search_idx; Type: INDEX; Schema: public; Owner: -; Tablespace: 
 --
 
-CREATE INDEX workflows_full_text_search_idx ON workflows USING gin (to_tsvector('english'::regconfig, (((((((((((((' '::text || (COALESCE(uuid, ''::character varying))::text) || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE(definition, ''::text))));
+CREATE INDEX workflows_full_text_search_idx ON workflows USING gin (to_tsvector('english'::regconfig, (((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE(definition, ''::text))));
 
 
 --
@@ -2704,4 +2704,6 @@ INSERT INTO schema_migrations (version) VALUES ('20161111143147');
 
 INSERT INTO schema_migrations (version) VALUES ('20161115171221');
 
-INSERT INTO schema_migrations (version) VALUES ('20161115174218');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20161115174218');
+
+INSERT INTO schema_migrations (version) VALUES ('20161213172944');
\ No newline at end of file
diff --git a/services/api/lib/tasks/config_dump.rake b/services/api/lib/tasks/config_dump.rake
new file mode 100644 (file)
index 0000000..c7e0214
--- /dev/null
@@ -0,0 +1,6 @@
+namespace :config do
+  desc 'Show site configuration'
+  task dump: :environment do
+    puts $application_config.to_yaml
+  end
+end
index 1828e150bb76bdf6185f7fcbb3fe3172fb68e616..51b0a57a7fe11e573361e242eb80d324a540dfda 100644 (file)
@@ -377,36 +377,28 @@ class Operations(llfuse.Operations):
         if 'event_type' not in ev:
             return
         with llfuse.lock:
+            new_attrs = (ev.get("properties") or {}).get("new_attributes") or {}
+            pdh = new_attrs.get("portable_data_hash")
+            # new_attributes.modified_at currently lacks
+            # subsecond precision (see #6347) so use event_at
+            # which should always be the same.
+            stamp = ev.get("event_at")
+
             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
                 item.invalidate()
-                if ev["object_kind"] == "arvados#collection":
-                    new_attr = (ev.get("properties") and
-                                ev["properties"].get("new_attributes") and
-                                ev["properties"]["new_attributes"])
-
-                    # new_attributes.modified_at currently lacks
-                    # subsecond precision (see #6347) so use event_at
-                    # which should always be the same.
-                    record_version = (
-                        (ev["event_at"], new_attr["portable_data_hash"])
-                        if new_attr else None)
-
-                    item.update(to_record_version=record_version)
+                if stamp and pdh and ev.get("object_kind") == "arvados#collection":
+                    item.update(to_record_version=(stamp, pdh))
                 else:
                     item.update()
 
-            oldowner = (
-                ev.get("properties") and
-                ev["properties"].get("old_attributes") and
-                ev["properties"]["old_attributes"].get("owner_uuid"))
-            newowner = ev["object_owner_uuid"]
+            oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid")
+            newowner = ev.get("object_owner_uuid")
             for parent in (
                     self.inodes.inode_cache.find_by_uuid(oldowner) +
                     self.inodes.inode_cache.find_by_uuid(newowner)):
                 parent.invalidate()
                 parent.update()
 
-
     @catch_exceptions
     def getattr(self, inode):
         if inode not in self.inodes:
index f2948f9e45f295b43544615ad75c764c35856053..ffcfc6500f5c5ac31289da3c404166b386f74374 100644 (file)
@@ -126,6 +126,8 @@ class Mount(object):
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
+        if self.operations.events:
+            self.operations.events.close(timeout=self.args.unmount_timeout)
         subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
         self.llfuse_thread.join(timeout=self.args.unmount_timeout)
         if self.llfuse_thread.is_alive():
index 20192f9d84302e1d9967136bcfa19e03ef7012dd..1319aebdccaa1e9dcc0f3e4323fa66340cc05a7f 100644 (file)
@@ -66,6 +66,8 @@ class MountTestBase(unittest.TestCase):
 
     def tearDown(self):
         if self.llfuse_thread:
+            if self.operations.events:
+                self.operations.events.close(timeout=10)
             subprocess.call(["fusermount", "-u", "-z", self.mounttmp])
             t0 = time.time()
             self.llfuse_thread.join(timeout=10)
index b46ba7839f72f2a578fb6fbcdcbe8fbe9e71d681..81d5c86072d8b0f6e7008235f128adfda2857550 100644 (file)
@@ -50,7 +50,7 @@ class RetryPUT(IntegrationTest):
         q.put(mockedCurl)
         q.put(pycurl.Curl())
         q.put(pycurl.Curl())
-        with mock.patch('arvados.keep.KeepClient.KeepService._get_user_agent', side_effect=lambda: q.get(block=None)):
+        with mock.patch('arvados.keep.KeepClient.KeepService._get_user_agent', side_effect=q.get_nowait):
             self.pool_test(os.path.join(self.mnt, 'zzz'))
             self.assertTrue(mockedCurl.perform.called)
     @staticmethod
index bfd006ee8d2f3576332b8a3be4c6b040cce14214..e34f8581fd5448d606a6305e52c0673f944a8898 100644 (file)
@@ -5,12 +5,12 @@ package main
 
 import (
        "context"
-       "fmt"
        "net/http"
        "strings"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "git.curoverse.com/arvados.git/sdk/go/stats"
        log "github.com/Sirupsen/logrus"
 )
 
@@ -97,25 +97,11 @@ func (loggingRouter *LoggingRESTRouter) ServeHTTP(wrappedResp http.ResponseWrite
        }
 
        lgr.WithFields(log.Fields{
-               "timeTotal":      loggedDuration(tDone.Sub(tStart)),
-               "timeToStatus":   loggedDuration(resp.sentHdr.Sub(tStart)),
-               "timeWriteBody":  loggedDuration(tDone.Sub(resp.sentHdr)),
+               "timeTotal":      stats.Duration(tDone.Sub(tStart)),
+               "timeToStatus":   stats.Duration(resp.sentHdr.Sub(tStart)),
+               "timeWriteBody":  stats.Duration(tDone.Sub(resp.sentHdr)),
                "respStatusCode": resp.Status,
                "respStatus":     statusText,
                "respBytes":      resp.Length,
        }).Info("response")
 }
-
-type loggedDuration time.Duration
-
-// MarshalJSON formats a duration as a number of seconds, using
-// fixed-point notation with no more than 6 decimal places.
-func (d loggedDuration) MarshalJSON() ([]byte, error) {
-       return []byte(d.String()), nil
-}
-
-// String formats a duration as a number of seconds, using
-// fixed-point notation with no more than 6 decimal places.
-func (d loggedDuration) String() string {
-       return fmt.Sprintf("%.6f", time.Duration(d).Seconds())
-}
diff --git a/services/ws/arvados-ws.service b/services/ws/arvados-ws.service
new file mode 100644 (file)
index 0000000..ebccf0c
--- /dev/null
@@ -0,0 +1,13 @@
+[Unit]
+Description=Arvados websocket server
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/ws/ws.yml
+
+[Service]
+Type=notify
+ExecStart=/usr/bin/arvados-ws
+Restart=always
+
+[Install]
+WantedBy=multi-user.target
diff --git a/services/ws/config.go b/services/ws/config.go
new file mode 100644 (file)
index 0000000..0faa863
--- /dev/null
@@ -0,0 +1,40 @@
+package main
+
+import (
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type wsConfig struct {
+       Client    arvados.Client
+       Postgres  pgConfig
+       Listen    string
+       LogLevel  string
+       LogFormat string
+
+       PingTimeout      arvados.Duration
+       ClientEventQueue int
+       ServerEventQueue int
+}
+
+func defaultConfig() wsConfig {
+       return wsConfig{
+               Client: arvados.Client{
+                       APIHost: "localhost:443",
+               },
+               Postgres: pgConfig{
+                       "dbname":          "arvados_production",
+                       "user":            "arvados",
+                       "password":        "xyzzy",
+                       "host":            "localhost",
+                       "connect_timeout": "30",
+                       "sslmode":         "require",
+               },
+               LogLevel:         "info",
+               LogFormat:        "json",
+               PingTimeout:      arvados.Duration(time.Minute),
+               ClientEventQueue: 64,
+               ServerEventQueue: 4,
+       }
+}
diff --git a/services/ws/doc.go b/services/ws/doc.go
new file mode 100644 (file)
index 0000000..7ccb588
--- /dev/null
@@ -0,0 +1,55 @@
+// Arvados-ws exposes Arvados APIs (currently just one, the
+// cache-invalidation event feed at "ws://.../websocket") to
+// websocket clients.
+//
+// Installation
+//
+// See https://doc.arvados.org/install/install-ws.html.
+//
+// Developer info
+//
+// See https://dev.arvados.org/projects/arvados/wiki/Hacking_websocket_server.
+//
+// Usage
+//
+//     arvados-ws [-config /etc/arvados/ws/ws.yml] [-dump-config]
+//
+// Minimal configuration
+//
+//     Client:
+//       APIHost: localhost:443
+//     Listen: ":1234"
+//     Postgres:
+//       dbname: arvados_production
+//       host: localhost
+//       password: xyzzy
+//       user: arvados
+//
+// Options
+//
+// -config path
+//
+// Load configuration from the given file instead of the default
+// /etc/arvados/ws/ws.yml
+//
+// -dump-config
+//
+// Print the loaded configuration to stdout and exit.
+//
+// Logs
+//
+// Logs are printed to stderr, formatted as JSON.
+//
+// A log is printed each time a client connects or disconnects.
+//
+// Enable additional logs by configuring:
+//
+//     LogLevel: debug
+//
+// Runtime status
+//
+// GET /debug.json responds with debug stats.
+//
+// GET /status.json responds with health check results and
+// activity/usage metrics.
+package main
diff --git a/services/ws/event.go b/services/ws/event.go
new file mode 100644 (file)
index 0000000..304f86b
--- /dev/null
@@ -0,0 +1,65 @@
+package main
+
+import (
+       "database/sql"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/ghodss/yaml"
+)
+
+type eventSink interface {
+       Channel() <-chan *event
+       Stop()
+}
+
+type eventSource interface {
+       NewSink() eventSink
+       DB() *sql.DB
+}
+
+type event struct {
+       LogID    uint64
+       Received time.Time
+       Ready    time.Time
+       Serial   uint64
+
+       db     *sql.DB
+       logRow *arvados.Log
+       err    error
+       mtx    sync.Mutex
+}
+
+// Detail returns the database row corresponding to the event. It can
+// be called safely from multiple goroutines. Only one attempt will be
+// made. If the database row cannot be retrieved, Detail returns nil.
+func (e *event) Detail() *arvados.Log {
+       e.mtx.Lock()
+       defer e.mtx.Unlock()
+       if e.logRow != nil || e.err != nil {
+               return e.logRow
+       }
+       var logRow arvados.Log
+       var propYAML []byte
+       e.err = e.db.QueryRow(`SELECT id, uuid, object_uuid, COALESCE(object_owner_uuid,''), COALESCE(event_type,''), event_at, created_at, properties FROM logs WHERE id = $1`, e.LogID).Scan(
+               &logRow.ID,
+               &logRow.UUID,
+               &logRow.ObjectUUID,
+               &logRow.ObjectOwnerUUID,
+               &logRow.EventType,
+               &logRow.EventAt,
+               &logRow.CreatedAt,
+               &propYAML)
+       if e.err != nil {
+               logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("QueryRow failed")
+               return nil
+       }
+       e.err = yaml.Unmarshal(propYAML, &logRow.Properties)
+       if e.err != nil {
+               logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("yaml decode failed")
+               return nil
+       }
+       e.logRow = &logRow
+       return e.logRow
+}
diff --git a/services/ws/event_source.go b/services/ws/event_source.go
new file mode 100644 (file)
index 0000000..ea90ec7
--- /dev/null
@@ -0,0 +1,216 @@
+package main
+
+import (
+       "database/sql"
+       "strconv"
+       "strings"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/stats"
+       "github.com/lib/pq"
+)
+
+type pgConfig map[string]string
+
+func (c pgConfig) ConnectionString() string {
+       s := ""
+       for k, v := range c {
+               s += k
+               s += "='"
+               s += strings.Replace(
+                       strings.Replace(v, `\`, `\\`, -1),
+                       `'`, `\'`, -1)
+               s += "' "
+       }
+       return s
+}
+
+type pgEventSource struct {
+       DataSource string
+       QueueSize  int
+
+       db         *sql.DB
+       pqListener *pq.Listener
+       queue      chan *event
+       sinks      map[*pgEventSink]bool
+       setupOnce  sync.Once
+       mtx        sync.Mutex
+       shutdown   chan error
+
+       lastQDelay time.Duration
+       eventsIn   uint64
+       eventsOut  uint64
+}
+
+var _ debugStatuser = (*pgEventSource)(nil)
+
+func (ps *pgEventSource) setup() {
+       ps.shutdown = make(chan error, 1)
+       ps.sinks = make(map[*pgEventSink]bool)
+
+       db, err := sql.Open("postgres", ps.DataSource)
+       if err != nil {
+               logger(nil).WithError(err).Fatal("sql.Open failed")
+       }
+       if err = db.Ping(); err != nil {
+               logger(nil).WithError(err).Fatal("db.Ping failed")
+       }
+       ps.db = db
+
+       ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
+               if err != nil {
+                       // Until we have a mechanism for catching up
+                       // on missed events, we cannot recover from a
+                       // dropped connection without breaking our
+                       // promises to clients.
+                       logger(nil).WithError(err).Error("listener problem")
+                       ps.shutdown <- err
+               }
+       })
+       err = ps.pqListener.Listen("logs")
+       if err != nil {
+               logger(nil).WithError(err).Fatal("pq Listen failed")
+       }
+       logger(nil).Debug("pgEventSource listening")
+
+       go ps.run()
+}
+
+func (ps *pgEventSource) run() {
+       ps.queue = make(chan *event, ps.QueueSize)
+
+       go func() {
+               for e := range ps.queue {
+                       // Wait for the "select ... from logs" call to
+                       // finish. This limits max concurrent queries
+                       // to ps.QueueSize. Without this, max
+                       // concurrent queries would be bounded by
+                       // client_count X client_queue_size.
+                       e.Detail()
+
+                       logger(nil).
+                               WithField("serial", e.Serial).
+                               WithField("detail", e.Detail()).
+                               Debug("event ready")
+                       e.Ready = time.Now()
+                       ps.lastQDelay = e.Ready.Sub(e.Received)
+
+                       ps.mtx.Lock()
+                       atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
+                       for sink := range ps.sinks {
+                               sink.channel <- e
+                       }
+                       ps.mtx.Unlock()
+               }
+       }()
+
+       var serial uint64
+       ticker := time.NewTicker(time.Minute)
+       defer ticker.Stop()
+       for {
+               select {
+               case err, ok := <-ps.shutdown:
+                       if ok {
+                               logger(nil).WithError(err).Info("shutdown")
+                       }
+                       close(ps.queue)
+                       return
+
+               case <-ticker.C:
+                       logger(nil).Debug("listener ping")
+                       ps.pqListener.Ping()
+
+               case pqEvent, ok := <-ps.pqListener.Notify:
+                       if !ok {
+                               close(ps.queue)
+                               return
+                       }
+                       if pqEvent.Channel != "logs" {
+                               continue
+                       }
+                       logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
+                       if err != nil {
+                               logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
+                               continue
+                       }
+                       serial++
+                       e := &event{
+                               LogID:    logID,
+                               Received: time.Now(),
+                               Serial:   serial,
+                               db:       ps.db,
+                       }
+                       logger(nil).WithField("event", e).Debug("incoming")
+                       atomic.AddUint64(&ps.eventsIn, 1)
+                       ps.queue <- e
+                       go e.Detail()
+               }
+       }
+}
+
+// NewSink subscribes to the event source. NewSink returns an
+// eventSink, whose Channel() method returns a channel: a pointer to
+// each subsequent event will be sent to that channel.
+//
+// The caller must ensure events are received from the sink channel as
+// quickly as possible because when one sink stops being ready, all
+// other sinks block.
+func (ps *pgEventSource) NewSink() eventSink {
+       ps.setupOnce.Do(ps.setup)
+       sink := &pgEventSink{
+               channel: make(chan *event, 1),
+               source:  ps,
+       }
+       ps.mtx.Lock()
+       ps.sinks[sink] = true
+       ps.mtx.Unlock()
+       return sink
+}
+
+func (ps *pgEventSource) DB() *sql.DB {
+       ps.setupOnce.Do(ps.setup)
+       return ps.db
+}
+
+func (ps *pgEventSource) DebugStatus() interface{} {
+       ps.mtx.Lock()
+       defer ps.mtx.Unlock()
+       blocked := 0
+       for sink := range ps.sinks {
+               blocked += len(sink.channel)
+       }
+       return map[string]interface{}{
+               "EventsIn":     atomic.LoadUint64(&ps.eventsIn),
+               "EventsOut":    atomic.LoadUint64(&ps.eventsOut),
+               "Queue":        len(ps.queue),
+               "QueueLimit":   cap(ps.queue),
+               "QueueDelay":   stats.Duration(ps.lastQDelay),
+               "Sinks":        len(ps.sinks),
+               "SinksBlocked": blocked,
+       }
+}
+
+type pgEventSink struct {
+       channel chan *event
+       source  *pgEventSource
+}
+
+func (sink *pgEventSink) Channel() <-chan *event {
+       return sink.channel
+}
+
+func (sink *pgEventSink) Stop() {
+       go func() {
+               // Ensure this sink cannot fill up and block the
+               // server-side queue (which otherwise could in turn
+               // block our mtx.Lock() here)
+               for _ = range sink.channel {
+               }
+       }()
+       sink.source.mtx.Lock()
+       delete(sink.source.sinks, sink)
+       sink.source.mtx.Unlock()
+       close(sink.channel)
+}
diff --git a/services/ws/handler.go b/services/ws/handler.go
new file mode 100644 (file)
index 0000000..7229190
--- /dev/null
@@ -0,0 +1,235 @@
+package main
+
+import (
+       "context"
+       "io"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/stats"
+)
+
// A handler drives a single websocket connection: it routes incoming
// client frames to a session, and event/session output back to the
// client, enforcing PingTimeout and the per-client queue size.
type handler struct {
	Client      arvados.Client
	PingTimeout time.Duration
	QueueSize   int

	// mtx guards lastDelay.
	mtx sync.Mutex
	// lastDelay records, for each active client queue, the most
	// recent ready-to-delivered delay; reported by DebugStatus.
	lastDelay map[chan interface{}]stats.Duration
	setupOnce sync.Once
}

// handlerStats accumulates per-connection totals, reported by Handle
// when the connection ends. The *Ns fields are time.Durations, i.e.
// nanosecond counts.
type handlerStats struct {
	QueueDelayNs time.Duration
	WriteDelayNs time.Duration
	EventBytes   uint64
	EventCount   uint64
}
+
// Handle runs the websocket protocol for one client connection: it
// creates a session via newSession, pumps incoming client frames to
// the session, and pumps matching events from eventSource out to the
// client. It returns, with accumulated transfer stats, when the
// connection ends for any reason.
func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
	h.setupOnce.Do(h.setup)

	// Cancelling ctx is how any of the goroutines below signals
	// the others (and Handle itself) to shut down.
	ctx, cancel := context.WithCancel(ws.Request().Context())
	defer cancel()
	log := logger(ctx)

	incoming := eventSource.NewSink()
	defer incoming.Stop()

	// Outgoing queue; carries *event and pre-encoded []byte items.
	queue := make(chan interface{}, h.QueueSize)
	h.mtx.Lock()
	h.lastDelay[queue] = 0
	h.mtx.Unlock()
	defer func() {
		h.mtx.Lock()
		delete(h.lastDelay, queue)
		h.mtx.Unlock()
	}()

	sess, err := newSession(ws, queue)
	if err != nil {
		log.WithError(err).Error("newSession failed")
		return
	}

	// Receive websocket frames from the client and pass them to
	// sess.Receive().
	go func() {
		buf := make([]byte, 2<<20)
		for {
			select {
			case <-ctx.Done():
				return
			default:
			}
			// Effectively "no read deadline": block until
			// the client sends something or the socket dies.
			ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
			n, err := ws.Read(buf)
			// NOTE(review): this shadows the outer buf for
			// the rest of the iteration; cap() of the
			// shadowed slice is still the full 2 MiB, so
			// the frame-too-big check below behaves as
			// intended.
			buf := buf[:n]
			log.WithField("frame", string(buf[:n])).Debug("received frame")
			if err == nil && n == cap(buf) {
				err = errFrameTooBig
			}
			if err != nil {
				if err != io.EOF {
					log.WithError(err).Info("read error")
				}
				cancel()
				return
			}
			err = sess.Receive(buf)
			if err != nil {
				log.WithError(err).Error("sess.Receive() failed")
				cancel()
				return
			}
		}
	}()

	// Take items from the outgoing queue, serialize them using
	// sess.EventMessage() as needed, and send them to the client
	// as websocket frames.
	go func() {
		for {
			var ok bool
			var data interface{}
			select {
			case <-ctx.Done():
				return
			case data, ok = <-queue:
				if !ok {
					return
				}
			}
			var e *event
			var buf []byte
			var err error
			log := log

			switch data := data.(type) {
			case []byte:
				// Pre-encoded message (keepalive ping
				// or subscribe response): send verbatim.
				buf = data
			case *event:
				e = data
				log = log.WithField("serial", e.Serial)
				buf, err = sess.EventMessage(e)
				if err != nil {
					log.WithError(err).Error("EventMessage failed")
					cancel()
					break
				} else if len(buf) == 0 {
					// Empty message means "skip this
					// event" (e.g. permission denied).
					log.Debug("skip")
					continue
				}
			default:
				log.WithField("data", data).Error("bad object in client queue")
				continue
			}

			log.WithField("frame", string(buf)).Debug("send event")
			ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
			t0 := time.Now()
			_, err = ws.Write(buf)
			if err != nil {
				log.WithError(err).Error("write failed")
				cancel()
				break
			}
			log.Debug("sent")

			if e != nil {
				hStats.QueueDelayNs += t0.Sub(e.Ready)
				h.mtx.Lock()
				h.lastDelay[queue] = stats.Duration(time.Since(e.Ready))
				h.mtx.Unlock()
			}
			hStats.WriteDelayNs += time.Since(t0)
			hStats.EventBytes += uint64(len(buf))
			hStats.EventCount++
		}
	}()

	// Filter incoming events against the current subscription
	// list, and forward matching events to the outgoing message
	// queue. Close the queue and return when the request context
	// is done/cancelled or the incoming event stream ends. Shut
	// down the handler if the outgoing queue fills up.
	go func() {
		ticker := time.NewTicker(h.PingTimeout)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// If the outgoing queue is empty,
				// send an empty message. This can
				// help detect a disconnected network
				// socket, and prevent an idle socket
				// from being closed.
				if len(queue) == 0 {
					select {
					case queue <- []byte(`{}`):
					default:
					}
				}
				continue
			case e, ok := <-incoming.Channel():
				if !ok {
					cancel()
					return
				}
				if !sess.Filter(e) {
					continue
				}
				select {
				case queue <- e:
				default:
					log.WithError(errQueueFull).Error("terminate")
					cancel()
					return
				}
			}
		}
	}()

	<-ctx.Done()
	return
}
+
// DebugStatus summarizes all active client queues for the
// /debug.json endpoint: queue count, min/max/total occupancy, and
// min/max of the most recent delivery delays.
func (h *handler) DebugStatus() interface{} {
	h.mtx.Lock()
	defer h.mtx.Unlock()

	var s struct {
		QueueCount    int
		QueueMin      int
		QueueMax      int
		QueueTotal    uint64
		QueueDelayMin stats.Duration
		QueueDelayMax stats.Duration
	}
	for q, lastDelay := range h.lastDelay {
		s.QueueCount++
		n := len(q)
		s.QueueTotal += uint64(n)
		if s.QueueMax < n {
			s.QueueMax = n
		}
		if s.QueueMin > n || s.QueueCount == 1 {
			s.QueueMin = n
		}
		// Zero delays count as "no data yet" and are excluded
		// from the minimum.
		if (s.QueueDelayMin > lastDelay || s.QueueDelayMin == 0) && lastDelay > 0 {
			s.QueueDelayMin = lastDelay
		}
		if s.QueueDelayMax < lastDelay {
			s.QueueDelayMax = lastDelay
		}
	}
	return &s
}
+
// setup initializes the handler's internal state; called once via
// setupOnce.
func (h *handler) setup() {
	h.lastDelay = make(map[chan interface{}]stats.Duration)
}
diff --git a/services/ws/main.go b/services/ws/main.go
new file mode 100644 (file)
index 0000000..7c3625b
--- /dev/null
@@ -0,0 +1,67 @@
+package main
+
+import (
+       "flag"
+       "fmt"
+       "net/http"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/config"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+       "github.com/coreos/go-systemd/daemon"
+)
+
+var logger = ctxlog.FromContext
+
// main loads the ws service configuration, sets up logging, and runs
// the HTTP/websocket server until a fatal error occurs.
func main() {
	log := logger(nil)

	configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file")
	dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit")
	cfg := defaultConfig()
	flag.Parse()

	err := config.LoadFile(&cfg, *configPath)
	if err != nil {
		log.Fatal(err)
	}

	ctxlog.SetLevel(cfg.LogLevel)
	ctxlog.SetFormat(cfg.LogFormat)

	// With -dump-config, print the effective configuration
	// (defaults merged with the config file) and exit.
	if *dumpConfig {
		txt, err := config.Dump(&cfg)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Print(string(txt))
		return
	}

	log.Info("started")
	eventSource := &pgEventSource{
		DataSource: cfg.Postgres.ConnectionString(),
		QueueSize:  cfg.ServerEventQueue,
	}
	srv := &http.Server{
		Addr:           cfg.Listen,
		ReadTimeout:    time.Minute,
		WriteTimeout:   time.Minute,
		MaxHeaderBytes: 1 << 20,
		Handler: &router{
			Config:         &cfg,
			eventSource:    eventSource,
			newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
		},
	}
	// Bootstrap the eventSource by attaching a dummy subscriber
	// and hanging up.
	eventSource.NewSink().Stop()

	// Tell systemd (if present) we are ready to accept connections.
	if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
		log.WithError(err).Warn("error notifying init daemon")
	}

	log.WithField("Listen", srv.Addr).Info("listening")
	log.Fatal(srv.ListenAndServe())
}
diff --git a/services/ws/permission.go b/services/ws/permission.go
new file mode 100644 (file)
index 0000000..e467e06
--- /dev/null
@@ -0,0 +1,94 @@
+package main
+
+import (
+       "net/http"
+       "net/url"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
const (
	// Cached permission results are discarded after maxPermCacheAge.
	maxPermCacheAge = time.Hour
	// Entries younger than minPermCacheAge survive cache tidying.
	minPermCacheAge = 5 * time.Minute
)

// A permChecker tests whether the owner of a given API token can
// read a given object.
type permChecker interface {
	SetToken(token string)
	Check(uuid string) (bool, error)
}
+
// newPermChecker returns a caching permChecker using the given API
// client configuration. The token is cleared here; the caller
// supplies the token to check via SetToken.
func newPermChecker(ac arvados.Client) permChecker {
	ac.AuthToken = ""
	return &cachingPermChecker{
		Client:     &ac,
		cache:      make(map[string]cacheEnt),
		maxCurrent: 16,
	}
}
+
// A cacheEnt records the result of a permission lookup and when it
// was obtained.
type cacheEnt struct {
	time.Time
	allowed bool
}

// A cachingPermChecker checks permissions via the Arvados API and
// memoizes results for up to maxPermCacheAge.
type cachingPermChecker struct {
	*arvados.Client
	cache map[string]cacheEnt
	// maxCurrent is the cache-size threshold that triggers
	// tidying; see tidy().
	maxCurrent int
}

// SetToken sets the API token whose permissions subsequent Check
// calls will test.
func (pc *cachingPermChecker) SetToken(token string) {
	pc.Client.AuthToken = token
}
+
+func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
+       logger := logger(nil).
+               WithField("token", pc.Client.AuthToken).
+               WithField("uuid", uuid)
+       pc.tidy()
+       now := time.Now()
+       if perm, ok := pc.cache[uuid]; ok && now.Sub(perm.Time) < maxPermCacheAge {
+               logger.WithField("allowed", perm.allowed).Debug("cache hit")
+               return perm.allowed, nil
+       }
+       var buf map[string]interface{}
+       path, err := pc.PathForUUID("get", uuid)
+       if err != nil {
+               return false, err
+       }
+       err = pc.RequestAndDecode(&buf, "GET", path, nil, url.Values{
+               "select": {`["uuid"]`},
+       })
+
+       var allowed bool
+       if err == nil {
+               allowed = true
+       } else if txErr, ok := err.(*arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound {
+               allowed = false
+       } else if txErr.StatusCode == http.StatusForbidden {
+               // Some requests are expressly forbidden for reasons
+               // other than "you aren't allowed to know whether this
+               // UUID exists" (404).
+               allowed = false
+       } else {
+               logger.WithError(err).Error("lookup error")
+               return false, err
+       }
+       logger.WithField("allowed", allowed).Debug("cache miss")
+       pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed}
+       return allowed, nil
+}
+
+func (pc *cachingPermChecker) tidy() {
+       if len(pc.cache) <= pc.maxCurrent*2 {
+               return
+       }
+       tooOld := time.Now().Add(-minPermCacheAge)
+       for uuid, t := range pc.cache {
+               if t.Before(tooOld) {
+                       delete(pc.cache, uuid)
+               }
+       }
+       pc.maxCurrent = len(pc.cache)
+}
diff --git a/services/ws/router.go b/services/ws/router.go
new file mode 100644 (file)
index 0000000..15b825f
--- /dev/null
@@ -0,0 +1,140 @@
+package main
+
+import (
+       "encoding/json"
+       "io"
+       "net/http"
+       "strconv"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+       "github.com/Sirupsen/logrus"
+       "golang.org/x/net/websocket"
+)
+
// wsConn is the subset of *websocket.Conn used by this package,
// narrowed to an interface so handlers can be exercised with fakes.
type wsConn interface {
	io.ReadWriter
	Request() *http.Request
	SetReadDeadline(time.Time) error
	SetWriteDeadline(time.Time) error
}

// router dispatches HTTP requests: the v0 and v1 websocket endpoints
// plus the JSON status/debug endpoints.
type router struct {
	Config         *wsConfig
	eventSource    eventSource
	newPermChecker func() permChecker

	handler   *handler
	mux       *http.ServeMux
	setupOnce sync.Once

	// lastReqID/lastReqMtx support generation of unique request
	// IDs; see newReqID().
	lastReqID  int64
	lastReqMtx sync.Mutex

	status routerDebugStatus
}

// routerDebugStatus counts requests received and currently active;
// fields are updated with sync/atomic.
type routerDebugStatus struct {
	ReqsReceived int64
	ReqsActive   int64
}

// debugStatuser is implemented by components that can report their
// internals for the /debug.json endpoint.
type debugStatuser interface {
	DebugStatus() interface{}
}
+
// setup initializes the request mux and the shared connection
// handler; called once via setupOnce.
func (rtr *router) setup() {
	rtr.handler = &handler{
		PingTimeout: rtr.Config.PingTimeout.Duration(),
		QueueSize:   rtr.Config.ClientEventQueue,
	}
	rtr.mux = http.NewServeMux()
	rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0))
	rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1))
	rtr.mux.HandleFunc("/debug.json", jsonHandler(rtr.DebugStatus))
	rtr.mux.HandleFunc("/status.json", jsonHandler(rtr.Status))
}
+
+func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
+       return &websocket.Server{
+               Handshake: func(c *websocket.Config, r *http.Request) error {
+                       return nil
+               },
+               Handler: websocket.Handler(func(ws *websocket.Conn) {
+                       t0 := time.Now()
+                       log := logger(ws.Request().Context())
+                       log.Info("connected")
+
+                       stats := rtr.handler.Handle(ws, rtr.eventSource,
+                               func(ws wsConn, sendq chan<- interface{}) (session, error) {
+                                       return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker(), &rtr.Config.Client)
+                               })
+
+                       log.WithFields(logrus.Fields{
+                               "elapsed": time.Now().Sub(t0).Seconds(),
+                               "stats":   stats,
+                       }).Info("disconnect")
+                       ws.Close()
+               }),
+       }
+}
+
+func (rtr *router) newReqID() string {
+       rtr.lastReqMtx.Lock()
+       defer rtr.lastReqMtx.Unlock()
+       id := time.Now().UnixNano()
+       if id <= rtr.lastReqID {
+               id = rtr.lastReqID + 1
+       }
+       return strconv.FormatInt(id, 36)
+}
+
// DebugStatus reports router, handler, and (if supported) event
// source internals for the /debug.json endpoint.
func (rtr *router) DebugStatus() interface{} {
	// NOTE(review): rtr.status is copied here without atomic
	// loads; the int64 reads may tear on 32-bit platforms.
	s := map[string]interface{}{
		"HTTP":     rtr.status,
		"Outgoing": rtr.handler.DebugStatus(),
	}
	if es, ok := rtr.eventSource.(debugStatuser); ok {
		s["EventSource"] = es.DebugStatus()
	}
	return s
}

// Status reports the number of active client connections for the
// /status.json endpoint.
func (rtr *router) Status() interface{} {
	return map[string]interface{}{
		"Clients": atomic.LoadInt64(&rtr.status.ReqsActive),
	}
}
+
// ServeHTTP attaches a per-request logger (tagged with a unique
// RequestID) to the request context, updates the request counters,
// and delegates to the mux.
func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	rtr.setupOnce.Do(rtr.setup)
	atomic.AddInt64(&rtr.status.ReqsReceived, 1)
	atomic.AddInt64(&rtr.status.ReqsActive, 1)
	defer atomic.AddInt64(&rtr.status.ReqsActive, -1)

	logger := logger(req.Context()).
		WithField("RequestID", rtr.newReqID())
	ctx := ctxlog.Context(req.Context(), logger)
	req = req.WithContext(ctx)
	logger.WithFields(logrus.Fields{
		"remoteAddr":      req.RemoteAddr,
		"reqForwardedFor": req.Header.Get("X-Forwarded-For"),
	}).Info("accept request")
	rtr.mux.ServeHTTP(resp, req)
}
+
+func jsonHandler(fn func() interface{}) http.HandlerFunc {
+       return func(resp http.ResponseWriter, req *http.Request) {
+               logger := logger(req.Context())
+               resp.Header().Set("Content-Type", "application/json")
+               enc := json.NewEncoder(resp)
+               err := enc.Encode(fn())
+               if err != nil {
+                       msg := "encode failed"
+                       logger.WithError(err).Error(msg)
+                       http.Error(resp, msg, http.StatusInternalServerError)
+               }
+       }
+}
diff --git a/services/ws/session.go b/services/ws/session.go
new file mode 100644 (file)
index 0000000..67f4608
--- /dev/null
@@ -0,0 +1,33 @@
+package main
+
+import (
+       "database/sql"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
// A session implements the protocol-specific part of a websocket
// connection; see session_v0.go and session_v1.go.
type session interface {
	// Receive processes a message received from the client. If a
	// non-nil error is returned, the connection will be
	// terminated.
	Receive([]byte) error

	// Filter returns true if the event should be queued for
	// sending to the client. It should return as fast as
	// possible, and must not block.
	Filter(*event) bool

	// EventMessage encodes the given event (from the front of the
	// queue) into a form suitable to send to the client. If a
	// non-nil error is returned, the connection is terminated. If
	// the returned buffer is empty, nothing is sent to the client
	// and the event is not counted in statistics.
	//
	// Unlike Filter, EventMessage can block without affecting
	// other connections. If EventMessage is slow, additional
	// incoming events will be queued. If the event queue fills
	// up, the connection will be dropped.
	EventMessage(*event) ([]byte, error)
}

// A sessionFactory creates a session for a new connection, given the
// connection, the outgoing message queue, a database handle, a
// permission checker, and the API client configuration.
type sessionFactory func(wsConn, chan<- interface{}, *sql.DB, permChecker, *arvados.Client) (session, error)
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
new file mode 100644 (file)
index 0000000..44e2a1d
--- /dev/null
@@ -0,0 +1,295 @@
+package main
+
+import (
+       "database/sql"
+       "encoding/json"
+       "errors"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+)
+
var (
	// errQueueFull terminates a connection whose outgoing queue
	// has overflowed.
	errQueueFull = errors.New("client queue full")
	// errFrameTooBig terminates a connection that sends a frame
	// larger than the read buffer.
	errFrameTooBig = errors.New("frame too big")

	// Attributes copied from old_attributes/new_attributes into
	// outgoing event messages; everything else is dropped.
	sendObjectAttributes = []string{"state", "name", "owner_uuid", "portable_data_hash"}

	// Canned replies to client subscribe requests.
	v0subscribeOK   = []byte(`{"status":200}`)
	v0subscribeFail = []byte(`{"status":400}`)
)
+
// A v0session holds the state of one v0-protocol websocket
// connection: its subscriptions and the resources needed to check
// permissions and fetch event details.
type v0session struct {
	ac            *arvados.Client
	ws            wsConn
	sendq         chan<- interface{}
	db            *sql.DB
	permChecker   permChecker
	subscriptions []v0subscribe
	lastMsgID     uint64
	log           *logrus.Entry
	// mtx guards subscriptions.
	mtx sync.Mutex
	// NOTE(review): setupOnce appears unused in this file —
	// candidate for removal.
	setupOnce sync.Once
}
+
// newSessionV0 returns a v0 session: a partial port of the Rails/puma
// implementation, with just enough functionality to support Workbench
// and arv-mount. The client's API token is taken from the api_token
// form/query parameter of the websocket request.
func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) {
	sess := &v0session{
		sendq:       sendq,
		ws:          ws,
		db:          db,
		ac:          ac,
		permChecker: pc,
		log:         logger(ws.Request().Context()),
	}

	err := ws.Request().ParseForm()
	if err != nil {
		sess.log.WithError(err).Error("ParseForm failed")
		return nil, err
	}
	token := ws.Request().Form.Get("api_token")
	sess.permChecker.SetToken(token)
	// NOTE(review): this logs the full API token at debug level —
	// consider redacting before enabling debug logs in production.
	sess.log.WithField("token", token).Debug("set token")

	return sess, nil
}
+
+func (sess *v0session) Receive(buf []byte) error {
+       var sub v0subscribe
+       if err := json.Unmarshal(buf, &sub); err != nil {
+               sess.log.WithError(err).Info("invalid message from client")
+       } else if sub.Method == "subscribe" {
+               sub.prepare(sess)
+               sess.log.WithField("sub", sub).Debug("sub prepared")
+               sess.sendq <- v0subscribeOK
+               sess.mtx.Lock()
+               sess.subscriptions = append(sess.subscriptions, sub)
+               sess.mtx.Unlock()
+               sub.sendOldEvents(sess)
+               return nil
+       } else {
+               sess.log.WithField("Method", sub.Method).Info("unknown method")
+       }
+       sess.sendq <- v0subscribeFail
+       return nil
+}
+
// EventMessage encodes e as a v0-protocol JSON message, or returns
// (nil, nil) to skip the event: when the underlying log row is
// unavailable, or when the client's token has no read permission on
// the object.
func (sess *v0session) EventMessage(e *event) ([]byte, error) {
	detail := e.Detail()
	if detail == nil {
		return nil, nil
	}

	ok, err := sess.permChecker.Check(detail.ObjectUUID)
	if err != nil || !ok {
		return nil, err
	}

	kind, _ := sess.ac.KindForUUID(detail.ObjectUUID)
	msg := map[string]interface{}{
		"msgID":             atomic.AddUint64(&sess.lastMsgID, 1),
		"id":                detail.ID,
		"uuid":              detail.UUID,
		"object_uuid":       detail.ObjectUUID,
		"object_owner_uuid": detail.ObjectOwnerUUID,
		"object_kind":       kind,
		"event_type":        detail.EventType,
		"event_at":          detail.EventAt,
	}
	if detail.Properties != nil && detail.Properties["text"] != nil {
		// Text-style log entries are passed through unmodified.
		msg["properties"] = detail.Properties
	} else {
		// For object-change events, copy only the whitelisted
		// attributes (sendObjectAttributes) out of
		// old_attributes/new_attributes.
		msgProps := map[string]map[string]interface{}{}
		for _, ak := range []string{"old_attributes", "new_attributes"} {
			eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
			if !ok {
				continue
			}
			msgAttrs := map[string]interface{}{}
			for _, k := range sendObjectAttributes {
				if v, ok := eventAttrs[k]; ok {
					msgAttrs[k] = v
				}
			}
			msgProps[ak] = msgAttrs
		}
		msg["properties"] = msgProps
	}
	return json.Marshal(msg)
}
+
+func (sess *v0session) Filter(e *event) bool {
+       sess.mtx.Lock()
+       defer sess.mtx.Unlock()
+       for _, sub := range sess.subscriptions {
+               if sub.match(sess, e) {
+                       return true
+               }
+       }
+       return false
+}
+
+func (sub *v0subscribe) sendOldEvents(sess *v0session) {
+       if sub.LastLogID == 0 {
+               return
+       }
+       sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
+       // Here we do a "select id" query and queue an event for every
+       // log since the given ID, then use (*event)Detail() to
+       // retrieve the whole row and decide whether to send it. This
+       // approach is very inefficient if the subscriber asks for
+       // last_log_id==1, even if the filters end up matching very
+       // few events.
+       //
+       // To mitigate this, filter on "created > 10 minutes ago" when
+       // retrieving the list of old event IDs to consider.
+       rows, err := sess.db.Query(
+               `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
+               sub.LastLogID,
+               time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
+       if err != nil {
+               sess.log.WithError(err).Error("db.Query failed")
+               return
+       }
+       for rows.Next() {
+               var id uint64
+               err := rows.Scan(&id)
+               if err != nil {
+                       sess.log.WithError(err).Error("row Scan failed")
+                       continue
+               }
+               for len(sess.sendq)*2 > cap(sess.sendq) {
+                       // Ugly... but if we fill up the whole client
+                       // queue with a backlog of old events, a
+                       // single new event will overflow it and
+                       // terminate the connection, and then the
+                       // client will probably reconnect and do the
+                       // same thing all over again.
+                       time.Sleep(100 * time.Millisecond)
+               }
+               now := time.Now()
+               e := &event{
+                       LogID:    id,
+                       Received: now,
+                       Ready:    now,
+                       db:       sess.db,
+               }
+               if sub.match(sess, e) {
+                       select {
+                       case sess.sendq <- e:
+                       case <-sess.ws.Request().Context().Done():
+                               return
+                       }
+               }
+       }
+       if err := rows.Err(); err != nil {
+               sess.log.WithError(err).Error("db.Query failed")
+       }
+}
+
// A v0subscribe is a decoded client subscribe request.
type v0subscribe struct {
	Method    string
	Filters   []v0filter
	LastLogID int64 `json:"last_log_id"`

	// funcs are the filter predicates compiled from Filters; see
	// prepare().
	funcs []func(*event) bool
}

// A v0filter is an API-style filter triple:
// [attribute, operator, operand].
type v0filter [3]interface{}
+
+func (sub *v0subscribe) match(sess *v0session, e *event) bool {
+       log := sess.log.WithField("LogID", e.LogID)
+       detail := e.Detail()
+       if detail == nil {
+               log.Error("match failed, no detail")
+               return false
+       }
+       log = log.WithField("funcs", len(sub.funcs))
+       for i, f := range sub.funcs {
+               if !f(e) {
+                       log.WithField("func", i).Debug("match failed")
+                       return false
+               }
+       }
+       log.Debug("match passed")
+       return true
+}
+
// prepare compiles sub.Filters into predicate funcs. Only two filter
// forms are supported: ["event_type","in",[...strings]] and
// ["created_at",op,timestamp] with op in {<, <=, =, >=, >}.
// Malformed or unrecognized filters are silently ignored.
func (sub *v0subscribe) prepare(sess *v0session) {
	for _, f := range sub.Filters {
		if len(f) != 3 {
			continue
		}
		if col, ok := f[0].(string); ok && col == "event_type" {
			op, ok := f[1].(string)
			if !ok || op != "in" {
				continue
			}
			arr, ok := f[2].([]interface{})
			if !ok {
				continue
			}
			var strs []string
			for _, s := range arr {
				if s, ok := s.(string); ok {
					strs = append(strs, s)
				}
			}
			sub.funcs = append(sub.funcs, func(e *event) bool {
				for _, s := range strs {
					if s == e.Detail().EventType {
						return true
					}
				}
				return false
			})
		} else if ok && col == "created_at" {
			// col and ok here are the variables declared in
			// the if-statement above; this branch runs when
			// f[0] is the string "created_at".
			op, ok := f[1].(string)
			if !ok {
				continue
			}
			tstr, ok := f[2].(string)
			if !ok {
				continue
			}
			t, err := time.Parse(time.RFC3339Nano, tstr)
			if err != nil {
				sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
				continue
			}
			var fn func(*event) bool
			switch op {
			case ">=":
				fn = func(e *event) bool {
					return !e.Detail().CreatedAt.Before(t)
				}
			case "<=":
				fn = func(e *event) bool {
					return !e.Detail().CreatedAt.After(t)
				}
			case ">":
				fn = func(e *event) bool {
					return e.Detail().CreatedAt.After(t)
				}
			case "<":
				fn = func(e *event) bool {
					return e.Detail().CreatedAt.Before(t)
				}
			case "=":
				fn = func(e *event) bool {
					return e.Detail().CreatedAt.Equal(t)
				}
			default:
				sess.log.WithField("operator", op).Info("bogus operator")
				continue
			}
			sub.funcs = append(sub.funcs, fn)
		}
	}
}
diff --git a/services/ws/session_v1.go b/services/ws/session_v1.go
new file mode 100644 (file)
index 0000000..71a1303
--- /dev/null
@@ -0,0 +1,14 @@
+package main
+
+import (
+       "database/sql"
+       "errors"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// newSessionV1 returns a v1 session -- see
+// https://dev.arvados.org/projects/arvados/wiki/Websocket_server
+func newSessionV1(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker, ac *arvados.Client) (session, error) {
+       return nil, errors.New("Not implemented")
+}