12483: Merge branch 'master' into 12483-writable-fs
author Tom Clegg <tclegg@veritasgenetics.com>
Thu, 30 Nov 2017 20:06:00 +0000 (15:06 -0500)
committer Tom Clegg <tclegg@veritasgenetics.com>
Thu, 30 Nov 2017 20:06:00 +0000 (15:06 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

75 files changed:
apps/workbench/app/assets/javascripts/components/search.js [moved from apps/workbench/app/assets/javascripts/components/collections.js with 65% similarity]
apps/workbench/app/assets/javascripts/components/sessions.js
apps/workbench/app/assets/javascripts/models/session_db.js
apps/workbench/app/controllers/collections_controller.rb
apps/workbench/app/controllers/search_controller.rb
apps/workbench/app/models/arvados_api_client.rb
apps/workbench/app/models/user.rb
apps/workbench/app/views/layouts/body.html.erb
apps/workbench/app/views/projects/_show_dashboard.html.erb
apps/workbench/app/views/search/index.html [moved from apps/workbench/app/views/collections/multisite.html with 66% similarity]
apps/workbench/config/routes.rb
apps/workbench/test/controllers/projects_controller_test.rb
apps/workbench/test/integration/application_layout_test.rb
build/libcloud-pin.sh
build/package-build-dockerfiles/ubuntu1204/Dockerfile [deleted file]
build/package-test-dockerfiles/ubuntu1204/Dockerfile [deleted file]
build/run-build-packages-all-targets.sh
build/run-tests.sh
doc/_config.yml
doc/user/cwl/cwl-run-options.html.textile.liquid
doc/user/topics/arv-sync-groups.html.textile.liquid [new file with mode: 0644]
docker/jobs/apt.arvados.org.list
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/setup.py
sdk/cwl/tests/arvados-tests.sh
sdk/cwl/tests/arvados-tests.yml
sdk/cwl/tests/wf/runin-wf.cwl [new file with mode: 0644]
sdk/go/asyncbuf/buf.go [new file with mode: 0644]
sdk/go/asyncbuf/buf_test.go [new file with mode: 0644]
sdk/go/httpserver/id_generator.go
sdk/go/httpserver/logger.go [new file with mode: 0644]
sdk/go/httpserver/logger_test.go [new file with mode: 0644]
sdk/go/keepclient/hashcheck.go
sdk/go/keepclient/keepclient.go
sdk/go/keepclient/keepclient_test.go
sdk/go/keepclient/support.go
sdk/go/streamer/streamer.go [deleted file]
sdk/go/streamer/streamer_test.go [deleted file]
sdk/go/streamer/transfer.go [deleted file]
sdk/python/arvados/commands/get.py
sdk/python/tests/test_arv_get.py
services/api/Gemfile
services/api/Gemfile.lock
services/api/app/controllers/application_controller.rb
services/api/app/controllers/arvados/v1/api_client_authorizations_controller.rb
services/api/app/controllers/arvados/v1/schema_controller.rb
services/api/app/middlewares/arvados_api_token.rb
services/api/test/functional/arvados/v1/schema_controller_test.rb
services/api/test/integration/api_client_authorizations_scopes_test.rb
services/api/test/integration/reader_tokens_test.rb
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/keepproxy/keepproxy.go
services/keepproxy/keepproxy_test.go
services/keepproxy/proxy_client.go
services/keepstore/handler_test.go
services/keepstore/keepstore.go
services/keepstore/logging_router.go [deleted file]
services/keepstore/logging_router_test.go [deleted file]
services/keepstore/s3_volume.go
services/nodemanager/arvnodeman/test/fake_driver.py
services/nodemanager/setup.py
tools/arvbox/bin/arvbox
tools/arvbox/lib/arvbox/docker/Dockerfile.base
tools/arvbox/lib/arvbox/docker/Dockerfile.demo
tools/arvbox/lib/arvbox/docker/common.sh
tools/arvbox/lib/arvbox/docker/createusers.sh
tools/arvbox/lib/arvbox/docker/service/composer/log/main/.gitstub [new file with mode: 0644]
tools/arvbox/lib/arvbox/docker/service/composer/log/run [new symlink]
tools/arvbox/lib/arvbox/docker/service/composer/run [new file with mode: 0755]
tools/arvbox/lib/arvbox/docker/service/composer/run-service [new file with mode: 0755]
tools/arvbox/lib/arvbox/docker/service/sso/run-service
tools/arvbox/lib/arvbox/docker/service/workbench/run
tools/arvbox/lib/arvbox/docker/service/workbench/run-service

similarity index 65%
rename from apps/workbench/app/assets/javascripts/components/collections.js
rename to apps/workbench/app/assets/javascripts/components/search.js
index 33fca6c7b770bf35354244e1d783de77c571cb80..2fe73193e7f116fcda6c2831c8c2250c2a266aee 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-window.CollectionsTable = {
+window.SearchResultsTable = {
     maybeLoadMore: function(dom) {
         var loader = this.loader
         if (loader.state != loader.READY)
@@ -37,6 +37,10 @@ window.CollectionsTable = {
     },
     view: function(vnode) {
         var loader = vnode.attrs.loader
+        var iconsMap = {
+            collections: m('i.fa.fa-fw.fa-archive'),
+            projects: m('i.fa.fa-fw.fa-folder'),
+        }
         return m('table.table.table-condensed', [
             m('thead', m('tr', [
                 m('th'),
@@ -48,14 +52,15 @@ window.CollectionsTable = {
                 loader.items().map(function(item) {
                     return m('tr', [
                         m('td', [
-                            // Guess workbench.{apihostport} is a
-                            // Workbench... unless the host part of
-                            // apihostport is an IPv4 or [IPv6]
-                            // address.
-                            item.session.baseURL.match('://(\\[|\\d+\\.\\d+\\.\\d+\\.\\d+[:/])') ? null :
+                            item.workbenchBaseURL() &&
                                 m('a.btn.btn-xs.btn-default', {
-                                    href: item.session.baseURL.replace('://', '://workbench.')+'collections/'+item.uuid,
-                                }, 'Show'),
+                                    'data-original-title': 'show '+item.objectType.description,
+                                    'data-placement': 'top',
+                                    'data-toggle': 'tooltip',
+                                    href: item.workbenchBaseURL()+'/'+item.objectType.wb_path+'/'+item.uuid,
+                                    // Bootstrap's tooltip feature
+                                    oncreate: function(vnode) { $(vnode.dom).tooltip() },
+                                }, iconsMap[item.objectType.wb_path]),
                         ]),
                         m('td.arvados-uuid', item.uuid),
                         m('td', item.name || '(unnamed)'),
@@ -83,7 +88,7 @@ window.CollectionsTable = {
     },
 }
 
-window.CollectionsSearch = {
+window.Search = {
     oninit: function(vnode) {
         vnode.state.sessionDB = new SessionDB()
         vnode.state.searchEntered = m.stream()
@@ -98,28 +103,53 @@ window.CollectionsSearch = {
             vnode.state.loader = new MergingLoader({
                 children: Object.keys(sessions).map(function(key) {
                     var session = sessions[key]
-                    return new MultipageLoader({
+                    var workbenchBaseURL = function() {
+                        return vnode.state.sessionDB.workbenchBaseURL(session)
+                    }
+                    var searchable_objects = [
+                        {
+                            wb_path: 'projects',
+                            api_path: 'arvados/v1/groups',
+                            filters: [['group_class', '=', 'project']],
+                            description: 'project',
+                        },
+                        {
+                            wb_path: 'collections',
+                            api_path: 'arvados/v1/collections',
+                            filters: [],
+                            description: 'collection',
+                        },
+                    ]
+                    return new MergingLoader({
                         sessionKey: key,
-                        loadFunc: function(filters) {
-                            var tsquery = to_tsquery(q)
-                            if (tsquery) {
-                                filters = filters.slice(0)
-                                filters.push(['any', '@@', tsquery])
-                            }
-                            return vnode.state.sessionDB.request(session, 'arvados/v1/collections', {
-                                data: {
-                                    filters: JSON.stringify(filters),
-                                    count: 'none',
+                        // For every session, search for every object type
+                        children: searchable_objects.map(function(obj_type) {
+                            return new MultipageLoader({
+                                sessionKey: key,
+                                loadFunc: function(filters) {
+                                    // Apply additional type-dependent filters
+                                    filters = filters.concat(obj_type.filters)
+                                    var tsquery = to_tsquery(q)
+                                    if (tsquery) {
+                                        filters.push(['any', '@@', tsquery])
+                                    }
+                                    return vnode.state.sessionDB.request(session, obj_type.api_path, {
+                                        data: {
+                                            filters: JSON.stringify(filters),
+                                            count: 'none',
+                                        },
+                                    }).then(function(resp) {
+                                        resp.items.map(function(item) {
+                                            item.workbenchBaseURL = workbenchBaseURL
+                                            item.objectType = obj_type
+                                        })
+                                        return resp
+                                    })
                                 },
-                            }).then(function(resp) {
-                                resp.items.map(function(item) {
-                                    item.session = session
-                                })
-                                return resp
                             })
-                        },
+                        }),
                     })
-                })
+                }),
             })
         })
     },
@@ -142,7 +172,7 @@ window.CollectionsSearch = {
                 m('.row', [
                     m('.col-md-6', [
                         m('.input-group', [
-                            m('input#search.form-control[placeholder=Search]', {
+                            m('input#search.form-control[placeholder=Search collections and projects]', {
                                 oninput: m.withAttr('value', vnode.state.searchEntered),
                                 value: vnode.state.searchEntered(),
                             }),
@@ -164,7 +194,7 @@ window.CollectionsSearch = {
                         m('a[href="/sessions"]', 'Add/remove sites'),
                     ]),
                 ]),
-                m(CollectionsTable, {
+                m(SearchResultsTable, {
                     loader: vnode.state.loader,
                 }),
             ],
index 3d127f1714d58a98eac115eeba96d9ebf95d39d4..e7cc5055734d4edaa2144d7eb4d704ec7e737736 100644 (file)
@@ -21,8 +21,8 @@ window.SessionsTable = {
         return m('.container', [
             m('p', [
                 'You can log in to multiple Arvados sites here, then use the ',
-                m('a[href="/collections/multisite"]', 'multi-site search'),
-                ' page to search collections on all sites at once.',
+                m('a[href="/search"]', 'multi-site search'),
+                ' page to search collections and projects on all sites at once.',
             ]),
             m('table.table.table-condensed.table-hover', [
                 m('thead', m('tr', [
index 8330a68d188d9a4bd7d8947a23cbc83682dc17b5..ad9ad1878417370dfd75294e9bd9cecbe25880d1 100644 (file)
@@ -5,6 +5,7 @@
 window.SessionDB = function() {
     var db = this
     Object.assign(db, {
+        discoveryCache: {},
         loadFromLocalStorage: function() {
             try {
                 return JSON.parse(window.localStorage.getItem('sessions')) || {}
@@ -120,6 +121,36 @@
                 })
             })
         },
+        // Return the Workbench base URL advertised by the session's
+        // API server, or a reasonable guess, or (if neither strategy
+        // works out) null.
+        workbenchBaseURL: function(session) {
+            var dd = db.discoveryDoc(session)()
+            if (!dd)
+                // Don't fall back to guessing until we receive the discovery doc
+                return null
+            if (dd.workbenchUrl)
+                return dd.workbenchUrl
+            // Guess workbench.{apihostport} is a Workbench... unless
+            // the host part of apihostport is an IPv4 or [IPv6]
+            // address.
+            if (!session.baseURL.match('://(\\[|\\d+\\.\\d+\\.\\d+\\.\\d+[:/])')) {
+                var wbUrl = session.baseURL.replace('://', '://workbench.')
+                // Remove the trailing slash, if it's there.
+                return wbUrl.slice(-1) == '/' ? wbUrl.slice(0, -1) : wbUrl
+            }
+            return null
+        },
+        // Return a m.stream that will get fulfilled with the
+        // discovery doc from a session's API server.
+        discoveryDoc: function(session) {
+            var cache = db.discoveryCache[session.baseURL]
+            if (!cache) {
+                db.discoveryCache[session.baseURL] = cache = m.stream()
+                m.request(session.baseURL+'discovery/v1/apis/arvados/v1/rest').then(cache)
+            }
+            return cache
+        },
         request: function(session, path, opts) {
             opts = opts || {}
             opts.headers = opts.headers || {}
index 779d95c45b874d4fec157768e19013a1c68a0022..5fcb2dc569ff6b2446c602dc26de61a069155ba2 100644 (file)
@@ -16,7 +16,7 @@ class CollectionsController < ApplicationController
   skip_around_filter(:require_thread_api_token,
                      only: [:show_file, :show_file_links])
   skip_before_filter(:find_object_by_uuid,
-                     only: [:provenance, :show_file, :show_file_links, :multisite])
+                     only: [:provenance, :show_file, :show_file_links])
   # We depend on show_file to display the user agreement:
   skip_before_filter :check_user_agreements, only: :show_file
   skip_before_filter :check_user_profile, only: :show_file
index 40e484ea062b449068923fdc8d9951f1f7adddbe..3775abd1ae9f1117926d7bde8c847fc32ad0cd60 100644 (file)
@@ -3,6 +3,8 @@
 # SPDX-License-Identifier: AGPL-3.0
 
 class SearchController < ApplicationController
+  skip_before_filter :ensure_arvados_api_exists
+
   def find_objects_for_index
     search_what = Group
     if params[:project_uuid]
index ac2fe3a97629f8ec957b6a99b21f779b9d363e63..5a8fd518d386ec89125552c9fe17730e0488d4c4 100644 (file)
@@ -82,7 +82,7 @@ class ArvadosApiClient
     @client_mtx = Mutex.new
   end
 
-  def api(resources_kind, action, data=nil, tokens={})
+  def api(resources_kind, action, data=nil, tokens={}, include_anon_token=true)
 
     profile_checkpoint
 
@@ -117,7 +117,7 @@ class ArvadosApiClient
       'reader_tokens' => ((tokens[:reader_tokens] ||
                            Thread.current[:reader_tokens] ||
                            []) +
-                          [Rails.configuration.anonymous_user_token]).to_json,
+                          (include_anon_token ? [Rails.configuration.anonymous_user_token] : [])).to_json,
     }
     if !data.nil?
       data.each do |k,v|
index 10da22db69e4540f8d7027df94b3fd7d43d98279..1f102dbf17acd3fb807110c34f4937686ebb9f2d 100644 (file)
@@ -10,7 +10,7 @@ class User < ArvadosBase
   end
 
   def self.current
-    res = arvados_api_client.api self, '/current'
+    res = arvados_api_client.api self, '/current', nil, {}, false
     arvados_api_client.unpack_api_response(res)
   end
 
index 15f654596cbb501e8e86ccb24138d986c2f625e3..c1399f2602dc151907253d080af08504a53c0875 100644 (file)
@@ -34,7 +34,7 @@ SPDX-License-Identifier: AGPL-3.0 %>
                     <%=
                        target = Rails.configuration.multi_site_search
                        if target == true
-                         target = {controller: 'collections', action: 'multisite'}
+                         target = {controller: 'search', action: 'index'}
                        end
                        link_to("Multi-site search", target, {class: 'btn btn-default'}) %>
                   </form>
index 00780c4089d00b3bbb5332e115e5de5d88f2f922..713582654fee93b3c9bd11432701e002b6fc5d0c 100644 (file)
@@ -12,24 +12,11 @@ SPDX-License-Identifier: AGPL-3.0 %>
 
   # fetch children of all the active crs in one call, if there are any
   active_crs = recent_crs.each {|cr| cr if (cr.priority.andand > 0 and cr.state != 'Final' and cr.container_uuid)}
-  active_cr_uuids = active_crs.map(&:uuid)
-  active_cr_containers = active_crs.map {|cr| cr.container_uuid}.compact.uniq
-  cr_children = {}
-  if active_cr_containers.any?
-    active_cr_containers.each { |c| cr_children[c] = []}
-    cols = ContainerRequest.columns.map(&:name) - %w(id updated_at mounts)
-    reqs = ContainerRequest.select(cols).where(requesting_container_uuid: active_cr_containers).results
-    reqs.each {|cr| cr_children[cr.requesting_container_uuid] << cr} if reqs
-  end
 
   wus = {}
   outputs = []
   recent_procs.each do |p|
-    if p.uuid.in?(active_cr_uuids)
-      wu = p.work_unit(nil, child_objects=cr_children[p.container_uuid])
-    else
-      wu = p.work_unit
-    end
+    wu = p.work_unit
 
     wus[p] = wu
     outputs << wu.outputs
@@ -129,40 +116,20 @@ SPDX-License-Identifier: AGPL-3.0 %>
             <% else %>
             <div class="dashboard-panel-info-row row-<%=wu.uuid%>">
               <div class="row">
-                <div class="col-md-6">
+                <div class="col-md-6 text-overflow-ellipsis">
                   <%= link_to_if_arvados_object p, {friendly_name: true} %>
                 </div>
-                <% if wu.is_running? %>
-                <div class="col-md-6">
-                  <div class="progress" style="margin-bottom: 0px">
-                    <% wu.progress %>
-                  </div>
-                </div>
-                <% else %>
                 <div class="col-md-2">
                   <span class="label label-<%=wu.state_bootstrap_class%>"><%=wu.state_label%></span>
                 </div>
-                <% end %>
               </div>
 
-              <%
-                children = wu.children
-                running = children.select { |c| c.state_label == "Running" }
-                queued = children.select { |c| c.state_label == "Queued" }
-              %>
-
               <div class="clearfix">
                 Started at <%= render_localized_date(wu.started_at || wu.created_at, "noseconds") %>.
                 <% wu_time = Time.now - (wu.started_at || wu.created_at) %>
                 Active for <%= render_runtime(wu_time, false) %>.
 
                 <div class="pull-right">
-                  <% running.each do |r| %>
-                    <span class="label label-<%= r.state_bootstrap_class %>"> <%= r.label || r.state_label || 'Not ready' %> </span>
-                  <% end %>
-                  <% queued.each do |q| %>
-                    <span class="label label-<%= q.state_bootstrap_class %>"> <%= q.label || r.state_label || 'Not ready' %> </span>
-                  <% end %>
                 </div>
               </div>
             </div>
similarity index 66%
rename from apps/workbench/app/views/collections/multisite.html
rename to apps/workbench/app/views/search/index.html
index 9b03f10f3dcd90f6fd14d4d730367359be620797..6bcad0b1ae2c245ccd9b65afda7d3eac373088a7 100644 (file)
@@ -2,4 +2,4 @@
 
 SPDX-License-Identifier: AGPL-3.0 -->
 
-<div data-mount-mithril="CollectionsSearch"></div>
+<div data-mount-mithril="Search"></div>
index fee49c14ce213aa2cd001252043f4cc7dfc79745..d969abd78c2b69d8de936e2a00df0c0d1f1ef0f1 100644 (file)
@@ -95,7 +95,7 @@ ArvadosWorkbench::Application.routes.draw do
     post 'remove_selected_files', on: :member
     get 'tags', on: :member
     post 'save_tags', on: :member
-    get 'multisite', on: :collection
+    get 'multisite', on: :collection, to: redirect('/search')
   end
   get('/collections/download/:uuid/:reader_token/*file' => 'collections#show_file',
       format: false)
@@ -109,7 +109,7 @@ ArvadosWorkbench::Application.routes.draw do
     get 'tab_counts', on: :member
     get 'public', on: :collection
   end
-
+  
   resources :search do
     get 'choose', :on => :collection
   end
@@ -131,9 +131,9 @@ ArvadosWorkbench::Application.routes.draw do
   match '/_health/ping', to: 'healthcheck#ping', via: [:get]
 
   get '/tests/mithril', to: 'tests#mithril'
-
+  
   get '/status', to: 'status#status'
-
+  
   # Send unroutable requests to an arbitrary controller
   # (ends up at ApplicationController#render_not_found)
   match '*a', to: 'links#render_not_found', via: [:get, :post]
index bdf983ff43f2cb2cd3fe16a90033c3a2d4987aba..61c882d9f9908ae83e22aaa6a220bfc28e8fb1f0 100644 (file)
@@ -439,7 +439,7 @@ class ProjectsControllerTest < ActionController::TestCase
         {
           fixture: 'container_requests',
           state: 'running',
-          selectors: [['div.progress', true]]
+          selectors: [['.label-info', true, 'Running']]
         },
         {
           fixture: 'pipeline_instances',
@@ -450,7 +450,7 @@ class ProjectsControllerTest < ActionController::TestCase
         {
           fixture: 'pipeline_instances',
           state: 'pipeline_in_running_state',
-          selectors: [['div.progress', true]]
+          selectors: [['.label-info', true, 'Running']]
         },
       ].each do |c|
         uuid = api_fixture(c[:fixture])[c[:state]]['uuid']
index 7692d8e5dc1b0df9a50c1dae8d258eb298e36e33..74a42877b1301f52b65e557b75bbac60165439f6 100644 (file)
@@ -260,10 +260,9 @@ class ApplicationLayoutTest < ActionDispatch::IntegrationTest
       end
 
       within('.recent-processes') do
-        assert_text 'running'
 
         within('.row-zzzzz-xvhdp-cr4runningcntnr') do
-          assert_text 'requester_for_running_cr'
+          assert_text 'running'
         end
 
         assert_text 'zzzzz-d1hrv-twodonepipeline'
index f9a0b551e9215f891f02740ed8572f38c8ebc5ed..4182575651ff91ec5e48376f7c1a2b2270b43340 100644 (file)
@@ -2,4 +2,4 @@
 #
 # SPDX-License-Identifier: AGPL-3.0
 
-LIBCLOUD_PIN=2.2.2.dev2
+LIBCLOUD_PIN=2.2.2.dev3
diff --git a/build/package-build-dockerfiles/ubuntu1204/Dockerfile b/build/package-build-dockerfiles/ubuntu1204/Dockerfile
deleted file mode 100644 (file)
index 1d07db7..0000000
+++ /dev/null
@@ -1,33 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-FROM ubuntu:precise
-MAINTAINER Ward Vandewege <ward@curoverse.com>
-
-ENV DEBIAN_FRONTEND noninteractive
-
-# Install dependencies.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip build-essential unzip
-
-# Install RVM
-RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
-    curl -L https://get.rvm.io | bash -s stable && \
-    /usr/local/rvm/bin/rvm install 2.3 && \
-    /usr/local/rvm/bin/rvm alias create default ruby-2.3 && \
-    /usr/local/rvm/bin/rvm-exec default gem install bundler && \
-    /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
-
-# Install golang binary
-ADD generated/go1.8.3.linux-amd64.tar.gz /usr/local/
-RUN ln -s /usr/local/go/bin/go /usr/local/bin/
-
-# Install nodejs and npm
-ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
-RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
-
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
-ENV WORKSPACE /arvados
-CMD ["/usr/local/rvm/bin/rvm-exec", "default", "bash", "/jenkins/run-build-packages.sh", "--target", "ubuntu1204"]
diff --git a/build/package-test-dockerfiles/ubuntu1204/Dockerfile b/build/package-test-dockerfiles/ubuntu1204/Dockerfile
deleted file mode 100644 (file)
index 75c0cea..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-FROM ubuntu:precise
-MAINTAINER Ward Vandewege <ward@curoverse.com>
-
-ENV DEBIAN_FRONTEND noninteractive
-
-# Install RVM
-RUN apt-get update && \
-    apt-get -y install --no-install-recommends curl ca-certificates g++ && \
-    gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
-    curl -L https://get.rvm.io | bash -s stable && \
-    /usr/local/rvm/bin/rvm install 2.3 && \
-    /usr/local/rvm/bin/rvm alias create default ruby-2.3
-
-# udev daemon can't start in a container, so don't try.
-RUN mkdir -p /etc/udev/disabled
-
-RUN echo "deb file:///arvados/packages/ubuntu1204/ /" >>/etc/apt/sources.list
index 7dd21a363d451b473bce76fa017e5ae710fa22f1..4cba3e9a62a513c8cb18d816dab98ced7f5b5363 100755 (executable)
@@ -91,11 +91,16 @@ for dockerfile_path in $(find -name Dockerfile | grep package-build-dockerfiles)
         true
     else
         FINAL_EXITCODE=$?
+        echo
+        echo "Build packages failed for $(basename $(dirname "$dockerfile_path"))"
+        echo
     fi
 done
 
 if test $FINAL_EXITCODE != 0 ; then
+    echo
     echo "Build packages failed with code $FINAL_EXITCODE" >&2
+    echo
 fi
 
 exit $FINAL_EXITCODE
index 43e601ec07a54f55fdf84b9d6089b5f068d4eb86..365931d33281d533e88442d2beb7b9f7f454878f 100755 (executable)
@@ -100,7 +100,7 @@ sdk/go/health
 sdk/go/httpserver
 sdk/go/manifest
 sdk/go/blockdigest
-sdk/go/streamer
+sdk/go/asyncbuf
 sdk/go/stats
 sdk/go/crunchrunner
 sdk/cwl
@@ -212,6 +212,29 @@ sanity_checks() {
     echo -n 'cadaver: '
     cadaver --version | grep -w cadaver \
           || fatal "No cadaver. Try: apt-get install cadaver"
+    echo -n 'libattr1 xattr.h: '
+    find /usr/include -path '*/attr/xattr.h' | egrep --max-count=1 . \
+        || fatal "No libattr1 xattr.h. Try: apt-get install libattr1-dev"
+    echo -n 'libcurl curl.h: '
+    find /usr/include -path '*/curl/curl.h' | egrep --max-count=1 . \
+        || fatal "No libcurl curl.h. Try: apt-get install libcurl4-gnutls-dev"
+    echo -n 'libpq libpq-fe.h: '
+    find /usr/include -path '*/postgresql/libpq-fe.h' | egrep --max-count=1 . \
+        || fatal "No libpq libpq-fe.h. Try: apt-get install libpq-dev"
+    echo -n 'services/api/config/database.yml: '
+    if [[ ! -f "$WORKSPACE/services/api/config/database.yml" ]]; then
+           fatal "Please provide a database.yml file for the test suite"
+    else
+           echo "OK"
+    fi
+    echo -n 'postgresql: '
+    psql --version || fatal "No postgresql. Try: apt-get install postgresql postgresql-client-common"
+    echo -n 'phantomjs: '
+    phantomjs --version || fatal "No phantomjs. Try: apt-get install phantomjs"
+    echo -n 'xvfb: '
+    which Xvfb || fatal "No xvfb. Try: apt-get install xvfb"
+    echo -n 'graphviz: '
+    dot -V || fatal "No graphviz. Try: apt-get install graphviz"
 }
 
 rotate_logfile() {
@@ -283,6 +306,13 @@ done
 
 start_api() {
     echo 'Starting API server...'
+    if [[ ! -d "$WORKSPACE/services/api/log" ]]; then
+       mkdir -p "$WORKSPACE/services/api/log"
+    fi
+    # Remove empty api.pid file if it exists
+    if [[ -f "$WORKSPACE/tmp/api.pid" && ! -s "$WORKSPACE/tmp/api.pid" ]]; then
+       rm -f "$WORKSPACE/tmp/api.pid"
+    fi
     cd "$WORKSPACE" \
         && eval $(python sdk/python/tests/run_test_server.py start --auth admin) \
         && export ARVADOS_TEST_API_HOST="$ARVADOS_API_HOST" \
@@ -800,7 +830,7 @@ gostuff=(
     sdk/go/health
     sdk/go/httpserver
     sdk/go/manifest
-    sdk/go/streamer
+    sdk/go/asyncbuf
     sdk/go/crunchrunner
     sdk/go/stats
     lib/crunchstat
index 3068647074ca2a82166860f47cd27bfbb0c079d7..e8a899c004c1c257ee05d36d757ac42644eca10a 100644 (file)
@@ -74,6 +74,7 @@ navbar:
       - user/topics/run-command.html.textile.liquid
       - user/reference/job-pipeline-ref.html.textile.liquid
       - user/examples/crunch-examples.html.textile.liquid
+      - user/topics/arv-sync-groups.html.textile.liquid
     - Query the metadata database:
       - user/topics/tutorial-trait-search.html.textile.liquid
     - Arvados License:
index 7598ab822e0c5852bc7409ddba21d189f19dd1dc..7f69c61feb0e445dd162a07f4d130a21f64ebfd2 100644 (file)
@@ -38,7 +38,7 @@ table(table table-bordered table-condensed).
 |==--submit-runner-ram== SUBMIT_RUNNER_RAM|RAM (in MiB) required for the workflow runner job (default 1024)|
 |==--submit-runner-image== SUBMIT_RUNNER_IMAGE|Docker image for workflow runner job, default arvados/jobs|
 |==--name== NAME|           Name to use for workflow execution instance.|
-|==--on-error {stop,continue}|Desired workflow behavior when a step fails. One of 'stop' or 'continue'. Default is 'continue'.|
+|==--on-error {stop,continue}==|Desired workflow behavior when a step fails. One of 'stop' or 'continue'. Default is 'continue'.|
 |==--enable-dev==|          Enable loading and running development versions of CWL spec.|
 |==--intermediate-output-ttl== N|If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).|
 |==--trash-intermediate==|  Immediately trash intermediate outputs on workflow success.|
diff --git a/doc/user/topics/arv-sync-groups.html.textile.liquid b/doc/user/topics/arv-sync-groups.html.textile.liquid
new file mode 100644 (file)
index 0000000..e2a42c8
--- /dev/null
@@ -0,0 +1,69 @@
+---
+layout: default
+navsection: userguide
+title: "Using arv-sync-groups"
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+The @arv-sync-groups@ tool allows you to synchronize remote groups into Arvados from an external source.
+
+h1. Using arv-sync-groups
+
+This tool reads a CSV (comma-separated values) file containing information about external groups and their members. When run for the first time, it creates a special group named 'Externally synchronized groups' that serves as the parent of all the remote groups.
+
+Every line in the file should have two values: a group name and a local user identifier, meaning that the named user is a member of the group. The tool will create the group if it doesn't exist, and add the user to it. If an existing group member is not present in the input file, the account will be removed from the group.
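+
+For example, an input file describing two hypothetical groups, with members identified by email address (the default), could look like this:
+
+<notextile>
+<pre><code>Research,jdoe@example.com
+Research,asmith@example.com
+Sysadmins,jdoe@example.com
+</code></pre>
+</notextile>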
+
+Users can be identified by their email address or username: the tool will check that every user exists on the system, and report back when one is not found. Groups, on the other hand, are identified by their name.
+
+This tool is designed to be run periodically, reading a file created by a dump script from a remote authentication system (e.g., LDAP) and applying the file's contents as the source of truth.
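+
+For example, a site could schedule a nightly sync with a cron entry along these lines (the schedule, host, token, and file path are illustrative placeholders):
+
+<notextile>
+<pre><code>0 2 * * * env ARVADOS_API_HOST=zzzzz.example.com ARVADOS_API_TOKEN=xxxxxxxx arv-sync-groups /path/to/external_groups.csv
+</code></pre>
+</notextile>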
+
+
+bq. NOTE: @arv-sync-groups@ needs to perform several administrative tasks on Arvados, so it must be run using a superuser token.
+
+h2. Options
+
+The following command line options are supported:
+
+table(table table-bordered table-condensed).
+|_. Option |_. Description |
+|==--help==|             This list of options|
+|==--parent-group-uuid==|   UUID of group to own all the externally synchronized groups|
+|==--user-id== |            Identifier to use in looking up user. One of 'email' or 'username' (Default: 'email')|
+|==--verbose==|             Log informational messages (Default: False)|
+|==--version==|             Print version and exit|
+
+h2. Examples
+
+To sync groups, identifying every account by username and reading from a file named @external_groups.csv@, call the command as follows:
+
+<notextile>
+<pre><code>~$ <span class="userinput">arv-sync-groups --user-id username /path/to/external_groups.csv </span>
+</code></pre>
+</notextile>
+
+If you want to use a specific preexisting group as the parent of all the remote groups, you can do it this way:
+
+<notextile>
+<pre><code>~$ <span class="userinput">arv-sync-groups --parent-group-uuid &lt;preexisting group UUID&gt; --user-id username /path/to/external_groups.csv </span>
+</code></pre>
+</notextile>
index 3ae6df42160b2c66025f832fa159e739aa975cb5..ae1a0862a67437e257fa1fdec2919473799ac56c 100644 (file)
@@ -1,2 +1,3 @@
 # apt.arvados.org
 deb http://apt.arvados.org/ jessie main
+deb http://apt.arvados.org/ jessie-dev main
index 5756789cb1ecd5068780ca0ae036069dbdc9af5b..d15acf767fc90cc3e594be1fca0e67ae96f034ec 100644 (file)
@@ -750,7 +750,6 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
     arvargs.conformance_test = None
     arvargs.use_container = True
     arvargs.relax_path_checks = True
-    arvargs.validate = None
     arvargs.print_supported_versions = False
 
     make_fs_access = partial(CollectionFsAccess,
index fdf506effa68aeb8ab5b5267fef8f2ee937d51a9..316af0e52847d2caad516ef1b6d126577b948e92 100644 (file)
@@ -18,7 +18,7 @@ from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
 import ruamel.yaml as yaml
 
 from .runner import upload_dependencies, packed_workflow, upload_workflow_collection, trim_anonymous_location, remove_redundant_fields
-from .pathmapper import trim_listing
+from .pathmapper import ArvPathMapper, trim_listing
 from .arvtool import ArvadosCommandTool
 from .perf import Perf
 
@@ -110,8 +110,17 @@ class ArvadosWorkflow(Workflow):
                                         False)
 
             with Perf(metrics, "subworkflow adjust"):
+                joborder_resolved = copy.deepcopy(joborder)
                 joborder_keepmount = copy.deepcopy(joborder)
 
+                reffiles = []
+                visit_class(joborder_keepmount, ("File", "Directory"), lambda x: reffiles.append(x))
+
+                mapper = ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+                                 "/keep/%s",
+                                 "/keep/%s/%s",
+                                 **kwargs)
+
                 def keepmount(obj):
                     remove_redundant_fields(obj)
                     with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
@@ -119,7 +128,7 @@ class ArvadosWorkflow(Workflow):
                             raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
                     with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                         if obj["location"].startswith("keep:"):
-                            obj["location"] = "/keep/" + obj["location"][5:]
+                            obj["location"] = mapper.mapper(obj["location"]).target
                             if "listing" in obj:
                                 del obj["listing"]
                         elif obj["location"].startswith("_:"):
@@ -129,6 +138,12 @@ class ArvadosWorkflow(Workflow):
 
                 visit_class(joborder_keepmount, ("File", "Directory"), keepmount)
 
+                def resolved(obj):
+                    if obj["location"].startswith("keep:"):
+                        obj["location"] = mapper.mapper(obj["location"]).resolved
+
+                visit_class(joborder_resolved, ("File", "Directory"), resolved)
+
                 if self.wf_pdh is None:
                     adjustFileObjs(packed, keepmount)
                     adjustDirObjs(packed, keepmount)
@@ -159,6 +174,6 @@ class ArvadosWorkflow(Workflow):
             })
             kwargs["loader"] = self.doc_loader
             kwargs["avsc_names"] = self.doc_schema
-            return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder, output_callback, **kwargs)
+            return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder_resolved, output_callback, **kwargs)
         else:
             return super(ArvadosWorkflow, self).job(joborder, output_callback, **kwargs)
index 9e237952e5692410ae4047a546d3c9bce3fbd1e4..577295c4fc645cc4fdb2531a1b49c7beaf4b5436 100644 (file)
@@ -52,7 +52,7 @@ setup(name='arvados-cwl-runner',
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
           'cwltool==1.0.20170928192020',
-          'schema-salad==2.6.20170927145003',
+          'schema-salad==2.6.20171116190026',
           'typing==3.5.3.0',
           'ruamel.yaml==0.13.7',
           'arvados-python-client>=0.1.20170526013812',
index f7cebd4249acf5e5af7a7033100896a760720fbb..26b31615ea41950a7f3f13535571b2755981eb7a 100755 (executable)
@@ -4,6 +4,6 @@
 # SPDX-License-Identifier: Apache-2.0
 
 if ! arv-get d7514270f356df848477718d58308cc4+94 > /dev/null ; then
-    arv-put --portable-data-hash testdir
+    arv-put --portable-data-hash testdir/*
 fi
 exec cwltest --test arvados-tests.yml --tool arvados-cwl-runner $@ -- --disable-reuse --compute-checksum
index a0de23694c5c11298145804b04c500c42ed996ed..f6271b85d2f9acab6f6371e3def545b1473cd8d7 100644 (file)
     out: null
   tool: 12418-glob-empty-collection.cwl
   doc: "Test glob output on empty collection"
+
+- job: null
+  output:
+    out: out
+  tool: wf/runin-wf.cwl
+  doc: "RunInSingleContainer cwl.input.json needs to be consistent with pathmapper manipulations"
diff --git a/sdk/cwl/tests/wf/runin-wf.cwl b/sdk/cwl/tests/wf/runin-wf.cwl
new file mode 100644 (file)
index 0000000..a192b86
--- /dev/null
@@ -0,0 +1,64 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+class: Workflow
+cwlVersion: v1.0
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+inputs:
+  sleeptime:
+    type: int
+    default: 5
+  fileblub:
+    type: File
+    default:
+      class: File
+      location: keep:d7514270f356df848477718d58308cc4+94/a
+      secondaryFiles:
+        - class: File
+          location: keep:d7514270f356df848477718d58308cc4+94/b
+outputs:
+  out:
+    type: string
+    outputSource: substep/out
+requirements:
+  SubworkflowFeatureRequirement: {}
+  ScatterFeatureRequirement: {}
+  InlineJavascriptRequirement: {}
+  StepInputExpressionRequirement: {}
+steps:
+  substep:
+    in:
+      sleeptime: sleeptime
+      fileblub: fileblub
+    out: [out]
+    hints:
+      - class: arv:RunInSingleContainer
+    run:
+      class: Workflow
+      id: mysub
+      inputs:
+        fileblub: File
+      outputs:
+        out:
+          type: string
+          outputSource: sleep1/out
+      steps:
+        sleep1:
+          in:
+            fileblub: fileblub
+          out: [out]
+          run:
+            class: CommandLineTool
+            id: subtool
+            inputs:
+              fileblub:
+                type: File
+                inputBinding: {position: 1}
+            outputs:
+              out:
+                type: string
+                outputBinding:
+                  outputEval: "out"
+            baseCommand: cat
diff --git a/sdk/go/asyncbuf/buf.go b/sdk/go/asyncbuf/buf.go
new file mode 100644 (file)
index 0000000..05af02f
--- /dev/null
@@ -0,0 +1,117 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package asyncbuf
+
+import (
+       "bytes"
+       "io"
+       "sync"
+)
+
+// A Buffer is an io.Writer that distributes written data
+// asynchronously to multiple concurrent readers.
+//
+// NewReader() can be called at any time. In all cases, every returned
+// io.Reader reads all data written to the Buffer.
+//
+// Behavior is undefined if Write is called after Close or
+// CloseWithError.
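+//
+// Minimal usage sketch: one writer fills the buffer while any number
+// of readers each receive a complete copy (src, dst1, and dst2 are
+// placeholders):
+//
+//	b := NewBuffer(nil)
+//	go func() { io.Copy(b, src); b.Close() }()
+//	go io.Copy(dst1, b.NewReader())
+//	go io.Copy(dst2, b.NewReader())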
+type Buffer interface {
+       io.WriteCloser
+
+       // NewReader() returns an io.Reader that reads all data
+       // written to the Buffer.
+       NewReader() io.Reader
+
+       // Close, but return the given error (instead of io.EOF) to
+       // all readers when they reach the end of the buffer.
+       //
+       // CloseWithError(nil) is equivalent to
+       // CloseWithError(io.EOF).
+       CloseWithError(error) error
+}
+
+type buffer struct {
+       data *bytes.Buffer
+       cond sync.Cond
+       err  error // nil if there might be more writes
+}
+
+// NewBuffer creates a new Buffer using buf as its initial
+// contents. The new Buffer takes ownership of buf, and the caller
+// should not use buf after this call.
+func NewBuffer(buf []byte) Buffer {
+       return &buffer{
+               data: bytes.NewBuffer(buf),
+               cond: sync.Cond{L: &sync.Mutex{}},
+       }
+}
+
+func (b *buffer) Write(p []byte) (int, error) {
+       defer b.cond.Broadcast()
+       b.cond.L.Lock()
+       defer b.cond.L.Unlock()
+       if b.err != nil {
+               return 0, b.err
+       }
+       return b.data.Write(p)
+}
+
+func (b *buffer) Close() error {
+       return b.CloseWithError(nil)
+}
+
+func (b *buffer) CloseWithError(err error) error {
+       defer b.cond.Broadcast()
+       b.cond.L.Lock()
+       defer b.cond.L.Unlock()
+       if err == nil {
+               b.err = io.EOF
+       } else {
+               b.err = err
+       }
+       return nil
+}
+
+func (b *buffer) NewReader() io.Reader {
+       return &reader{b: b}
+}
+
+type reader struct {
+       b    *buffer
+       read int // # bytes already read
+}
+
+func (r *reader) Read(p []byte) (int, error) {
+       r.b.cond.L.Lock()
+       for {
+               switch {
+               case r.read < r.b.data.Len():
+                       buf := r.b.data.Bytes()
+                       r.b.cond.L.Unlock()
+                       n := copy(p, buf[r.read:])
+                       r.read += n
+                       return n, nil
+               case r.b.err != nil || len(p) == 0:
+                       // r.b.err != nil means we reached EOF.  And
+                       // even if we're not at EOF, there's no need
+                       // to block if len(p)==0.
+                       err := r.b.err
+                       r.b.cond.L.Unlock()
+                       return 0, err
+               default:
+                       r.b.cond.Wait()
+               }
+       }
+}
diff --git a/sdk/go/asyncbuf/buf_test.go b/sdk/go/asyncbuf/buf_test.go
new file mode 100644 (file)
index 0000000..cc742a8
--- /dev/null
@@ -0,0 +1,245 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package asyncbuf
+
+import (
+       "crypto/md5"
+       "errors"
+       "io"
+       "io/ioutil"
+       "math/rand"
+       "sync"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+func (s *Suite) TestNoWrites(c *check.C) {
+       b := NewBuffer(nil)
+       r1 := b.NewReader()
+       r2 := b.NewReader()
+       b.Close()
+       s.checkReader(c, r1, []byte{}, nil, nil)
+       s.checkReader(c, r2, []byte{}, nil, nil)
+}
+
+func (s *Suite) TestNoReaders(c *check.C) {
+       b := NewBuffer(nil)
+       n, err := b.Write([]byte("foobar"))
+       err2 := b.Close()
+       c.Check(n, check.Equals, 6)
+       c.Check(err, check.IsNil)
+       c.Check(err2, check.IsNil)
+}
+
+func (s *Suite) TestWriteReadClose(c *check.C) {
+       done := make(chan bool, 2)
+       b := NewBuffer(nil)
+       n, err := b.Write([]byte("foobar"))
+       c.Check(n, check.Equals, 6)
+       c.Check(err, check.IsNil)
+       r1 := b.NewReader()
+       r2 := b.NewReader()
+       go s.checkReader(c, r1, []byte("foobar"), nil, done)
+       go s.checkReader(c, r2, []byte("foobar"), nil, done)
+       time.Sleep(time.Millisecond)
+       c.Check(len(done), check.Equals, 0)
+       b.Close()
+       <-done
+       <-done
+}
+
+func (s *Suite) TestPrefillWriteCloseRead(c *check.C) {
+       done := make(chan bool, 2)
+       b := NewBuffer([]byte("baz"))
+       n, err := b.Write([]byte("waz"))
+       c.Check(n, check.Equals, 3)
+       c.Check(err, check.IsNil)
+       b.Close()
+       r1 := b.NewReader()
+       go s.checkReader(c, r1, []byte("bazwaz"), nil, done)
+       r2 := b.NewReader()
+       go s.checkReader(c, r2, []byte("bazwaz"), nil, done)
+       <-done
+       <-done
+}
+
+func (s *Suite) TestWriteReadCloseRead(c *check.C) {
+       done := make(chan bool, 1)
+       b := NewBuffer(nil)
+       r1 := b.NewReader()
+       go s.checkReader(c, r1, []byte("bazwazqux"), nil, done)
+
+       b.Write([]byte("bazwaz"))
+
+       r2 := b.NewReader()
+       r2.Read(make([]byte, 3))
+
+       b.Write([]byte("qux"))
+       b.Close()
+
+       s.checkReader(c, r2, []byte("wazqux"), nil, nil)
+       <-done
+}
+
+func (s *Suite) TestReadAtEOF(c *check.C) {
+       buf := make([]byte, 8)
+
+       b := NewBuffer([]byte{1, 2, 3})
+
+       r := b.NewReader()
+       n, err := r.Read(buf)
+       c.Check(n, check.Equals, 3)
+       c.Check(err, check.IsNil)
+
+       // Reading zero bytes at EOF, but before Close(), doesn't
+       // block or error
+       done := make(chan bool)
+       go func() {
+               defer close(done)
+               n, err = r.Read(buf[:0])
+               c.Check(n, check.Equals, 0)
+               c.Check(err, check.IsNil)
+       }()
+       select {
+       case <-done:
+       case <-time.After(time.Second):
+               c.Error("timeout")
+       }
+
+       b.Close()
+
+       // Reading zero bytes after Close() returns EOF
+       n, err = r.Read(buf[:0])
+       c.Check(n, check.Equals, 0)
+       c.Check(err, check.Equals, io.EOF)
+
+       // Reading from start after Close() returns 3 bytes, then EOF
+       r = b.NewReader()
+       n, err = r.Read(buf)
+       c.Check(n, check.Equals, 3)
+       if err != nil {
+               c.Check(err, check.Equals, io.EOF)
+       }
+       n, err = r.Read(buf[:0])
+       c.Check(n, check.Equals, 0)
+       c.Check(err, check.Equals, io.EOF)
+       n, err = r.Read(buf)
+       c.Check(n, check.Equals, 0)
+       c.Check(err, check.Equals, io.EOF)
+}
+
+func (s *Suite) TestCloseWithError(c *check.C) {
+       errFake := errors.New("it's not even a real error")
+
+       done := make(chan bool, 1)
+       b := NewBuffer(nil)
+       r1 := b.NewReader()
+       go s.checkReader(c, r1, []byte("bazwazqux"), errFake, done)
+
+       b.Write([]byte("bazwaz"))
+
+       r2 := b.NewReader()
+       r2.Read(make([]byte, 3))
+
+       b.Write([]byte("qux"))
+       b.CloseWithError(errFake)
+
+       s.checkReader(c, r2, []byte("wazqux"), errFake, nil)
+       <-done
+}
+
+// Write n*n bytes, n at a time; read them into n goroutines using
+// varying buffer sizes; compare checksums.
+func (s *Suite) TestManyReaders(c *check.C) {
+       const n = 256
+
+       b := NewBuffer(nil)
+
+       expectSum := make(chan []byte)
+       go func() {
+               hash := md5.New()
+               buf := make([]byte, n)
+               for i := 0; i < n; i++ {
+                       time.Sleep(10 * time.Nanosecond)
+                       rand.Read(buf)
+                       b.Write(buf)
+                       hash.Write(buf)
+               }
+               expectSum <- hash.Sum(nil)
+               b.Close()
+       }()
+
+       gotSum := make(chan []byte)
+       for i := 0; i < n; i++ {
+               go func(bufSize int) {
+                       got := md5.New()
+                       io.CopyBuffer(got, b.NewReader(), make([]byte, bufSize))
+                       gotSum <- got.Sum(nil)
+               }(i + n/2)
+       }
+
+       expect := <-expectSum
+       for i := 0; i < n; i++ {
+               c.Check(expect, check.DeepEquals, <-gotSum)
+       }
+}
+
+func (s *Suite) BenchmarkOneReader(c *check.C) {
+       s.benchmarkReaders(c, 1)
+}
+
+func (s *Suite) BenchmarkManyReaders(c *check.C) {
+       s.benchmarkReaders(c, 100)
+}
+
+func (s *Suite) benchmarkReaders(c *check.C, readers int) {
+       var n int64
+       t0 := time.Now()
+
+       buf := make([]byte, 10000)
+       rand.Read(buf)
+       for i := 0; i < 10; i++ {
+               b := NewBuffer(nil)
+               go func() {
+                       for i := 0; i < c.N; i++ {
+                               b.Write(buf)
+                       }
+                       b.Close()
+               }()
+
+               var wg sync.WaitGroup
+               for i := 0; i < readers; i++ {
+                       wg.Add(1)
+                       go func() {
+                               defer wg.Done()
+                               nn, _ := io.Copy(ioutil.Discard, b.NewReader())
+                               atomic.AddInt64(&n, int64(nn))
+                       }()
+               }
+               wg.Wait()
+       }
+       c.Logf("%d bytes, %.0f MB/s", n, float64(n)/time.Since(t0).Seconds()/1000000)
+}
+
+func (s *Suite) checkReader(c *check.C, r io.Reader, expectData []byte, expectError error, done chan bool) {
+       buf, err := ioutil.ReadAll(r)
+       c.Check(err, check.Equals, expectError)
+       c.Check(buf, check.DeepEquals, expectData)
+       if done != nil {
+               done <- true
+       }
+}
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
index 18fd91f560227ee1c40fec2bce64126bc0c3b827..d2c3a41f2108e2bc852f56119b747a7ec9423e7a 100644 (file)
@@ -5,6 +5,8 @@
 package httpserver
 
 import (
+       "math/rand"
+       "net/http"
        "strconv"
        "sync"
        "time"
@@ -17,19 +19,34 @@ type IDGenerator struct {
        // Prefix is prepended to each returned ID.
        Prefix string
 
-       lastID int64
-       mtx    sync.Mutex
+       mtx sync.Mutex
+       src rand.Source
 }
 
 // Next returns a new ID string. It is safe to call Next from multiple
 // goroutines.
 func (g *IDGenerator) Next() string {
-       id := time.Now().UnixNano()
        g.mtx.Lock()
-       if id <= g.lastID {
-               id = g.lastID + 1
+       defer g.mtx.Unlock()
+       if g.src == nil {
+               g.src = rand.NewSource(time.Now().UnixNano())
        }
-       g.lastID = id
-       g.mtx.Unlock()
-       return g.Prefix + strconv.FormatInt(id, 36)
+       a, b := g.src.Int63(), g.src.Int63()
+       id := strconv.FormatInt(a, 36) + strconv.FormatInt(b, 36)
+       if len(id) > 20 {
+               id = id[:20]
+       }
+       return g.Prefix + id
+}
+
+// AddRequestIDs wraps an http.Handler, adding an X-Request-Id header
+// to each request that doesn't already have one.
+func AddRequestIDs(h http.Handler) http.Handler {
+       gen := &IDGenerator{Prefix: "req-"}
+       return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+               if req.Header.Get("X-Request-Id") == "" {
+                       req.Header.Set("X-Request-Id", gen.Next())
+               }
+               h.ServeHTTP(w, req)
+       })
 }
diff --git a/sdk/go/httpserver/logger.go b/sdk/go/httpserver/logger.go
new file mode 100644 (file)
index 0000000..decb2ff
--- /dev/null
@@ -0,0 +1,82 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package httpserver
+
+import (
+       "context"
+       "net/http"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/stats"
+       log "github.com/Sirupsen/logrus"
+)
+
+type contextKey struct {
+       name string
+}
+
+var requestTimeContextKey = contextKey{"requestTime"}
+
+// LogRequests wraps an http.Handler, logging each request and
+// response via logrus.
+func LogRequests(h http.Handler) http.Handler {
+       return http.HandlerFunc(func(wrapped http.ResponseWriter, req *http.Request) {
+               w := &responseTimer{ResponseWriter: WrapResponseWriter(wrapped)}
+               req = req.WithContext(context.WithValue(req.Context(), &requestTimeContextKey, time.Now()))
+               lgr := log.WithFields(log.Fields{
+                       "RequestID":       req.Header.Get("X-Request-Id"),
+                       "remoteAddr":      req.RemoteAddr,
+                       "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
+                       "reqMethod":       req.Method,
+                       "reqPath":         req.URL.Path[1:],
+                       "reqBytes":        req.ContentLength,
+               })
+               logRequest(w, req, lgr)
+               defer logResponse(w, req, lgr)
+               h.ServeHTTP(w, req)
+       })
+}
+
+func logRequest(w *responseTimer, req *http.Request, lgr *log.Entry) {
+       lgr.Info("request")
+}
+
+func logResponse(w *responseTimer, req *http.Request, lgr *log.Entry) {
+       if tStart, ok := req.Context().Value(&requestTimeContextKey).(time.Time); ok {
+               tDone := time.Now()
+               lgr = lgr.WithFields(log.Fields{
+                       "timeTotal":     stats.Duration(tDone.Sub(tStart)),
+                       "timeToStatus":  stats.Duration(w.writeTime.Sub(tStart)),
+                       "timeWriteBody": stats.Duration(tDone.Sub(w.writeTime)),
+               })
+       }
+       lgr.WithFields(log.Fields{
+               "respStatusCode": w.WroteStatus(),
+               "respStatus":     http.StatusText(w.WroteStatus()),
+               "respBytes":      w.WroteBodyBytes(),
+       }).Info("response")
+}
+
+type responseTimer struct {
+       ResponseWriter
+       wrote     bool
+       writeTime time.Time
+}
+
+func (rt *responseTimer) WriteHeader(code int) {
+       if !rt.wrote {
+               rt.wrote = true
+               rt.writeTime = time.Now()
+       }
+       rt.ResponseWriter.WriteHeader(code)
+}
+
+func (rt *responseTimer) Write(p []byte) (int, error) {
+       if !rt.wrote {
+               rt.wrote = true
+               rt.writeTime = time.Now()
+       }
+       return rt.ResponseWriter.Write(p)
+}
diff --git a/sdk/go/httpserver/logger_test.go b/sdk/go/httpserver/logger_test.go
new file mode 100644 (file)
index 0000000..bbcafa1
--- /dev/null
@@ -0,0 +1,68 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package httpserver
+
+import (
+       "bytes"
+       "encoding/json"
+       "net/http"
+       "net/http/httptest"
+       "os"
+       "testing"
+       "time"
+
+       log "github.com/Sirupsen/logrus"
+       check "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+func (s *Suite) TestLogRequests(c *check.C) {
+       defer log.SetOutput(os.Stdout)
+       captured := &bytes.Buffer{}
+       log.SetOutput(captured)
+       log.SetFormatter(&log.JSONFormatter{
+               TimestampFormat: time.RFC3339Nano,
+       })
+       h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+               w.Write([]byte("hello world"))
+       })
+       req, err := http.NewRequest("GET", "https://foo.example/bar", nil)
+       req.Header.Set("X-Forwarded-For", "1.2.3.4:12345")
+       c.Assert(err, check.IsNil)
+       resp := httptest.NewRecorder()
+       AddRequestIDs(LogRequests(h)).ServeHTTP(resp, req)
+
+       dec := json.NewDecoder(captured)
+
+       gotReq := make(map[string]interface{})
+       err = dec.Decode(&gotReq)
+       c.Logf("%#v", gotReq)
+       c.Check(gotReq["RequestID"], check.Matches, "req-[a-z0-9]{20}")
+       c.Check(gotReq["reqForwardedFor"], check.Equals, "1.2.3.4:12345")
+       c.Check(gotReq["msg"], check.Equals, "request")
+
+       gotResp := make(map[string]interface{})
+       err = dec.Decode(&gotResp)
+       c.Logf("%#v", gotResp)
+       c.Check(gotResp["RequestID"], check.Equals, gotReq["RequestID"])
+       c.Check(gotResp["reqForwardedFor"], check.Equals, "1.2.3.4:12345")
+       c.Check(gotResp["msg"], check.Equals, "response")
+
+       c.Assert(gotResp["time"], check.FitsTypeOf, "")
+       _, err = time.Parse(time.RFC3339Nano, gotResp["time"].(string))
+       c.Check(err, check.IsNil)
+
+       for _, key := range []string{"timeToStatus", "timeWriteBody", "timeTotal"} {
+               c.Assert(gotResp[key], check.FitsTypeOf, float64(0))
+               c.Check(gotResp[key].(float64), check.Not(check.Equals), float64(0))
+       }
+}
index 726b81362ca6d4e9b1db43f6f8677111039911b3..9295c14cc24a47cd38479e19a8aa57dc91c1c42a 100644 (file)
@@ -72,19 +72,16 @@ func (this HashCheckingReader) Close() (err error) {
        _, err = io.Copy(this.Hash, this.Reader)
 
        if closer, ok := this.Reader.(io.Closer); ok {
-               err2 := closer.Close()
-               if err2 != nil && err == nil {
-                       return err2
+               closeErr := closer.Close()
+               if err == nil {
+                       err = closeErr
                }
        }
        if err != nil {
                return err
        }
-
-       sum := this.Hash.Sum(nil)
-       if fmt.Sprintf("%x", sum) != this.Check {
-               err = BadChecksum
+       if fmt.Sprintf("%x", this.Hash.Sum(nil)) != this.Check {
+               return BadChecksum
        }
-
-       return err
+       return nil
 }
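
With this change, Close() drains whatever the caller did not read, lets a read or close error take precedence, and otherwise returns BadChecksum on a mismatch. A hypothetical caller, assuming the exported Reader/Hash/Check fields used above (the file name is illustrative; the hash shown is the md5 of "foo"):

    package main

    import (
            "crypto/md5"
            "fmt"
            "io/ioutil"
            "os"

            "git.curoverse.com/arvados.git/sdk/go/keepclient"
    )

    func main() {
            f, err := os.Open("block.dat") // illustrative input file
            if err != nil {
                    panic(err)
            }
            hcr := keepclient.HashCheckingReader{
                    Reader: f,
                    Hash:   md5.New(),
                    Check:  "acbd18db4cc2f85cedef654fccc4a4d8", // md5("foo")
            }
            data, err := ioutil.ReadAll(hcr) // hashes the bytes as they are read
            if err != nil {
                    panic(err)
            }
            // Close() consumes any unread tail, then verifies the checksum.
            if err := hcr.Close(); err != nil {
                    fmt.Println("verification failed:", err) // e.g. BadChecksum
            }
            fmt.Println("read", len(data), "verified bytes")
    }
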
index 4bc0fc5996368175c905b267aed26a2265a58003..54a4a374b991b44c5a5e51878be980a1b78f9609 100644 (file)
@@ -21,7 +21,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/streamer"
+       "git.curoverse.com/arvados.git/sdk/go/asyncbuf"
 )
 
 // A Keep "block" is 64MB.
@@ -156,10 +156,12 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string,
                bufsize = BLOCKSIZE
        }
 
-       t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
-       defer t.Close()
-
-       return kc.putReplicas(hash, t, dataBytes)
+       buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize))
+       go func() {
+               _, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash})
+               buf.CloseWithError(err)
+       }()
+       return kc.putReplicas(hash, buf.NewReader, dataBytes)
 }
 
 // PutHB writes a block to Keep. The hash of the bytes is given in
@@ -167,9 +169,8 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string,
 //
 // Return values are the same as for PutHR.
 func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
-       t := streamer.AsyncStreamFromSlice(buf)
-       defer t.Close()
-       return kc.putReplicas(hash, t, int64(len(buf)))
+       newReader := func() io.Reader { return bytes.NewBuffer(buf) }
+       return kc.putReplicas(hash, newReader, int64(len(buf)))
 }
 
 // PutB writes a block to Keep. It computes the hash itself.
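
The asyncbuf package replaces the streamer fan-out with a simpler contract: one writer fills the buffer, CloseWithError() publishes the terminal status, and each NewReader() yields an independent io.Reader starting at offset zero. A minimal sketch of the pattern as used by PutHR above (buffer size and input are illustrative):

    package main

    import (
            "fmt"
            "io"
            "io/ioutil"
            "strings"

            "git.curoverse.com/arvados.git/sdk/go/asyncbuf"
    )

    func main() {
            src := strings.NewReader("hello world")
            buf := asyncbuf.NewBuffer(make([]byte, 0, 64))

            // Producer: copy the source into the buffer, then pass the
            // final error (nil on success) to all current and future readers.
            go func() {
                    _, err := io.Copy(buf, src)
                    buf.CloseWithError(err)
            }()

            // Each NewReader() starts at the beginning of the buffered data.
            r1, _ := ioutil.ReadAll(buf.NewReader())
            r2, _ := ioutil.ReadAll(buf.NewReader())
            fmt.Println(string(r1), string(r2)) // "hello world" twice
    }
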
index 3ce4e7425aa273e2753ddcaab516a86ecd34d59a..055141cbe88165cbe93a79d903d0b69a3731c672 100644 (file)
@@ -5,6 +5,7 @@
 package keepclient
 
 import (
+       "bytes"
        "crypto/md5"
        "errors"
        "fmt"
@@ -20,7 +21,6 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
-       "git.curoverse.com/arvados.git/sdk/go/streamer"
        . "gopkg.in/check.v1"
 )
 
@@ -172,18 +172,8 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
                make(chan string)}
 
        UploadToStubHelper(c, st,
-               func(kc *KeepClient, url string, reader io.ReadCloser,
-                       writer io.WriteCloser, upload_status chan uploadStatus) {
-
-                       tr := streamer.AsyncStreamFromReader(512, reader)
-                       defer tr.Close()
-
-                       br1 := tr.MakeStreamReader()
-
-                       go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, 0)
-
-                       writer.Write([]byte("foo"))
-                       writer.Close()
+               func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, upload_status chan uploadStatus) {
+                       go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), upload_status, 3, 0)
 
                        <-st.handled
 
index 49ef543d872f94d169c2e76b422cffee76ef86ed..37912506a2cb6ab7c014a0edac13e922c20526d6 100644 (file)
@@ -17,7 +17,6 @@ import (
        "strings"
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/streamer"
 )
 
 // Function used to emit debug messages. The easiest way to enable
@@ -57,7 +56,7 @@ type uploadStatus struct {
        response        string
 }
 
-func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
+func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader,
        upload_status chan<- uploadStatus, expectedLength int64, requestID int32) {
 
        var req *http.Request
@@ -66,21 +65,16 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea
        if req, err = http.NewRequest("PUT", url, nil); err != nil {
                DebugPrintf("DEBUG: [%08x] Error creating request PUT %v error: %v", requestID, url, err.Error())
                upload_status <- uploadStatus{err, url, 0, 0, ""}
-               body.Close()
                return
        }
 
        req.ContentLength = expectedLength
        if expectedLength > 0 {
-               // Do() will close the body ReadCloser when it is done
-               // with it.
-               req.Body = body
+               req.Body = ioutil.NopCloser(body)
        } else {
-               // "For client requests, a value of 0 means unknown if Body is
-               // not nil."  In this case we do want the body to be empty, so
-               // don't set req.Body.  However, we still need to close the
-               // body ReadCloser.
-               body.Close()
+               // "For client requests, a value of 0 means unknown if
+               // Body is not nil."  In this case we do want the body
+               // to be empty, so don't set req.Body.
        }
 
        req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
@@ -121,7 +115,7 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea
 
 func (this *KeepClient) putReplicas(
        hash string,
-       tr *streamer.AsyncStream,
+       getReader func() io.Reader,
        expectedLength int64) (locator string, replicas int, err error) {
 
        // Generate an arbitrary ID to identify this specific
@@ -174,7 +168,7 @@ func (this *KeepClient) putReplicas(
                                // Start some upload requests
                                if next_server < len(sv) {
                                        DebugPrintf("DEBUG: [%08x] Begin upload %s to %s", requestID, hash, sv[next_server])
-                                       go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestID)
+                                       go this.uploadToKeepServer(sv[next_server], hash, getReader(), upload_status, expectedLength, requestID)
                                        next_server += 1
                                        active += 1
                                } else {
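
The getReader factory is the heart of this refactor: every upload goroutine, and every retry pass, gets a fresh reader positioned at the start of the block, so concurrent uploads never share a read offset. A toy illustration, where upload() is a stand-in for the real PUT:

    package main

    import (
            "bytes"
            "fmt"
            "io"
            "io/ioutil"
    )

    // upload consumes one reader; the real code PUTs it to a Keep server.
    func upload(r io.Reader) {
            n, _ := io.Copy(ioutil.Discard, r)
            fmt.Println("sent", n, "bytes")
    }

    func main() {
            data := []byte("foo")
            // One independent reader per attempt.
            getReader := func() io.Reader { return bytes.NewBuffer(data) }
            for attempt := 0; attempt < 3; attempt++ {
                    upload(getReader()) // always sends 3 bytes
            }
    }

At the call site above, ioutil.NopCloser adapts the plain io.Reader to http.Request.Body, while a zero-length request deliberately leaves Body nil, since "a value of 0 means unknown if Body is not nil".
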
diff --git a/sdk/go/streamer/streamer.go b/sdk/go/streamer/streamer.go
deleted file mode 100644 (file)
index 396e311..0000000
+++ /dev/null
@@ -1,158 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-/* AsyncStream pulls data in from an io.Reader source (such as a file or network
-socket) and fans out to any number of StreamReader sinks.
-
-Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
-any point in the lifetime of the AsyncStream, and each StreamReader will read
-the contents of the buffer up to the "frontier" of the buffer, at which point
-the StreamReader blocks until new data is read from the source.
-
-This is useful for minimizing readthrough latency as sinks can read and act on
-data from the source without waiting for the source to be completely buffered.
-It is also useful as a cache in situations where re-reading the original source
-potentially is costly, since the buffer retains a copy of the source data.
-
-Usage:
-
-Begin reading into a buffer with maximum size 'buffersize' from 'source':
-  stream := AsyncStreamFromReader(buffersize, source)
-
-To create a new reader (this can be called multiple times, each reader starts
-at the beginning of the buffer):
-  reader := stream.MakeStreamReader()
-
-Make sure to close the reader when you're done with it.
-  reader.Close()
-
-When you're done with the stream:
-  stream.Close()
-
-Alternately, if you already have a filled buffer and just want to read out from it:
-  stream := AsyncStreamFromSlice(buf)
-
-  r := stream.MakeStreamReader()
-
-*/
-
-package streamer
-
-import (
-       "errors"
-       "io"
-)
-
-var ErrAlreadyClosed = errors.New("cannot close a stream twice")
-
-type AsyncStream struct {
-       buffer            []byte
-       requests          chan sliceRequest
-       add_reader        chan bool
-       subtract_reader   chan bool
-       wait_zero_readers chan bool
-       closed            bool
-}
-
-// Reads from the buffer managed by the Transfer()
-type StreamReader struct {
-       offset    int
-       stream    *AsyncStream
-       responses chan sliceResult
-}
-
-func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
-       t := &AsyncStream{
-               buffer:            make([]byte, buffersize),
-               requests:          make(chan sliceRequest),
-               add_reader:        make(chan bool),
-               subtract_reader:   make(chan bool),
-               wait_zero_readers: make(chan bool),
-       }
-
-       go t.transfer(source)
-       go t.readersMonitor()
-
-       return t
-}
-
-func AsyncStreamFromSlice(buf []byte) *AsyncStream {
-       t := &AsyncStream{
-               buffer:            buf,
-               requests:          make(chan sliceRequest),
-               add_reader:        make(chan bool),
-               subtract_reader:   make(chan bool),
-               wait_zero_readers: make(chan bool),
-       }
-
-       go t.transfer(nil)
-       go t.readersMonitor()
-
-       return t
-}
-
-func (this *AsyncStream) MakeStreamReader() *StreamReader {
-       this.add_reader <- true
-       return &StreamReader{0, this, make(chan sliceResult)}
-}
-
-// Reads from the buffer managed by the Transfer()
-func (this *StreamReader) Read(p []byte) (n int, err error) {
-       this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
-       rr, valid := <-this.responses
-       if valid {
-               this.offset += len(rr.slice)
-               return copy(p, rr.slice), rr.err
-       } else {
-               return 0, io.ErrUnexpectedEOF
-       }
-}
-
-func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
-       // Record starting offset in order to correctly report the number of bytes sent
-       starting_offset := this.offset
-       for {
-               this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
-               rr, valid := <-this.responses
-               if valid {
-                       this.offset += len(rr.slice)
-                       if rr.err != nil {
-                               if rr.err == io.EOF {
-                                       // EOF is not an error.
-                                       return int64(this.offset - starting_offset), nil
-                               } else {
-                                       return int64(this.offset - starting_offset), rr.err
-                               }
-                       } else {
-                               dest.Write(rr.slice)
-                       }
-               } else {
-                       return int64(this.offset), io.ErrUnexpectedEOF
-               }
-       }
-}
-
-// Close the responses channel
-func (this *StreamReader) Close() error {
-       if this.stream == nil {
-               return ErrAlreadyClosed
-       }
-       this.stream.subtract_reader <- true
-       close(this.responses)
-       this.stream = nil
-       return nil
-}
-
-func (this *AsyncStream) Close() error {
-       if this.closed {
-               return ErrAlreadyClosed
-       }
-       this.closed = true
-       this.wait_zero_readers <- true
-       close(this.requests)
-       close(this.add_reader)
-       close(this.subtract_reader)
-       close(this.wait_zero_readers)
-       return nil
-}
diff --git a/sdk/go/streamer/streamer_test.go b/sdk/go/streamer/streamer_test.go
deleted file mode 100644 (file)
index f8ddbf5..0000000
+++ /dev/null
@@ -1,381 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-package streamer
-
-import (
-       . "gopkg.in/check.v1"
-       "io"
-       "testing"
-       "time"
-)
-
-// Gocheck boilerplate
-func Test(t *testing.T) { TestingT(t) }
-
-var _ = Suite(&StandaloneSuite{})
-
-// Standalone tests
-type StandaloneSuite struct{}
-
-func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
-       ReadIntoBufferHelper(c, 225)
-       ReadIntoBufferHelper(c, 224)
-}
-
-func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
-       out := make([]byte, 128)
-       for i := 0; i < 128; i += 1 {
-               out[i] = byte(i)
-       }
-       writer.Write(out)
-       s1 := <-slices
-       c.Check(len(s1.slice), Equals, 128)
-       c.Check(s1.reader_error, Equals, nil)
-       for i := 0; i < 128; i += 1 {
-               c.Check(s1.slice[i], Equals, byte(i))
-       }
-       for i := 0; i < len(buffer); i += 1 {
-               if i < 128 {
-                       c.Check(buffer[i], Equals, byte(i))
-               } else {
-                       c.Check(buffer[i], Equals, byte(0))
-               }
-       }
-}
-
-func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
-       out := make([]byte, 96)
-       for i := 0; i < 96; i += 1 {
-               out[i] = byte(i / 2)
-       }
-       writer.Write(out)
-       s1 := <-slices
-       c.Check(len(s1.slice), Equals, 96)
-       c.Check(s1.reader_error, Equals, nil)
-       for i := 0; i < 96; i += 1 {
-               c.Check(s1.slice[i], Equals, byte(i/2))
-       }
-       for i := 0; i < len(buffer); i += 1 {
-               if i < 128 {
-                       c.Check(buffer[i], Equals, byte(i))
-               } else if i < (128 + 96) {
-                       c.Check(buffer[i], Equals, byte((i-128)/2))
-               } else {
-                       c.Check(buffer[i], Equals, byte(0))
-               }
-       }
-}
-
-func ReadIntoBufferHelper(c *C, bufsize int) {
-       buffer := make([]byte, bufsize)
-
-       reader, writer := io.Pipe()
-       slices := make(chan nextSlice)
-
-       go readIntoBuffer(buffer, reader, slices)
-
-       HelperWrite128andCheck(c, buffer, writer, slices)
-       HelperWrite96andCheck(c, buffer, writer, slices)
-
-       writer.Close()
-       s1 := <-slices
-       c.Check(len(s1.slice), Equals, 0)
-       c.Check(s1.reader_error, Equals, io.EOF)
-}
-
-func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
-       buffer := make([]byte, 223)
-       reader, writer := io.Pipe()
-       slices := make(chan nextSlice)
-
-       go readIntoBuffer(buffer, reader, slices)
-
-       HelperWrite128andCheck(c, buffer, writer, slices)
-
-       out := make([]byte, 96)
-       for i := 0; i < 96; i += 1 {
-               out[i] = byte(i / 2)
-       }
-
-       // Write will deadlock because it can't write all the data, so
-       // spin it off to a goroutine
-       go writer.Write(out)
-       s1 := <-slices
-
-       c.Check(len(s1.slice), Equals, 95)
-       c.Check(s1.reader_error, Equals, nil)
-       for i := 0; i < 95; i += 1 {
-               c.Check(s1.slice[i], Equals, byte(i/2))
-       }
-       for i := 0; i < len(buffer); i += 1 {
-               if i < 128 {
-                       c.Check(buffer[i], Equals, byte(i))
-               } else if i < (128 + 95) {
-                       c.Check(buffer[i], Equals, byte((i-128)/2))
-               } else {
-                       c.Check(buffer[i], Equals, byte(0))
-               }
-       }
-
-       writer.Close()
-       s1 = <-slices
-       c.Check(len(s1.slice), Equals, 0)
-       c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransfer(c *C) {
-       reader, writer := io.Pipe()
-
-       tr := AsyncStreamFromReader(512, reader)
-
-       br1 := tr.MakeStreamReader()
-       out := make([]byte, 128)
-
-       {
-               // Write some data, and read into a buffer shorter than
-               // available data
-               for i := 0; i < 128; i += 1 {
-                       out[i] = byte(i)
-               }
-
-               writer.Write(out[:100])
-
-               in := make([]byte, 64)
-               n, err := br1.Read(in)
-
-               c.Check(n, Equals, 64)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 64; i += 1 {
-                       c.Check(in[i], Equals, out[i])
-               }
-       }
-
-       {
-               // Write some more data, and read into buffer longer than
-               // available data
-               in := make([]byte, 64)
-               n, err := br1.Read(in)
-               c.Check(n, Equals, 36)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 36; i += 1 {
-                       c.Check(in[i], Equals, out[64+i])
-               }
-
-       }
-
-       {
-               // Test read before write
-               type Rd struct {
-                       n   int
-                       err error
-               }
-               rd := make(chan Rd)
-               in := make([]byte, 64)
-
-               go func() {
-                       n, err := br1.Read(in)
-                       rd <- Rd{n, err}
-               }()
-
-               time.Sleep(100 * time.Millisecond)
-               writer.Write(out[100:])
-
-               got := <-rd
-
-               c.Check(got.n, Equals, 28)
-               c.Check(got.err, Equals, nil)
-
-               for i := 0; i < 28; i += 1 {
-                       c.Check(in[i], Equals, out[100+i])
-               }
-       }
-
-       br2 := tr.MakeStreamReader()
-       {
-               // Test 'catch up' reader
-               in := make([]byte, 256)
-               n, err := br2.Read(in)
-
-               c.Check(n, Equals, 128)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 128; i += 1 {
-                       c.Check(in[i], Equals, out[i])
-               }
-       }
-
-       {
-               // Test closing the reader
-               writer.Close()
-
-               in := make([]byte, 256)
-               n1, err1 := br1.Read(in)
-               n2, err2 := br2.Read(in)
-               c.Check(n1, Equals, 0)
-               c.Check(err1, Equals, io.EOF)
-               c.Check(n2, Equals, 0)
-               c.Check(err2, Equals, io.EOF)
-       }
-
-       {
-               // Test 'catch up' reader after closing
-               br3 := tr.MakeStreamReader()
-               in := make([]byte, 256)
-               n, err := br3.Read(in)
-
-               c.Check(n, Equals, 128)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 128; i += 1 {
-                       c.Check(in[i], Equals, out[i])
-               }
-
-               n, err = br3.Read(in)
-
-               c.Check(n, Equals, 0)
-               c.Check(err, Equals, io.EOF)
-       }
-}
-
-func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
-       reader, writer := io.Pipe()
-
-       tr := AsyncStreamFromReader(100, reader)
-       defer tr.Close()
-
-       sr := tr.MakeStreamReader()
-       defer sr.Close()
-
-       out := make([]byte, 101)
-       go writer.Write(out)
-
-       n, err := sr.Read(out)
-       c.Check(n, Equals, 100)
-       c.Check(err, IsNil)
-
-       n, err = sr.Read(out)
-       c.Check(n, Equals, 0)
-       c.Check(err, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
-       // Buffer for reads from 'r'
-       buffer := make([]byte, 100)
-       for i := 0; i < 100; i += 1 {
-               buffer[i] = byte(i)
-       }
-
-       tr := AsyncStreamFromSlice(buffer)
-
-       br1 := tr.MakeStreamReader()
-
-       in := make([]byte, 64)
-       {
-               n, err := br1.Read(in)
-
-               c.Check(n, Equals, 64)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 64; i += 1 {
-                       c.Check(in[i], Equals, buffer[i])
-               }
-       }
-       {
-               n, err := br1.Read(in)
-
-               c.Check(n, Equals, 36)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 36; i += 1 {
-                       c.Check(in[i], Equals, buffer[64+i])
-               }
-       }
-       {
-               n, err := br1.Read(in)
-
-               c.Check(n, Equals, 0)
-               c.Check(err, Equals, io.EOF)
-       }
-}
-
-func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
-       // Buffer for reads from 'r'
-       buffer := make([]byte, 100)
-       for i := 0; i < 100; i += 1 {
-               buffer[i] = byte(i)
-       }
-
-       tr := AsyncStreamFromSlice(buffer)
-       defer tr.Close()
-
-       br1 := tr.MakeStreamReader()
-       defer br1.Close()
-
-       reader, writer := io.Pipe()
-
-       go func() {
-               p := make([]byte, 100)
-               n, err := reader.Read(p)
-               c.Check(n, Equals, 100)
-               c.Check(err, Equals, nil)
-               c.Check(p, DeepEquals, buffer)
-       }()
-
-       io.Copy(writer, br1)
-}
-
-func (s *StandaloneSuite) TestManyReaders(c *C) {
-       reader, writer := io.Pipe()
-
-       tr := AsyncStreamFromReader(512, reader)
-       defer tr.Close()
-
-       sr := tr.MakeStreamReader()
-       go func() {
-               time.Sleep(100 * time.Millisecond)
-               sr.Close()
-       }()
-
-       for i := 0; i < 200; i += 1 {
-               go func() {
-                       br1 := tr.MakeStreamReader()
-                       defer br1.Close()
-
-                       p := make([]byte, 3)
-                       n, err := br1.Read(p)
-                       c.Check(n, Equals, 3)
-                       c.Check(p[0:3], DeepEquals, []byte("foo"))
-
-                       n, err = br1.Read(p)
-                       c.Check(n, Equals, 3)
-                       c.Check(p[0:3], DeepEquals, []byte("bar"))
-
-                       n, err = br1.Read(p)
-                       c.Check(n, Equals, 3)
-                       c.Check(p[0:3], DeepEquals, []byte("baz"))
-
-                       n, err = br1.Read(p)
-                       c.Check(n, Equals, 0)
-                       c.Check(err, Equals, io.EOF)
-               }()
-       }
-
-       writer.Write([]byte("foo"))
-       writer.Write([]byte("bar"))
-       writer.Write([]byte("baz"))
-       writer.Close()
-}
-
-func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) {
-       buffer := make([]byte, 100)
-       tr := AsyncStreamFromSlice(buffer)
-       sr := tr.MakeStreamReader()
-       c.Check(sr.Close(), IsNil)
-       c.Check(sr.Close(), Equals, ErrAlreadyClosed)
-       c.Check(tr.Close(), IsNil)
-       c.Check(tr.Close(), Equals, ErrAlreadyClosed)
-}
diff --git a/sdk/go/streamer/transfer.go b/sdk/go/streamer/transfer.go
deleted file mode 100644 (file)
index bea27f8..0000000
+++ /dev/null
@@ -1,310 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-/* Internal implementation of AsyncStream.
-Outline of operation:
-
-The kernel is the transfer() goroutine.  It manages concurrent reads and
-appends to the "body" slice.  "body" is a slice of "source_buffer" that
-represents the segment of the buffer that is already filled in and available
-for reading.
-
-To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
-from the io.Reader source directly into source_buffer.  Each read goes into a
-slice of buffer which spans the section immediately following the end of the
-current "body".  Each time a Read completes, a slice representing the the
-section just filled in (or any read errors/EOF) is sent over the "slices"
-channel back to the transfer() function.
-
-Meanwhile, the transfer() function selects on two channels, the "requests"
-channel and the "slices" channel.
-
-When a message is received on the "slices" channel, it means a new section of
-the buffer has data, or an error is signaled.  Since the data has been read
-directly into the source_buffer, transfer() simply increases the size of the
-body slice to encompass the newly filled-in section.  Then any
-pending reads are serviced with handleReadRequest (described below).
-
-When a message is received on the "requests" channel, it means a StreamReader
-wants access to a slice of the buffer.  This is passed to handleReadRequest().
-
-The handleReadRequest() function takes a sliceRequest consisting of a buffer
-offset, maximum size, and channel to send the response.  If there was an error
-reported from the source reader, it is returned.  If the offset is less than
-the size of the body, the request can proceed, and it sends a body slice
-spanning the segment from offset to min(offset+maxsize, end of the body).  If
-source reader status is EOF (done filling the buffer) and the read request
-offset is beyond the end of the body, it responds with EOF.  Otherwise, the read
-request is for a slice beyond the current size of "body" but we expect the body
-to expand as more data is added, so the request gets added to a wait list.
-
-transfer() runs until the requests channel is closed by AsyncStream.Close().
-
-To track readers, streamer uses the readersMonitor() goroutine.  This goroutine
-chooses which channels to receive from based on the number of outstanding
-readers.  When a new reader is created, it sends a message on the add_reader
-channel.  If the number of readers is already at MAX_READERS, this blocks the
-sender until an existing reader is closed.  When a reader is closed, it sends a
-message on the subtract_reader channel.  Finally, when AsyncStream.Close() is
-called, it sends a message on the wait_zero_readers channel, which will block
-the sender until there are zero readers and it is safe to shut down the
-AsyncStream.
-*/
-
-package streamer
-
-import (
-       "io"
-)
-
-const MAX_READERS = 100
-
-// A slice passed from readIntoBuffer() to transfer()
-type nextSlice struct {
-       slice        []byte
-       reader_error error
-}
-
-// A read request to the Transfer() function
-type sliceRequest struct {
-       offset  int
-       maxsize int
-       result  chan<- sliceResult
-}
-
-// A read result from the Transfer() function
-type sliceResult struct {
-       slice []byte
-       err   error
-}
-
-// Supports writing into a buffer
-type bufferWriter struct {
-       buf []byte
-       ptr int
-}
-
-// Copy p into this.buf, increment the pointer, and return the number of bytes written.
-func (this *bufferWriter) Write(p []byte) (n int, err error) {
-       n = copy(this.buf[this.ptr:], p)
-       this.ptr += n
-       return n, nil
-}
-
-// Read repeatedly from the reader 'r' and write sequentially into the
-// specified buffer, reporting each read on the 'slices' channel.  Completes
-// when 'r' returns an error (including EOF), which is sent on 'slices'.
-func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
-       defer close(slices)
-
-       if writeto, ok := r.(io.WriterTo); ok {
-               n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
-               if err != nil {
-                       slices <- nextSlice{nil, err}
-               } else {
-                       slices <- nextSlice{buffer[:n], nil}
-                       slices <- nextSlice{nil, io.EOF}
-               }
-               return
-       } else {
-               // Initially entire buffer is available
-               ptr := buffer[:]
-               for {
-                       var n int
-                       var err error
-                       if len(ptr) > 0 {
-                               const readblock = 64 * 1024
-                               // Read 64KiB into the next part of the buffer
-                               if len(ptr) > readblock {
-                                       n, err = r.Read(ptr[:readblock])
-                               } else {
-                                       n, err = r.Read(ptr)
-                               }
-                       } else {
-                               // Ran out of buffer space, try reading one more byte
-                               var b [1]byte
-                               n, err = r.Read(b[:])
-
-                               if n > 0 {
-                                       // Reader has more data but we have nowhere to
-                                       // put it, so we're stuffed
-                                       slices <- nextSlice{nil, io.ErrShortBuffer}
-                               } else {
-                                       // Return some other error (hopefully EOF)
-                                       slices <- nextSlice{nil, err}
-                               }
-                               return
-                       }
-
-                       // End on error (includes EOF)
-                       if err != nil {
-                               slices <- nextSlice{nil, err}
-                               return
-                       }
-
-                       if n > 0 {
-                               // Make a slice with the contents of the read
-                               slices <- nextSlice{ptr[:n], nil}
-
-                               // Adjust the scratch space slice
-                               ptr = ptr[n:]
-                       }
-               }
-       }
-}
-
-// Handle a read request.  Returns true if a response was sent, and false if
-// the request should be queued.
-func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
-       if (reader_status != nil) && (reader_status != io.EOF) {
-               req.result <- sliceResult{nil, reader_status}
-               return true
-       } else if req.offset < len(body) {
-               var end int
-               if req.offset+req.maxsize < len(body) {
-                       end = req.offset + req.maxsize
-               } else {
-                       end = len(body)
-               }
-               req.result <- sliceResult{body[req.offset:end], nil}
-               return true
-       } else if (reader_status == io.EOF) && (req.offset >= len(body)) {
-               req.result <- sliceResult{nil, io.EOF}
-               return true
-       } else {
-               return false
-       }
-}
-
-// Mediates between reads and appends.
-// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
-// in the provided buffer.  Otherwise, use the contents of 'buffer' as is.
-// Accepts read requests for the buffer on the 'requests' channel.  Completes
-// when 'requests' channel is closed.
-func (this *AsyncStream) transfer(source_reader io.Reader) {
-       source_buffer := this.buffer
-       requests := this.requests
-
-       // currently buffered data
-       var body []byte
-
-       // for receiving slices from readIntoBuffer
-       var slices chan nextSlice = nil
-
-       // indicates the status of the underlying reader
-       var reader_status error = nil
-
-       if source_reader != nil {
-               // 'body' is the buffer slice representing the body content read so far
-               body = source_buffer[:0]
-
-               // used to communicate slices of the buffer as they are filled in;
-               // readIntoBuffer will close 'slices' when it is done with it
-               slices = make(chan nextSlice)
-
-               // Spin it off
-               go readIntoBuffer(source_buffer, source_reader, slices)
-       } else {
-               // use the whole buffer
-               body = source_buffer[:]
-
-               // buffer is complete
-               reader_status = io.EOF
-       }
-
-       pending_requests := make([]sliceRequest, 0)
-
-       for {
-               select {
-               case req, valid := <-requests:
-                       // Handle a buffer read request
-                       if valid {
-                               if !handleReadRequest(req, body, reader_status) {
-                                       pending_requests = append(pending_requests, req)
-                               }
-                       } else {
-                               // closed 'requests' channel indicates we're done
-                               return
-                       }
-
-               case bk, valid := <-slices:
-                       // Got a new slice from the reader
-                       if valid {
-                               reader_status = bk.reader_error
-
-                               if bk.slice != nil {
-                                       // adjust body bounds now that another slice has been read
-                                       body = source_buffer[0 : len(body)+len(bk.slice)]
-                               }
-
-                               // handle pending reads
-                               n := 0
-                               for n < len(pending_requests) {
-                                       if handleReadRequest(pending_requests[n], body, reader_status) {
-                                               // move the element from the back of the slice to
-                                               // position 'n', then shorten the slice by one element
-                                               pending_requests[n] = pending_requests[len(pending_requests)-1]
-                                               pending_requests = pending_requests[0 : len(pending_requests)-1]
-                                       } else {
-
-                                               // Request wasn't handled, so keep it in the request slice
-                                               n += 1
-                                       }
-                               }
-                       } else {
-                               if reader_status == nil {
-                                       // slices channel closed without signaling EOF
-                                       reader_status = io.ErrUnexpectedEOF
-                               }
-                               slices = nil
-                       }
-               }
-       }
-}
-
-func (this *AsyncStream) readersMonitor() {
-       var readers int = 0
-
-       for {
-               if readers == 0 {
-                       select {
-                       case _, ok := <-this.wait_zero_readers:
-                               if ok {
-                                       // nothing, just implicitly unblock the sender
-                               } else {
-                                       return
-                               }
-                       case _, ok := <-this.add_reader:
-                               if ok {
-                                       readers += 1
-                               } else {
-                                       return
-                               }
-                       }
-               } else if readers > 0 && readers < MAX_READERS {
-                       select {
-                       case _, ok := <-this.add_reader:
-                               if ok {
-                                       readers += 1
-                               } else {
-                                       return
-                               }
-
-                       case _, ok := <-this.subtract_reader:
-                               if ok {
-                                       readers -= 1
-                               } else {
-                                       return
-                               }
-                       }
-               } else if readers == MAX_READERS {
-                       _, ok := <-this.subtract_reader
-                       if ok {
-                               readers -= 1
-                       } else {
-                               return
-                       }
-               }
-       }
-}
index b866bf75e2e5755d2910e697e259642083dc75bf..881fdd6ad0f968eddf3ab810d90e62df70b63895 100755 (executable)
@@ -301,7 +301,7 @@ def files_in_collection(c):
 def write_block_or_manifest(dest, src, api_client, args):
     if '+A' in src:
         # block locator
-        kc = KeepClient(api_client=api_client)
+        kc = arvados.keep.KeepClient(api_client=api_client)
         dest.write(kc.get(src, num_retries=args.retries))
     else:
         # collection UUID or portable data hash
index fc6b846c39f2ee4737b91b9638af576121297103..5aa223a2eaf7fc1c444ff296641dbd5d344a228f 100644 (file)
@@ -73,6 +73,13 @@ class ArvadosGetTestCase(run_test_server.TestCaseWithServers,
         self.assertEqual(0, r)
         self.assertEqual(b'baz', self.stdout.getvalue())
 
+    def test_get_block(self):
+        # Get raw data using a block locator
+        blk = re.search(' (acbd18\S+\+A\S+) ', self.col_manifest).group(1)
+        r = self.run_get([blk, '-'])
+        self.assertEqual(0, r)
+        self.assertEqual(b'foo', self.stdout.getvalue())
+
     def test_get_multiple_files(self):
         # Download the entire collection to the temp directory
         r = self.run_get(["{}/".format(self.col_loc), self.tempdir])
index c06a9c5a9799d65dee6bf5d18d198b32d3395767..25e13a51dbbf70444dbd0eaa6c4417fad93b1d15 100644 (file)
@@ -58,7 +58,6 @@ gem 'themes_for_rails', git: 'https://github.com/curoverse/themes_for_rails'
 gem 'arvados', '>= 0.1.20150615153458'
 gem 'arvados-cli', '>= 0.1.20161017193526'
 
-gem 'puma', '~> 2.0'
 gem 'sshkey'
 gem 'safe_yaml'
 gem 'lograge'
index ebb594cb1ef31c8b89bd79a0dd01d2b14c158e56..91a9d04e4ebe26b48b4a85c88a8c40fa1d9ad826 100644 (file)
@@ -197,7 +197,6 @@ GEM
     protected_attributes (1.1.3)
       activemodel (>= 4.0.1, < 5.0)
     public_suffix (2.0.5)
-    puma (2.16.0)
     rack (1.6.8)
     rack-test (0.6.3)
       rack (>= 1.0)
@@ -308,7 +307,6 @@ DEPENDENCIES
   passenger
   pg
   protected_attributes
-  puma (~> 2.0)
   rails (~> 4.0)
   rails-observers
   responders (~> 2.0)
index 9826cf2f906f5a7ecd532b4522d2940f41ca5457..6bdba7af89d803975faa40f50d6508ab0b25d953 100644 (file)
@@ -365,7 +365,7 @@ class ApplicationController < ActionController::Base
   end
 
   def require_auth_scope
-    if @read_auths.empty?
+    unless current_user && @read_auths.any? { |auth| auth.user.andand.uuid == current_user.uuid }
       if require_login != false
         send_error("Forbidden", status: 403)
       end
index 9c1c5870e7d7c6d83b3ddf4a89a413da0db89991..826ced1cf8c1c3945503b7d79f4865928fb7beb3 100644 (file)
@@ -94,8 +94,24 @@ class Arvados::V1::ApiClientAuthorizationsController < ApplicationController
       @offset = 0
       super
       wanted_scopes.compact.each do |scope_list|
-        sorted_scopes = scope_list.sort
-        @objects = @objects.select { |auth| auth.scopes.sort == sorted_scopes }
+        if @objects.respond_to?(:where) && scope_list.length < 2
+          @objects = @objects.
+                     where('scopes in (?)',
+                           [scope_list.to_yaml, SafeJSON.dump(scope_list)])
+        else
+          if @objects.respond_to?(:where)
+            # Eliminate rows with scopes=['all'] before doing the
+            # expensive filter. They are typically the majority of
+            # rows, and they obviously won't match given
+            # scope_list.length>=2, so loading them all into
+            # ActiveRecord objects is a huge waste of time.
+            @objects = @objects.
+                       where('scopes not in (?)',
+                             [['all'].to_yaml, SafeJSON.dump(['all'])])
+          end
+          sorted_scopes = scope_list.sort
+          @objects = @objects.select { |auth| auth.scopes.sort == sorted_scopes }
+        end
       end
       @limit = @request_limit
       @offset = @request_offset
index d5ba487d0f92e0506814585a44fcf98fd50ec31f..6f893bcc850015b7c682243e1913ac121dbf9551 100644 (file)
@@ -50,6 +50,7 @@ class Arvados::V1::SchemaController < ApplicationController
         crunchLimitLogBytesPerJob: Rails.application.config.crunch_limit_log_bytes_per_job,
         crunchLogPartialLineThrottlePeriod: Rails.application.config.crunch_log_partial_line_throttle_period,
         websocketUrl: Rails.application.config.websocket_address,
+        workbenchUrl: Rails.application.config.workbench_address,
         parameters: {
           alt: {
             type: "string",
index 5eb756b9fa3609519a1fa63f8c3a1a9021ed190c..6a376318271472db857db6b926ba90d4d8262244 100644 (file)
@@ -32,21 +32,33 @@ class ArvadosApiToken
     user = nil
     api_client = nil
     api_client_auth = nil
-    supplied_token =
-      params["api_token"] ||
-      params["oauth_token"] ||
-      env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-zA-Z0-9]+)/).andand[1]
-    if supplied_token
-      api_client_auth = ApiClientAuthorization.
+    if request.get? || params["_method"] == 'GET'
+      reader_tokens = params["reader_tokens"]
+      if reader_tokens.is_a? String
+        reader_tokens = SafeJSON.load(reader_tokens)
+      end
+    else
+      reader_tokens = nil
+    end
+
+    # Set current_user etc. based on the primary session token if a
+    # valid one is present. Otherwise, use the first valid token in
+    # reader_tokens.
+    [params["api_token"],
+     params["oauth_token"],
+     env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-zA-Z0-9]+)/).andand[1],
+     *reader_tokens,
+    ].each do |supplied|
+      next if !supplied
+      try_auth = ApiClientAuthorization.
         includes(:api_client, :user).
-        where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied_token).
+        where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied).
         first
-      if api_client_auth.andand.user
+      if try_auth.andand.user
+        api_client_auth = try_auth
         user = api_client_auth.user
         api_client = api_client_auth.api_client
-      else
-        # Token seems valid, but points to a non-existent (deleted?) user.
-        api_client_auth = nil
+        break
       end
     end
     Thread.current[:api_client_ip_address] = remote_ip
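
Clients supply reader tokens as a JSON-encoded reader_tokens parameter, which the middleware honors only on GET (or _method=GET) requests. A hypothetical Go client call might look like the following sketch; the host and token strings are illustrative:

    package main

    import (
            "encoding/json"
            "net/http"
            "net/url"
    )

    func main() {
            readers, _ := json.Marshal([]string{"readertoken1"}) // illustrative
            v := url.Values{}
            v.Set("reader_tokens", string(readers))
            req, err := http.NewRequest("GET",
                    "https://zzzzz.arvadosapi.com/arvados/v1/collections?"+v.Encode(), nil)
            if err != nil {
                    panic(err)
            }
            // The primary token still authenticates the request; reader
            // tokens only widen what it is allowed to read.
            req.Header.Set("Authorization", "OAuth2 primarytoken") // illustrative
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                    panic(err)
            }
            defer resp.Body.Close()
    }
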
index b1a7b5be9dd96c3d1d2ba44954b5e3a39c8b279c..235b78e332f337c0e618da27709517ee9c29af3f 100644 (file)
@@ -24,19 +24,15 @@ class Arvados::V1::SchemaControllerTest < ActionController::TestCase
                  "discovery document was generated >#{MAX_SCHEMA_AGE}s ago")
   end
 
-  test "discovery document has defaultTrashLifetime" do
+  test "discovery document fields" do
     get :index
     assert_response :success
     discovery_doc = JSON.parse(@response.body)
     assert_includes discovery_doc, 'defaultTrashLifetime'
     assert_equal discovery_doc['defaultTrashLifetime'], Rails.application.config.default_trash_lifetime
-  end
-
-  test "discovery document has source_version" do
-    get :index
-    assert_response :success
-    discovery_doc = JSON.parse(@response.body)
     assert_match(/^[0-9a-f]+(-modified)?$/, discovery_doc['source_version'])
+    assert_equal discovery_doc['websocketUrl'], Rails.application.config.websocket_address
+    assert_equal discovery_doc['workbenchUrl'], Rails.application.config.workbench_address
   end
 
   test "discovery document overrides source_version with config" do
index dba801920c7ed9365857db759dc6b4ac87be60aa..dfb57496a7704f7d0094f960fd3ec425ff3e55c1 100644 (file)
@@ -27,6 +27,20 @@ class ApiTokensScopeTest < ActionDispatch::IntegrationTest
     assert_response 403
   end
 
+  test "narrow + wide scoped tokens for different users" do
+    get_args = [{
+                  reader_tokens: [api_client_authorizations(:anonymous).api_token]
+                }, auth(:active_userlist)]
+    get(v1_url('users'), *get_args)
+    assert_response :success
+    get(v1_url('users', ''), *get_args)  # Add trailing slash.
+    assert_response :success
+    get(v1_url('users', 'current'), *get_args)
+    assert_response 403
+    get(v1_url('virtual_machines'), *get_args)
+    assert_response 403
+  end
+
   test "specimens token can see exactly owned specimens" do
     get_args = [{}, auth(:active_specimens)]
     get(v1_url('specimens'), *get_args)
index dd59f74eb4784ccbf857a6b6d07b16a283828e66..a60be093a31f033f15d4dbb96a22d4f35da0dda2 100644 (file)
@@ -50,22 +50,15 @@ class ReaderTokensTest < ActionDispatch::IntegrationTest
 
   [nil, :active_noscope].each do |main_auth|
     [:spectator, :spectator_specimens].each do |read_auth|
-      test "#{main_auth} auth with reader token #{read_auth} can read" do
-        assert_includes(get_specimen_uuids(main_auth, read_auth),
-                        spectator_specimen, "did not find spectator specimen")
-      end
-
-      test "#{main_auth} auth with JSON read token #{read_auth} can read" do
-        assert_includes(get_specimen_uuids(main_auth, read_auth, :to_json),
-                        spectator_specimen, "did not find spectator specimen")
-      end
-
-      test "#{main_auth} auth with reader token #{read_auth} can't write" do
-        assert_post_denied(main_auth, read_auth)
-      end
+      [:to_a, :to_json].each do |formatter|
+        test "#{main_auth.inspect} auth with #{formatter} reader token #{read_auth} can#{"'t" if main_auth} read" do
+          get_specimens(main_auth, read_auth)
+          assert_response(if main_auth then 403 else 200 end)
+        end
 
-      test "#{main_auth} auth with JSON read token #{read_auth} can't write" do
-        assert_post_denied(main_auth, read_auth, :to_json)
+        test "#{main_auth.inspect} auth with #{formatter} reader token #{read_auth} can't write" do
+          assert_post_denied(main_auth, read_auth, formatter)
+        end
       end
     end
   end
index 27a548aa61e94c5bb9e555a6737cd5bcdf6b1c9f..fc0dda718ceda7fddd2e1f41c704fa434fdb0034 100644 (file)
@@ -228,6 +228,32 @@ func (runner *ContainerRunner) stopSignals() {
        }
 }
 
+var errorBlacklist = []string{"Cannot connect to the Docker daemon"}
+var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
+
+func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
+       for _, d := range errorBlacklist {
+               if strings.Contains(goterr.Error(), d) {
+                       runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr)
+                       if *brokenNodeHook == "" {
+                               runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+                       } else {
+                               runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+                       // Run the operator-supplied broken-node hook script.
+                               c := exec.Command(*brokenNodeHook)
+                               c.Stdout = runner.CrunchLog
+                               c.Stderr = runner.CrunchLog
+                               err := c.Run()
+                               if err != nil {
+                                       runner.CrunchLog.Printf("Error running broken node hook: %v", err)
+                               }
+                       }
+                       return true
+               }
+       }
+       return false
+}
+
 // LoadImage determines the docker image id from the container record and
 // checks if it is available in the local Docker image store.  If not, it loads
 // the image from Keep.
@@ -1487,7 +1513,11 @@ func (runner *ContainerRunner) Run() (err error) {
        // check for and/or load image
        err = runner.LoadImage()
        if err != nil {
-               runner.finalState = "Cancelled"
+               if !runner.checkBrokenNode(err) {
+                       // Failed to load image but not due to a "broken node"
+                       // condition, probably user error.
+                       runner.finalState = "Cancelled"
+               }
                err = fmt.Errorf("While loading container image: %v", err)
                return
        }
@@ -1516,8 +1546,6 @@ func (runner *ContainerRunner) Run() (err error) {
                return
        }
 
-       runner.StartCrunchstat()
-
        if runner.IsCancelled() {
                return
        }
@@ -1528,8 +1556,11 @@ func (runner *ContainerRunner) Run() (err error) {
        }
        runner.finalState = "Cancelled"
 
+       runner.StartCrunchstat()
+
        err = runner.StartContainer()
        if err != nil {
+               runner.checkBrokenNode(err)
                return
        }
 
@@ -1607,25 +1638,27 @@ func main() {
        }
        api.Retries = 8
 
-       var kc *keepclient.KeepClient
-       kc, err = keepclient.MakeKeepClient(api)
-       if err != nil {
-               log.Fatalf("%s: %v", containerId, err)
+       kc, kcerr := keepclient.MakeKeepClient(api)
+       if kcerr != nil {
+               log.Fatalf("%s: %v", containerId, kcerr)
        }
        kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
        kc.Retries = 4
 
-       var docker *dockerclient.Client
        // API version 1.21 corresponds to Docker 1.9, which is currently the
        // minimum version we want to support.
-       docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
-       if err != nil {
-               log.Fatalf("%s: %v", containerId, err)
-       }
-
+       docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
        dockerClientProxy := ThinDockerClientProxy{Docker: docker}
 
        cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
+
+       if dockererr != nil {
+               cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
+               cr.checkBrokenNode(dockererr)
+               cr.CrunchLog.Close()
+               os.Exit(1)
+       }
+
        cr.statInterval = *statInterval
        cr.cgroupRoot = *cgroupRoot
        cr.expectCgroupParent = *cgroupParent
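
The broken-node hook is an operator-supplied executable; crunch-run pipes its output into the container log so the evidence lands next to the failure it explains. A standalone sketch of the same exec pattern, with an illustrative hook path:

    package main

    import (
            "log"
            "os"
            "os/exec"
    )

    func main() {
            hook := "/usr/local/sbin/mark-node-broken" // illustrative path
            c := exec.Command(hook)
            // Mirror checkBrokenNode(): send the hook's stdout/stderr to
            // the same writer as our own log messages.
            c.Stdout = os.Stderr
            c.Stderr = os.Stderr
            if err := c.Run(); err != nil {
                    log.Printf("Error running broken node hook: %v", err)
            }
    }
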
index bc0b3125c9e699414a3d08f32cb88868769a346d..4702838362c04b1237593f66a789ea96848bb3c3 100644 (file)
@@ -156,6 +156,10 @@ func (t *TestDockerClient) ContainerWait(ctx context.Context, container string,
 }
 
 func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
+       if t.finish == 2 {
+               return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
+       }
+
        if t.imageLoaded == image {
                return dockertypes.ImageInspect{}, nil, nil
        } else {
@@ -164,6 +168,9 @@ func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string
 }
 
 func (t *TestDockerClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
+       if t.finish == 2 {
+               return dockertypes.ImageLoadResponse{}, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
+       }
        _, err := io.Copy(ioutil.Discard, input)
        if err != nil {
                return dockertypes.ImageLoadResponse{}, err
@@ -668,9 +675,10 @@ func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn f
        if api.CalledWith("container.state", "Complete") != nil {
                c.Check(err, IsNil)
        }
-       c.Check(api.WasSetRunning, Equals, true)
-
-       c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil)
+       if exitCode != 2 {
+               c.Check(api.WasSetRunning, Equals, true)
+               c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil)
+       }
 
        if err != nil {
                for k, v := range api.Logs {
@@ -1759,3 +1767,61 @@ func (s *TestSuite) TestEvalSymlinkDir(c *C) {
        _, err = cr.UploadOutputFile(realTemp+"/"+v, info, err, []string{}, nil, "", "", 0)
        c.Assert(err, NotNil)
 }
+
+func (s *TestSuite) TestFullBrokenDocker1(c *C) {
+       tf, err := ioutil.TempFile("", "brokenNodeHook-")
+       c.Assert(err, IsNil)
+       defer os.Remove(tf.Name())
+
+       tf.Write([]byte(`#!/bin/sh
+exec echo killme
+`))
+       tf.Close()
+       os.Chmod(tf.Name(), 0700)
+
+       ech := tf.Name()
+       brokenNodeHook = &ech
+
+       api, _, _ := FullRunHelper(c, `{
+    "command": ["echo", "hello world"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`, nil, 2, func(t *TestDockerClient) {
+               t.logWriter.Write(dockerLog(1, "hello world\n"))
+               t.logWriter.Close()
+       })
+
+       c.Check(api.CalledWith("container.state", "Queued"), NotNil)
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
+
+}
+
+func (s *TestSuite) TestFullBrokenDocker2(c *C) {
+       ech := ""
+       brokenNodeHook = &ech
+
+       api, _, _ := FullRunHelper(c, `{
+    "command": ["echo", "hello world"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`, nil, 2, func(t *TestDockerClient) {
+               t.logWriter.Write(dockerLog(1, "hello world\n"))
+               t.logWriter.Close()
+       })
+
+       c.Check(api.CalledWith("container.state", "Queued"), NotNil)
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*No broken node hook.*")
+}
index 3d1b4476255d0cadf2274ad12b9aae78f164f72f..e2a6221f10e28981ea0c3f64fa5ce6d52ac718ea 100644 (file)
@@ -10,7 +10,6 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "net"
        "net/http"
        "os"
@@ -25,7 +24,9 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/config"
        "git.curoverse.com/arvados.git/sdk/go/health"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       log "github.com/Sirupsen/logrus"
        "github.com/coreos/go-systemd/daemon"
        "github.com/ghodss/yaml"
        "github.com/gorilla/mux"
@@ -55,7 +56,13 @@ var (
        router   http.Handler
 )
 
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
+
 func main() {
+       log.SetFormatter(&log.JSONFormatter{
+               TimestampFormat: rfc3339NanoFixed,
+       })
+
        cfg := DefaultConfig()
 
        flagset := flag.NewFlagSet("keepproxy", flag.ExitOnError)
@@ -164,7 +171,7 @@ func main() {
 
        // Start serving requests.
        router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout), cfg.ManagementToken)
-       http.Serve(listener, router)
+       http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router)))
 
        log.Println("shutting down")
 }
@@ -596,7 +603,8 @@ func (h *proxyHandler) makeKeepClient(req *http.Request) *keepclient.KeepClient
                        Timeout:   h.timeout,
                        Transport: h.transport,
                },
-               proto: req.Proto,
+               proto:     req.Proto,
+               requestID: req.Header.Get("X-Request-Id"),
        }
        return &kc
 }
index 25619bbb91a3629d8d90563d051b8a31578ed121..a7b608b69c462fd4149f932c16dbabcaddbca6c6 100644 (file)
@@ -10,11 +10,12 @@ import (
        "errors"
        "fmt"
        "io/ioutil"
-       "log"
+       "math/rand"
        "net/http"
        "net/http/httptest"
        "os"
        "strings"
+       "sync"
        "testing"
        "time"
 
@@ -55,7 +56,7 @@ func waitForListener() {
                time.Sleep(ms * time.Millisecond)
        }
        if listener == nil {
-               log.Fatalf("Timed out waiting for listener to start")
+               panic("Timed out waiting for listener to start")
        }
 }
 
@@ -216,6 +217,33 @@ func (s *ServerRequiredSuite) TestPutWrongContentLength(c *C) {
        }
 }
 
+func (s *ServerRequiredSuite) TestManyFailedPuts(c *C) {
+       kc := runProxy(c, nil, false)
+       defer closeListener()
+       router.(*proxyHandler).timeout = time.Nanosecond
+
+       buf := make([]byte, 1<<20)
+       rand.Read(buf)
+       var wg sync.WaitGroup
+       for i := 0; i < 128; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       kc.PutB(buf)
+               }()
+       }
+       done := make(chan bool)
+       go func() {
+               wg.Wait()
+               close(done)
+       }()
+       select {
+       case <-done:
+       case <-time.After(10 * time.Second):
+               c.Error("timeout")
+       }
+}
+
 func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
        kc := runProxy(c, nil, false)
        defer closeListener()
@@ -226,14 +254,14 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
        {
                _, _, err := kc.Ask(hash)
                c.Check(err, Equals, keepclient.BlockNotFound)
-               log.Print("Finished Ask (expected BlockNotFound)")
+               c.Log("Finished Ask (expected BlockNotFound)")
        }
 
        {
                reader, _, _, err := kc.Get(hash)
                c.Check(reader, Equals, nil)
                c.Check(err, Equals, keepclient.BlockNotFound)
-               log.Print("Finished Get (expected BlockNotFound)")
+               c.Log("Finished Get (expected BlockNotFound)")
        }
 
        // Note in bug #5309 among other errors keepproxy would set
@@ -252,14 +280,14 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
                c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
                c.Check(rep, Equals, 2)
                c.Check(err, Equals, nil)
-               log.Print("Finished PutB (expected success)")
+               c.Log("Finished PutB (expected success)")
        }
 
        {
                blocklen, _, err := kc.Ask(hash2)
                c.Assert(err, Equals, nil)
                c.Check(blocklen, Equals, int64(3))
-               log.Print("Finished Ask (expected success)")
+               c.Log("Finished Ask (expected success)")
        }
 
        {
@@ -268,7 +296,7 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
                all, err := ioutil.ReadAll(reader)
                c.Check(all, DeepEquals, []byte("foo"))
                c.Check(blocklen, Equals, int64(3))
-               log.Print("Finished Get (expected success)")
+               c.Log("Finished Get (expected success)")
        }
 
        {
@@ -278,7 +306,7 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
                c.Check(hash2, Matches, `^d41d8cd98f00b204e9800998ecf8427e\+0(\+.+)?$`)
                c.Check(rep, Equals, 2)
                c.Check(err, Equals, nil)
-               log.Print("Finished PutB zero block")
+               c.Log("Finished PutB zero block")
        }
 
        {
@@ -287,7 +315,7 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
                all, err := ioutil.ReadAll(reader)
                c.Check(all, DeepEquals, []byte(""))
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Finished Get zero block")
+               c.Log("Finished Get zero block")
        }
 }
 
@@ -302,7 +330,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
                errNotFound, _ := err.(keepclient.ErrNotFound)
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
-               log.Print("Ask 1")
+               c.Log("Ask 1")
        }
 
        {
@@ -310,7 +338,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
                c.Check(hash2, Equals, "")
                c.Check(rep, Equals, 0)
                c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New("")))
-               log.Print("PutB")
+               c.Log("PutB")
        }
 
        {
@@ -319,7 +347,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Ask 2")
+               c.Log("Ask 2")
        }
 
        {
@@ -328,7 +356,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Get")
+               c.Log("Get")
        }
 }
 
@@ -343,7 +371,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
                errNotFound, _ := err.(keepclient.ErrNotFound)
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
-               log.Print("Ask 1")
+               c.Log("Ask 1")
        }
 
        {
@@ -351,7 +379,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
                c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
                c.Check(rep, Equals, 2)
                c.Check(err, Equals, nil)
-               log.Print("PutB")
+               c.Log("PutB")
        }
 
        {
@@ -360,7 +388,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Ask 2")
+               c.Log("Ask 2")
        }
 
        {
@@ -369,7 +397,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Get")
+               c.Log("Get")
        }
 }
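
TestManyFailedPuts above drains a WaitGroup in a goroutine and selects against time.After — a standard Go idiom for bounding how long a test waits on concurrent work. Extracted as a reusable sketch (waitTimeout is an illustrative name, not part of the test suite):

package main

import (
	"fmt"
	"sync"
	"time"
)

// waitTimeout waits for wg, but gives up after d instead of hanging the
// caller, mirroring the done-channel/select pattern in TestManyFailedPuts.
func waitTimeout(wg *sync.WaitGroup, d time.Duration) bool {
	done := make(chan bool)
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(d):
		return false
	}
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			time.Sleep(10 * time.Millisecond)
		}()
	}
	fmt.Println("finished in time:", waitTimeout(&wg, time.Second))
}
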
 
index 0faf4aea0e3c35354e30dc33f1e7005d491ab4d5..3fa2671df58331bb52c16651ded3924e9390ee90 100644 (file)
@@ -13,11 +13,13 @@ import (
 var viaAlias = "keepproxy"
 
 type proxyClient struct {
-       client keepclient.HTTPClient
-       proto  string
+       client    keepclient.HTTPClient
+       proto     string
+       requestID string
 }
 
 func (pc *proxyClient) Do(req *http.Request) (*http.Response, error) {
        req.Header.Add("Via", pc.proto+" "+viaAlias)
+       req.Header.Add("X-Request-Id", pc.requestID)
        return pc.client.Do(req)
 }
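
The wrapper above is specific to keepclient.HTTPClient; the same pattern works with any client that exposes Do. An illustrative, generalized sketch (idForwardingClient is a made-up name) showing how forwarding X-Request-Id lets a backend correlate its logs with the proxy's:

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/http/httptest"
)

// idForwardingClient wraps an HTTP client so every outgoing request
// carries a saved X-Request-Id, like proxyClient.Do in the diff above.
type idForwardingClient struct {
	client    *http.Client
	requestID string
}

func (c *idForwardingClient) Do(req *http.Request) (*http.Response, error) {
	req.Header.Add("X-Request-Id", c.requestID)
	return c.client.Do(req)
}

func main() {
	// Backend that echoes back the request ID it received.
	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, r.Header.Get("X-Request-Id"))
	}))
	defer backend.Close()

	c := &idForwardingClient{client: http.DefaultClient, requestID: "req-abc123"}
	req, _ := http.NewRequest("GET", backend.URL, nil)
	resp, err := c.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(string(body)) // prints "req-abc123": the ID survived the hop
}
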
index 424910dfa8b2af4a0d22ad9e91bd4eab20076af7..4d042a70dd376ea1e04bdff16283ab80669dfd0f 100644 (file)
@@ -975,7 +975,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
        ok := make(chan struct{})
        go func() {
                req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
-               (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req)
+               MakeRESTRouter().ServeHTTP(resp, req)
                ok <- struct{}{}
        }()
 
index 921176dbbe93f481f497af476f176c2116ef3bce..e422179f643e9cad438742cd2aa450d52ae84fa4 100644 (file)
@@ -147,11 +147,11 @@ func main() {
        // Start a round-robin VolumeManager with the volumes we have found.
        KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
-       // Middleware stack: logger, MaxRequests limiter, method handlers
+       // Middleware/handler stack
        router := MakeRESTRouter()
        limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
        router.limiter = limiter
-       http.Handle("/", &LoggingRESTRouter{router: limiter})
+       http.Handle("/", httpserver.AddRequestIDs(httpserver.LogRequests(limiter)))
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)
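
AddRequestIDs and LogRequests come from the Arvados Go SDK; their implementations aren't shown in this diff. A simplified sketch with stand-in versions (assumptions, not the SDK code) demonstrating why the composition order used above matters: the outermost wrapper runs first, so the logger always sees a request ID.

package main

import (
	"fmt"
	"log"
	"net/http"
	"net/http/httptest"
)

// addRequestIDs is a stand-in for httpserver.AddRequestIDs: it ensures
// every request carries an X-Request-Id before inner handlers run.
func addRequestIDs(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Header.Get("X-Request-Id") == "" {
			r.Header.Set("X-Request-Id", "req-1") // the real SDK generates unique IDs
		}
		next.ServeHTTP(w, r)
	})
}

// logRequests is a stand-in for httpserver.LogRequests: it logs each
// request, including the ID set by the outer wrapper.
func logRequests(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		log.Printf("%s %s id=%s", r.Method, r.URL.Path, r.Header.Get("X-Request-Id"))
		next.ServeHTTP(w, r)
	})
}

func main() {
	router := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	})
	// Same nesting order as the diff: AddRequestIDs(LogRequests(handler)).
	srv := httptest.NewServer(addRequestIDs(logRequests(router)))
	defer srv.Close()
	http.Get(srv.URL + "/ping") // logged as: GET /ping id=req-1
}
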
diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go
deleted file mode 100644 (file)
index 63c28a2..0000000
+++ /dev/null
@@ -1,111 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-// LoggingRESTRouter
-// LoggingResponseWriter
-
-import (
-       "context"
-       "net/http"
-       "strings"
-       "time"
-
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
-       "git.curoverse.com/arvados.git/sdk/go/stats"
-       log "github.com/Sirupsen/logrus"
-)
-
-// LoggingResponseWriter has anonymous fields ResponseWriter and ResponseBody
-type LoggingResponseWriter struct {
-       Status int
-       Length int
-       http.ResponseWriter
-       ResponseBody string
-       sentHdr      time.Time
-}
-
-// CloseNotify implements http.CloseNotifier.
-func (resp *LoggingResponseWriter) CloseNotify() <-chan bool {
-       wrapped, ok := resp.ResponseWriter.(http.CloseNotifier)
-       if !ok {
-               // If upstream doesn't implement CloseNotifier, we can
-               // satisfy the interface by returning a channel that
-               // never sends anything (the interface doesn't
-               // guarantee that anything will ever be sent on the
-               // channel even if the client disconnects).
-               return nil
-       }
-       return wrapped.CloseNotify()
-}
-
-// WriteHeader writes header to ResponseWriter
-func (resp *LoggingResponseWriter) WriteHeader(code int) {
-       if resp.sentHdr == zeroTime {
-               resp.sentHdr = time.Now()
-       }
-       resp.Status = code
-       resp.ResponseWriter.WriteHeader(code)
-}
-
-var zeroTime time.Time
-
-func (resp *LoggingResponseWriter) Write(data []byte) (int, error) {
-       if resp.Length == 0 && len(data) > 0 && resp.sentHdr == zeroTime {
-               resp.sentHdr = time.Now()
-       }
-       resp.Length += len(data)
-       if resp.Status >= 400 {
-               resp.ResponseBody += string(data)
-       }
-       return resp.ResponseWriter.Write(data)
-}
-
-// LoggingRESTRouter is used to add logging capabilities to mux.Router
-type LoggingRESTRouter struct {
-       router      http.Handler
-       idGenerator httpserver.IDGenerator
-}
-
-func (loggingRouter *LoggingRESTRouter) ServeHTTP(wrappedResp http.ResponseWriter, req *http.Request) {
-       tStart := time.Now()
-
-       // Attach a requestID-aware logger to the request context.
-       lgr := log.WithField("RequestID", loggingRouter.idGenerator.Next())
-       ctx := context.WithValue(req.Context(), "logger", lgr)
-       req = req.WithContext(ctx)
-
-       lgr = lgr.WithFields(log.Fields{
-               "remoteAddr":      req.RemoteAddr,
-               "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
-               "reqMethod":       req.Method,
-               "reqPath":         req.URL.Path[1:],
-               "reqBytes":        req.ContentLength,
-       })
-       lgr.Debug("request")
-
-       resp := LoggingResponseWriter{http.StatusOK, 0, wrappedResp, "", zeroTime}
-       loggingRouter.router.ServeHTTP(&resp, req)
-       tDone := time.Now()
-
-       statusText := http.StatusText(resp.Status)
-       if resp.Status >= 400 {
-               statusText = strings.Replace(resp.ResponseBody, "\n", "", -1)
-       }
-       if resp.sentHdr == zeroTime {
-               // Nobody changed status or wrote any data, i.e., we
-               // returned a 200 response with no body.
-               resp.sentHdr = tDone
-       }
-
-       lgr.WithFields(log.Fields{
-               "timeTotal":      stats.Duration(tDone.Sub(tStart)),
-               "timeToStatus":   stats.Duration(resp.sentHdr.Sub(tStart)),
-               "timeWriteBody":  stats.Duration(tDone.Sub(resp.sentHdr)),
-               "respStatusCode": resp.Status,
-               "respStatus":     statusText,
-               "respBytes":      resp.Length,
-       }).Info("response")
-}
diff --git a/services/keepstore/logging_router_test.go b/services/keepstore/logging_router_test.go
deleted file mode 100644 (file)
index 6ca48dc..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
-       "net/http"
-       "testing"
-)
-
-func TestLoggingResponseWriterImplementsCloseNotifier(t *testing.T) {
-       http.ResponseWriter(&LoggingResponseWriter{}).(http.CloseNotifier).CloseNotify()
-}
index e6a53d06c6c297ab343f697dd7c5f68a40e8355e..61e69f9096503eb88cf9328af5e85f129dba4226 100644 (file)
@@ -45,6 +45,8 @@ var (
        s3RaceWindow    time.Duration
 
        s3ACL = s3.Private
+
+       zeroTime time.Time
 )
 
 const (
index c8558a417a2979a56ee69501cf3491737f4b987e..5d033081213c5faa72dc33c656dd9c3167f140da 100644 (file)
@@ -130,7 +130,7 @@ class RetryDriver(FakeDriver):
         create_calls += 1
         if create_calls < 2:
             raise RateLimitReachedError(429, "Rate limit exceeded",
-                                        retry_after=12)
+                                        headers={'retry-after': '12'})
         elif create_calls < 3:
             raise BaseHTTPError(429, "Rate limit exceeded",
                                 {'retry-after': '2'})
index f3764fc8e98bdb880682b33cd271f201ea215935..6382dcb7277c19c20bb7612c7b9aecbf65137b99 100644 (file)
@@ -40,14 +40,14 @@ setup(name='arvados-node-manager',
           'setuptools'
       ],
       dependency_links=[
-          "https://github.com/curoverse/libcloud/archive/apache-libcloud-2.2.2.dev2.zip"
+          "https://github.com/curoverse/libcloud/archive/apache-libcloud-2.2.2.dev3.zip"
       ],
       test_suite='tests',
       tests_require=[
           'requests',
           'pbr<1.7.0',
           'mock>=1.0',
-          'apache-libcloud==2.2.2.dev2',
+          'apache-libcloud==2.2.2.dev3',
       ],
       zip_safe=False,
       cmdclass={'egg_info': tagger},
index 6d535eaed4dd73b31a91bd6e60774f6eac9ee4d7..f4a65da537ecb96f861c946ba8c92a2938dd1600 100755 (executable)
@@ -46,6 +46,10 @@ if test -z "$SSO_ROOT" ; then
     SSO_ROOT="$ARVBOX_DATA/sso-devise-omniauth-provider"
 fi
 
+if test -z "$COMPOSER_ROOT" ; then
+    COMPOSER_ROOT="$ARVBOX_DATA/composer"
+fi
+
 PG_DATA="$ARVBOX_DATA/postgres"
 VAR_DATA="$ARVBOX_DATA/var"
 PASSENGER="$ARVBOX_DATA/passenger"
@@ -193,6 +197,9 @@ run() {
         if ! test -d "$SSO_ROOT" ; then
             git clone https://github.com/curoverse/sso-devise-omniauth-provider.git "$SSO_ROOT"
         fi
+        if ! test -d "$COMPOSER_ROOT" ; then
+            git clone https://github.com/curoverse/composer.git "$COMPOSER_ROOT"
+        fi
 
         if test "$CONFIG" = test ; then
 
@@ -205,6 +212,7 @@ run() {
                        --privileged \
                        "--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \
                        "--volume=$SSO_ROOT:/usr/src/sso:rw" \
+                       "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \
                        "--volume=$PG_DATA:/var/lib/postgresql:rw" \
                        "--volume=$VAR_DATA:/var/lib/arvados:rw" \
                        "--volume=$PASSENGER:/var/lib/passenger:rw" \
@@ -246,6 +254,7 @@ run() {
                    --privileged \
                    "--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \
                    "--volume=$SSO_ROOT:/usr/src/sso:rw" \
+                   "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \
                    "--volume=$PG_DATA:/var/lib/postgresql:rw" \
                    "--volume=$VAR_DATA:/var/lib/arvados:rw" \
                    "--volume=$PASSENGER:/var/lib/passenger:rw" \
index a12c1070f1b63e10f0502d92ebeb8153b27cd2ce..f29066ec9fcb71de007837e295de61bcefc85ffa 100644 (file)
@@ -18,7 +18,8 @@ RUN apt-get update && \
     libjson-perl nginx gitolite3 lsof libreadline-dev \
     apt-transport-https ca-certificates slurm-wlm \
     linkchecker python3-virtualenv python-virtualenv xvfb iceweasel \
-    libgnutls28-dev python3-dev vim cadaver cython gnupg dirmngr && \
+    libgnutls28-dev python3-dev vim cadaver cython gnupg dirmngr \
+    libsecret-1-dev && \
     apt-get clean
 
 ENV RUBYVERSION_MINOR 2.3
@@ -76,7 +77,7 @@ RUN set -e && \
 
 RUN pip install -U setuptools
 
-ENV NODEVERSION v6.11.2
+ENV NODEVERSION v6.11.4
 
 # Install nodejs binary
 RUN curl -L -f https://nodejs.org/dist/${NODEVERSION}/node-${NODEVERSION}-linux-x64.tar.xz | tar -C /usr/local -xJf - && \
index 39bc21c3ae4f3c7a39f573ee1ee76f4d01eb2991..80344c16f2ef9bfcf0f97bf14f07a8c1cb97ce73 100644 (file)
@@ -5,12 +5,15 @@
 FROM arvados/arvbox-base
 ARG arvados_version
 ARG sso_version=master
+ARG composer_version=master
 
 RUN cd /usr/src && \
     git clone --no-checkout https://github.com/curoverse/arvados.git && \
     git -C arvados checkout ${arvados_version} && \
     git clone --no-checkout https://github.com/curoverse/sso-devise-omniauth-provider.git sso && \
-    git -C sso checkout ${sso_version}
+    git -C sso checkout ${sso_version} && \
+    git clone --no-checkout https://github.com/curoverse/composer.git && \
+    git -C composer checkout ${composer_version}
 
 ADD service/ /var/lib/arvbox/service
 RUN ln -sf /var/lib/arvbox/service /etc
index 62225df6ceae523493a27450466345c28d4bc7a5..466ef1fceede5fd7c956a9ee46c558614712633a 100644 (file)
@@ -21,6 +21,7 @@ services=(
   [workbench]=80
   [api]=8000
   [sso]=8900
+  [composer]=4200
   [arv-git-httpd]=9001
   [keep-web]=9002
   [keepproxy]=25100
index 3296a3cd178779cb956d87250748a71e9ae8d8ad..a4689f004aeebab31c71f0dbe35833ec99df7b89 100755 (executable)
@@ -19,6 +19,7 @@ if ! grep "^arvbox:" /etc/passwd >/dev/null 2>/dev/null ; then
             --uid $HOSTUID --gid $HOSTGID \
             --non-unique \
             --groups docker \
+            --shell /bin/bash \
             arvbox
     useradd --home-dir /var/lib/arvados/git --uid $HOSTUID --gid $HOSTGID --non-unique git
     useradd --groups docker crunch
diff --git a/tools/arvbox/lib/arvbox/docker/service/composer/log/main/.gitstub b/tools/arvbox/lib/arvbox/docker/service/composer/log/main/.gitstub
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/tools/arvbox/lib/arvbox/docker/service/composer/log/run b/tools/arvbox/lib/arvbox/docker/service/composer/log/run
new file mode 120000 (symlink)
index 0000000..d6aef4a
--- /dev/null
@@ -0,0 +1 @@
+/usr/local/lib/arvbox/logger
\ No newline at end of file
diff --git a/tools/arvbox/lib/arvbox/docker/service/composer/run b/tools/arvbox/lib/arvbox/docker/service/composer/run
new file mode 100755 (executable)
index 0000000..cd2f86a
--- /dev/null
@@ -0,0 +1,8 @@
+#!/bin/sh
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+set -e
+
+/usr/local/lib/arvbox/runsu.sh $0-service $1
diff --git a/tools/arvbox/lib/arvbox/docker/service/composer/run-service b/tools/arvbox/lib/arvbox/docker/service/composer/run-service
new file mode 100755 (executable)
index 0000000..6578ea5
--- /dev/null
@@ -0,0 +1,22 @@
+#!/bin/bash
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+exec 2>&1
+set -ex -o pipefail
+
+.  /usr/local/lib/arvbox/common.sh
+
+cd /usr/src/composer
+
+npm install yarn
+
+PATH=$PATH:/usr/src/composer/node_modules/.bin
+
+yarn install
+
+if test "$1" != "--only-deps" ; then
+    echo "apiEndPoint: https://${localip}:${services[api]}" > /usr/src/composer/src/arvados-configuration.yml
+    exec ng serve --host 0.0.0.0 --port 4200 --env=webdev
+fi
index ee35bb916f7b580e6ed540e225f48fd943f87167..ab20d5758c96a5f298e3ce25e5248611a3446e21 100755 (executable)
@@ -16,7 +16,8 @@ else
 fi
 
 run_bundler --without=development
-bundle exec passenger start --runtime-check-only --runtime-dir=/var/lib/passenger
+bundle exec passenger-config build-native-support
+bundle exec passenger-config install-standalone-runtime
 
 if test "$1" = "--only-deps" ; then
     exit
@@ -82,6 +83,7 @@ fi
 rm -rf tmp
 mkdir -p tmp/cache
 
+bundle exec rake assets:precompile
 bundle exec rake db:migrate
 
 set +u
@@ -90,6 +92,5 @@ if test "$1" = "--only-setup" ; then
 fi
 
 exec bundle exec passenger start --port=${services[sso]} \
-     --runtime-dir=/var/lib/passenger \
      --ssl --ssl-certificate=/var/lib/arvados/self-signed.pem \
      --ssl-certificate-key=/var/lib/arvados/self-signed.key
index 1bd89f9f4d77f07588960fbf8c7a23bac96b5b55..a41922bb343948656d838f3d958c0b8131fa9d26 100755 (executable)
@@ -13,6 +13,12 @@ rm -rf tmp
 mkdir tmp
 chown arvbox:arvbox tmp
 
+if test -s /var/lib/arvados/workbench_rails_env ; then
+  export RAILS_ENV=$(cat /var/lib/arvados/workbench_rails_env)
+else
+  export RAILS_ENV=development
+fi
+
 if test "$1" != "--only-deps" ; then
     exec bundle exec passenger start --port 80 \
          --user arvbox --runtime-dir=/var/lib/passenger
index 8382a1cf30e1e17ee811e93b2b8c72ccf1139c7e..885385aeef971816b08f5f32f17b51ebf23ff2eb 100755 (executable)
@@ -45,6 +45,9 @@ $RAILS_ENV:
   keep_web_download_url: http://$localip:${services[keep-web]}/c=%{uuid_or_pdh}
   keep_web_url: http://$localip:${services[keep-web]}/c=%{uuid_or_pdh}
   arvados_docsite: http://$localip:${services[doc]}/
+  force_ssl: false
 EOF
 
+bundle exec rake assets:precompile
+
 (cd config && /usr/local/lib/arvbox/application_yml_override.py)