Merge branch '12018-tool-docs'
authorLucas Di Pentima <ldipentima@veritasgenetics.com>
Wed, 29 Nov 2017 21:59:44 +0000 (18:59 -0300)
committerLucas Di Pentima <ldipentima@veritasgenetics.com>
Wed, 29 Nov 2017 21:59:44 +0000 (18:59 -0300)
Refs #12018

Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

37 files changed:
apps/workbench/app/assets/javascripts/components/search.js [moved from apps/workbench/app/assets/javascripts/components/collections.js with 68% similarity]
apps/workbench/app/assets/javascripts/components/sessions.js
apps/workbench/app/assets/javascripts/models/session_db.js
apps/workbench/app/controllers/collections_controller.rb
apps/workbench/app/controllers/search_controller.rb
apps/workbench/app/models/arvados_api_client.rb
apps/workbench/app/models/user.rb
apps/workbench/app/views/layouts/body.html.erb
apps/workbench/app/views/search/index.html [moved from apps/workbench/app/views/collections/multisite.html with 66% similarity]
apps/workbench/config/routes.rb
build/package-build-dockerfiles/ubuntu1204/Dockerfile [deleted file]
build/package-test-dockerfiles/ubuntu1204/Dockerfile [deleted file]
build/run-build-packages-all-targets.sh
build/run-tests.sh
docker/jobs/apt.arvados.org.list
sdk/cwl/arvados_cwl/__init__.py
sdk/go/asyncbuf/buf.go [new file with mode: 0644]
sdk/go/asyncbuf/buf_test.go [new file with mode: 0644]
sdk/go/keepclient/hashcheck.go
sdk/go/keepclient/keepclient.go
sdk/go/keepclient/keepclient_test.go
sdk/go/keepclient/support.go
sdk/go/streamer/streamer.go [deleted file]
sdk/go/streamer/streamer_test.go [deleted file]
sdk/go/streamer/transfer.go [deleted file]
services/api/app/controllers/application_controller.rb
services/api/app/middlewares/arvados_api_token.rb
services/api/test/integration/api_client_authorizations_scopes_test.rb
services/api/test/integration/reader_tokens_test.rb
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/keepproxy/keepproxy_test.go
tools/arvbox/lib/arvbox/docker/Dockerfile.base
tools/arvbox/lib/arvbox/docker/Dockerfile.demo
tools/arvbox/lib/arvbox/docker/service/sso/run-service
tools/arvbox/lib/arvbox/docker/service/workbench/run
tools/arvbox/lib/arvbox/docker/service/workbench/run-service

similarity index 68%
rename from apps/workbench/app/assets/javascripts/components/collections.js
rename to apps/workbench/app/assets/javascripts/components/search.js
index 591bf38aa74b162e353c9705da1ecbcbba2d8cde..2fe73193e7f116fcda6c2831c8c2250c2a266aee 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-window.CollectionsTable = {
+window.SearchResultsTable = {
     maybeLoadMore: function(dom) {
         var loader = this.loader
         if (loader.state != loader.READY)
@@ -37,6 +37,10 @@ window.CollectionsTable = {
     },
     view: function(vnode) {
         var loader = vnode.attrs.loader
+        var iconsMap = {
+            collections: m('i.fa.fa-fw.fa-archive'),
+            projects: m('i.fa.fa-fw.fa-folder'),
+        }
         return m('table.table.table-condensed', [
             m('thead', m('tr', [
                 m('th'),
@@ -50,8 +54,13 @@ window.CollectionsTable = {
                         m('td', [
                             item.workbenchBaseURL() &&
                                 m('a.btn.btn-xs.btn-default', {
-                                    href: item.workbenchBaseURL()+'collections/'+item.uuid,
-                                }, 'Show'),
+                                    'data-original-title': 'show '+item.objectType.description,
+                                    'data-placement': 'top',
+                                    'data-toggle': 'tooltip',
+                                    href: item.workbenchBaseURL()+'/'+item.objectType.wb_path+'/'+item.uuid,
+                                    // Bootstrap's tooltip feature
+                                    oncreate: function(vnode) { $(vnode.dom).tooltip() },
+                                }, iconsMap[item.objectType.wb_path]),
                         ]),
                         m('td.arvados-uuid', item.uuid),
                         m('td', item.name || '(unnamed)'),
@@ -79,7 +88,7 @@ window.CollectionsTable = {
     },
 }
 
-window.CollectionsSearch = {
+window.Search = {
     oninit: function(vnode) {
         vnode.state.sessionDB = new SessionDB()
         vnode.state.searchEntered = m.stream()
@@ -97,28 +106,50 @@ window.CollectionsSearch = {
                     var workbenchBaseURL = function() {
                         return vnode.state.sessionDB.workbenchBaseURL(session)
                     }
-                    return new MultipageLoader({
+                    var searchable_objects = [
+                        {
+                            wb_path: 'projects',
+                            api_path: 'arvados/v1/groups',
+                            filters: [['group_class', '=', 'project']],
+                            description: 'project',
+                        },
+                        {
+                            wb_path: 'collections',
+                            api_path: 'arvados/v1/collections',
+                            filters: [],
+                            description: 'collection',
+                        },
+                    ]
+                    return new MergingLoader({
                         sessionKey: key,
-                        loadFunc: function(filters) {
-                            var tsquery = to_tsquery(q)
-                            if (tsquery) {
-                                filters = filters.slice(0)
-                                filters.push(['any', '@@', tsquery])
-                            }
-                            return vnode.state.sessionDB.request(session, 'arvados/v1/collections', {
-                                data: {
-                                    filters: JSON.stringify(filters),
-                                    count: 'none',
+                        // For every session, search for every object type
+                        children: searchable_objects.map(function(obj_type) {
+                            return new MultipageLoader({
+                                sessionKey: key,
+                                loadFunc: function(filters) {
+                                    // Apply additional type dependant filters
+                                    filters = filters.concat(obj_type.filters)
+                                    var tsquery = to_tsquery(q)
+                                    if (tsquery) {
+                                        filters.push(['any', '@@', tsquery])
+                                    }
+                                    return vnode.state.sessionDB.request(session, obj_type.api_path, {
+                                        data: {
+                                            filters: JSON.stringify(filters),
+                                            count: 'none',
+                                        },
+                                    }).then(function(resp) {
+                                        resp.items.map(function(item) {
+                                            item.workbenchBaseURL = workbenchBaseURL
+                                            item.objectType = obj_type
+                                        })
+                                        return resp
+                                    })
                                 },
-                            }).then(function(resp) {
-                                resp.items.map(function(item) {
-                                    item.workbenchBaseURL = workbenchBaseURL
-                                })
-                                return resp
                             })
-                        },
+                        }),
                     })
-                })
+                }),
             })
         })
     },
@@ -141,7 +172,7 @@ window.CollectionsSearch = {
                 m('.row', [
                     m('.col-md-6', [
                         m('.input-group', [
-                            m('input#search.form-control[placeholder=Search]', {
+                            m('input#search.form-control[placeholder=Search collections and projects]', {
                                 oninput: m.withAttr('value', vnode.state.searchEntered),
                                 value: vnode.state.searchEntered(),
                             }),
@@ -163,7 +194,7 @@ window.CollectionsSearch = {
                         m('a[href="/sessions"]', 'Add/remove sites'),
                     ]),
                 ]),
-                m(CollectionsTable, {
+                m(SearchResultsTable, {
                     loader: vnode.state.loader,
                 }),
             ],
index 3d127f1714d58a98eac115eeba96d9ebf95d39d4..e7cc5055734d4edaa2144d7eb4d704ec7e737736 100644 (file)
@@ -21,8 +21,8 @@ window.SessionsTable = {
         return m('.container', [
             m('p', [
                 'You can log in to multiple Arvados sites here, then use the ',
-                m('a[href="/collections/multisite"]', 'multi-site search'),
-                ' page to search collections on all sites at once.',
+                m('a[href="/search"]', 'multi-site search'),
+                ' page to search collections and projects on all sites at once.',
             ]),
             m('table.table.table-condensed.table-hover', [
                 m('thead', m('tr', [
index 01b0d72728c17d9531665058d397857ff3fa3b99..ad9ad1878417370dfd75294e9bd9cecbe25880d1 100644 (file)
@@ -135,7 +135,9 @@ window.SessionDB = function() {
             // the host part of apihostport is an IPv4 or [IPv6]
             // address.
             if (!session.baseURL.match('://(\\[|\\d+\\.\\d+\\.\\d+\\.\\d+[:/])'))
-                return session.baseURL.replace('://', '://workbench.')
+                var wbUrl = session.baseURL.replace('://', '://workbench.')
+                // Remove the trailing slash, if it's there.
+                return wbUrl.slice(-1) == '/' ? wbUrl.slice(0, -1) : wbUrl
             return null
         },
         // Return a m.stream that will get fulfilled with the
index 779d95c45b874d4fec157768e19013a1c68a0022..5fcb2dc569ff6b2446c602dc26de61a069155ba2 100644 (file)
@@ -16,7 +16,7 @@ class CollectionsController < ApplicationController
   skip_around_filter(:require_thread_api_token,
                      only: [:show_file, :show_file_links])
   skip_before_filter(:find_object_by_uuid,
-                     only: [:provenance, :show_file, :show_file_links, :multisite])
+                     only: [:provenance, :show_file, :show_file_links])
   # We depend on show_file to display the user agreement:
   skip_before_filter :check_user_agreements, only: :show_file
   skip_before_filter :check_user_profile, only: :show_file
index 40e484ea062b449068923fdc8d9951f1f7adddbe..3775abd1ae9f1117926d7bde8c847fc32ad0cd60 100644 (file)
@@ -3,6 +3,8 @@
 # SPDX-License-Identifier: AGPL-3.0
 
 class SearchController < ApplicationController
+  skip_before_filter :ensure_arvados_api_exists
+
   def find_objects_for_index
     search_what = Group
     if params[:project_uuid]
index ac2fe3a97629f8ec957b6a99b21f779b9d363e63..5a8fd518d386ec89125552c9fe17730e0488d4c4 100644 (file)
@@ -82,7 +82,7 @@ class ArvadosApiClient
     @client_mtx = Mutex.new
   end
 
-  def api(resources_kind, action, data=nil, tokens={})
+  def api(resources_kind, action, data=nil, tokens={}, include_anon_token=true)
 
     profile_checkpoint
 
@@ -117,7 +117,7 @@ class ArvadosApiClient
       'reader_tokens' => ((tokens[:reader_tokens] ||
                            Thread.current[:reader_tokens] ||
                            []) +
-                          [Rails.configuration.anonymous_user_token]).to_json,
+                          (include_anon_token ? [Rails.configuration.anonymous_user_token] : [])).to_json,
     }
     if !data.nil?
       data.each do |k,v|
index 10da22db69e4540f8d7027df94b3fd7d43d98279..1f102dbf17acd3fb807110c34f4937686ebb9f2d 100644 (file)
@@ -10,7 +10,7 @@ class User < ArvadosBase
   end
 
   def self.current
-    res = arvados_api_client.api self, '/current'
+    res = arvados_api_client.api self, '/current', nil, {}, false
     arvados_api_client.unpack_api_response(res)
   end
 
index 15f654596cbb501e8e86ccb24138d986c2f625e3..c1399f2602dc151907253d080af08504a53c0875 100644 (file)
@@ -34,7 +34,7 @@ SPDX-License-Identifier: AGPL-3.0 %>
                     <%=
                        target = Rails.configuration.multi_site_search
                        if target == true
-                         target = {controller: 'collections', action: 'multisite'}
+                         target = {controller: 'search', action: 'index'}
                        end
                        link_to("Multi-site search", target, {class: 'btn btn-default'}) %>
                   </form>
similarity index 66%
rename from apps/workbench/app/views/collections/multisite.html
rename to apps/workbench/app/views/search/index.html
index 9b03f10f3dcd90f6fd14d4d730367359be620797..6bcad0b1ae2c245ccd9b65afda7d3eac373088a7 100644 (file)
@@ -2,4 +2,4 @@
 
 SPDX-License-Identifier: AGPL-3.0 -->
 
-<div data-mount-mithril="CollectionsSearch"></div>
+<div data-mount-mithril="Search"></div>
index fee49c14ce213aa2cd001252043f4cc7dfc79745..d969abd78c2b69d8de936e2a00df0c0d1f1ef0f1 100644 (file)
@@ -95,7 +95,7 @@ ArvadosWorkbench::Application.routes.draw do
     post 'remove_selected_files', on: :member
     get 'tags', on: :member
     post 'save_tags', on: :member
-    get 'multisite', on: :collection
+    get 'multisite', on: :collection, to: redirect('/search')
   end
   get('/collections/download/:uuid/:reader_token/*file' => 'collections#show_file',
       format: false)
@@ -109,7 +109,7 @@ ArvadosWorkbench::Application.routes.draw do
     get 'tab_counts', on: :member
     get 'public', on: :collection
   end
-
+  
   resources :search do
     get 'choose', :on => :collection
   end
@@ -131,9 +131,9 @@ ArvadosWorkbench::Application.routes.draw do
   match '/_health/ping', to: 'healthcheck#ping', via: [:get]
 
   get '/tests/mithril', to: 'tests#mithril'
-
+  
   get '/status', to: 'status#status'
-
+  
   # Send unroutable requests to an arbitrary controller
   # (ends up at ApplicationController#render_not_found)
   match '*a', to: 'links#render_not_found', via: [:get, :post]
diff --git a/build/package-build-dockerfiles/ubuntu1204/Dockerfile b/build/package-build-dockerfiles/ubuntu1204/Dockerfile
deleted file mode 100644 (file)
index 1d07db7..0000000
+++ /dev/null
@@ -1,33 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-FROM ubuntu:precise
-MAINTAINER Ward Vandewege <ward@curoverse.com>
-
-ENV DEBIAN_FRONTEND noninteractive
-
-# Install dependencies.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip build-essential unzip
-
-# Install RVM
-RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
-    curl -L https://get.rvm.io | bash -s stable && \
-    /usr/local/rvm/bin/rvm install 2.3 && \
-    /usr/local/rvm/bin/rvm alias create default ruby-2.3 && \
-    /usr/local/rvm/bin/rvm-exec default gem install bundler && \
-    /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
-
-# Install golang binary
-ADD generated/go1.8.3.linux-amd64.tar.gz /usr/local/
-RUN ln -s /usr/local/go/bin/go /usr/local/bin/
-
-# Install nodejs and npm
-ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
-RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
-
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
-ENV WORKSPACE /arvados
-CMD ["/usr/local/rvm/bin/rvm-exec", "default", "bash", "/jenkins/run-build-packages.sh", "--target", "ubuntu1204"]
diff --git a/build/package-test-dockerfiles/ubuntu1204/Dockerfile b/build/package-test-dockerfiles/ubuntu1204/Dockerfile
deleted file mode 100644 (file)
index 75c0cea..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-FROM ubuntu:precise
-MAINTAINER Ward Vandewege <ward@curoverse.com>
-
-ENV DEBIAN_FRONTEND noninteractive
-
-# Install RVM
-RUN apt-get update && \
-    apt-get -y install --no-install-recommends curl ca-certificates g++ && \
-    gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
-    curl -L https://get.rvm.io | bash -s stable && \
-    /usr/local/rvm/bin/rvm install 2.3 && \
-    /usr/local/rvm/bin/rvm alias create default ruby-2.3
-
-# udev daemon can't start in a container, so don't try.
-RUN mkdir -p /etc/udev/disabled
-
-RUN echo "deb file:///arvados/packages/ubuntu1204/ /" >>/etc/apt/sources.list
index 7dd21a363d451b473bce76fa017e5ae710fa22f1..4cba3e9a62a513c8cb18d816dab98ced7f5b5363 100755 (executable)
@@ -91,11 +91,16 @@ for dockerfile_path in $(find -name Dockerfile | grep package-build-dockerfiles)
         true
     else
         FINAL_EXITCODE=$?
+        echo
+        echo "Build packages failed for $(basename $(dirname "$dockerfile_path"))"
+        echo
     fi
 done
 
 if test $FINAL_EXITCODE != 0 ; then
+    echo
     echo "Build packages failed with code $FINAL_EXITCODE" >&2
+    echo
 fi
 
 exit $FINAL_EXITCODE
index 433685c5a86d9cf2cc61b884e21c16d91a67e932..3cfc692aaec5bc1f595b9333fa6be9179164f200 100755 (executable)
@@ -100,7 +100,7 @@ sdk/go/health
 sdk/go/httpserver
 sdk/go/manifest
 sdk/go/blockdigest
-sdk/go/streamer
+sdk/go/asyncbuf
 sdk/go/stats
 sdk/go/crunchrunner
 sdk/cwl
@@ -829,7 +829,7 @@ gostuff=(
     sdk/go/health
     sdk/go/httpserver
     sdk/go/manifest
-    sdk/go/streamer
+    sdk/go/asyncbuf
     sdk/go/crunchrunner
     sdk/go/stats
     lib/crunchstat
index 3ae6df42160b2c66025f832fa159e739aa975cb5..ae1a0862a67437e257fa1fdec2919473799ac56c 100644 (file)
@@ -1,2 +1,3 @@
 # apt.arvados.org
 deb http://apt.arvados.org/ jessie main
+deb http://apt.arvados.org/ jessie-dev main
index 5756789cb1ecd5068780ca0ae036069dbdc9af5b..d15acf767fc90cc3e594be1fca0e67ae96f034ec 100644 (file)
@@ -750,7 +750,6 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
     arvargs.conformance_test = None
     arvargs.use_container = True
     arvargs.relax_path_checks = True
-    arvargs.validate = None
     arvargs.print_supported_versions = False
 
     make_fs_access = partial(CollectionFsAccess,
diff --git a/sdk/go/asyncbuf/buf.go b/sdk/go/asyncbuf/buf.go
new file mode 100644 (file)
index 0000000..05af02f
--- /dev/null
@@ -0,0 +1,108 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package asyncbuf
+
+import (
+       "bytes"
+       "io"
+       "sync"
+)
+
+// A Buffer is an io.Writer that distributes written data
+// asynchronously to multiple concurrent readers.
+//
+// NewReader() can be called at any time. In all cases, every returned
+// io.Reader reads all data written to the Buffer.
+//
+// Behavior is undefined if Write is called after Close or
+// CloseWithError.
+type Buffer interface {
+       io.WriteCloser
+
+       // NewReader() returns an io.Reader that reads all data
+       // written to the Buffer.
+       NewReader() io.Reader
+
+       // Close, but return the given error (instead of io.EOF) to
+       // all readers when they reach the end of the buffer.
+       //
+       // CloseWithError(nil) is equivalent to
+       // CloseWithError(io.EOF).
+       CloseWithError(error) error
+}
+
+type buffer struct {
+       data *bytes.Buffer
+       cond sync.Cond
+       err  error // nil if there might be more writes
+}
+
+// NewBuffer creates a new Buffer using buf as its initial
+// contents. The new Buffer takes ownership of buf, and the caller
+// should not use buf after this call.
+func NewBuffer(buf []byte) Buffer {
+       return &buffer{
+               data: bytes.NewBuffer(buf),
+               cond: sync.Cond{L: &sync.Mutex{}},
+       }
+}
+
+func (b *buffer) Write(p []byte) (int, error) {
+       defer b.cond.Broadcast()
+       b.cond.L.Lock()
+       defer b.cond.L.Unlock()
+       if b.err != nil {
+               return 0, b.err
+       }
+       return b.data.Write(p)
+}
+
+func (b *buffer) Close() error {
+       return b.CloseWithError(nil)
+}
+
+func (b *buffer) CloseWithError(err error) error {
+       defer b.cond.Broadcast()
+       b.cond.L.Lock()
+       defer b.cond.L.Unlock()
+       if err == nil {
+               b.err = io.EOF
+       } else {
+               b.err = err
+       }
+       return nil
+}
+
+func (b *buffer) NewReader() io.Reader {
+       return &reader{b: b}
+}
+
+type reader struct {
+       b    *buffer
+       read int // # bytes already read
+}
+
+func (r *reader) Read(p []byte) (int, error) {
+       r.b.cond.L.Lock()
+       for {
+               switch {
+               case r.read < r.b.data.Len():
+                       buf := r.b.data.Bytes()
+                       r.b.cond.L.Unlock()
+                       n := copy(p, buf[r.read:])
+                       r.read += n
+                       return n, nil
+               case r.b.err != nil || len(p) == 0:
+                       // r.b.err != nil means we reached EOF.  And
+                       // even if we're not at EOF, there's no need
+                       // to block if len(p)==0.
+                       err := r.b.err
+                       r.b.cond.L.Unlock()
+                       return 0, err
+               default:
+                       r.b.cond.Wait()
+               }
+       }
+}
diff --git a/sdk/go/asyncbuf/buf_test.go b/sdk/go/asyncbuf/buf_test.go
new file mode 100644 (file)
index 0000000..cc742a8
--- /dev/null
@@ -0,0 +1,245 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package asyncbuf
+
+import (
+       "crypto/md5"
+       "errors"
+       "io"
+       "io/ioutil"
+       "math/rand"
+       "sync"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+func (s *Suite) TestNoWrites(c *check.C) {
+       b := NewBuffer(nil)
+       r1 := b.NewReader()
+       r2 := b.NewReader()
+       b.Close()
+       s.checkReader(c, r1, []byte{}, nil, nil)
+       s.checkReader(c, r2, []byte{}, nil, nil)
+}
+
+func (s *Suite) TestNoReaders(c *check.C) {
+       b := NewBuffer(nil)
+       n, err := b.Write([]byte("foobar"))
+       err2 := b.Close()
+       c.Check(n, check.Equals, 6)
+       c.Check(err, check.IsNil)
+       c.Check(err2, check.IsNil)
+}
+
+func (s *Suite) TestWriteReadClose(c *check.C) {
+       done := make(chan bool, 2)
+       b := NewBuffer(nil)
+       n, err := b.Write([]byte("foobar"))
+       c.Check(n, check.Equals, 6)
+       c.Check(err, check.IsNil)
+       r1 := b.NewReader()
+       r2 := b.NewReader()
+       go s.checkReader(c, r1, []byte("foobar"), nil, done)
+       go s.checkReader(c, r2, []byte("foobar"), nil, done)
+       time.Sleep(time.Millisecond)
+       c.Check(len(done), check.Equals, 0)
+       b.Close()
+       <-done
+       <-done
+}
+
+func (s *Suite) TestPrefillWriteCloseRead(c *check.C) {
+       done := make(chan bool, 2)
+       b := NewBuffer([]byte("baz"))
+       n, err := b.Write([]byte("waz"))
+       c.Check(n, check.Equals, 3)
+       c.Check(err, check.IsNil)
+       b.Close()
+       r1 := b.NewReader()
+       go s.checkReader(c, r1, []byte("bazwaz"), nil, done)
+       r2 := b.NewReader()
+       go s.checkReader(c, r2, []byte("bazwaz"), nil, done)
+       <-done
+       <-done
+}
+
+func (s *Suite) TestWriteReadCloseRead(c *check.C) {
+       done := make(chan bool, 1)
+       b := NewBuffer(nil)
+       r1 := b.NewReader()
+       go s.checkReader(c, r1, []byte("bazwazqux"), nil, done)
+
+       b.Write([]byte("bazwaz"))
+
+       r2 := b.NewReader()
+       r2.Read(make([]byte, 3))
+
+       b.Write([]byte("qux"))
+       b.Close()
+
+       s.checkReader(c, r2, []byte("wazqux"), nil, nil)
+       <-done
+}
+
+func (s *Suite) TestReadAtEOF(c *check.C) {
+       buf := make([]byte, 8)
+
+       b := NewBuffer([]byte{1, 2, 3})
+
+       r := b.NewReader()
+       n, err := r.Read(buf)
+       c.Check(n, check.Equals, 3)
+       c.Check(err, check.IsNil)
+
+       // Reading zero bytes at EOF, but before Close(), doesn't
+       // block or error
+       done := make(chan bool)
+       go func() {
+               defer close(done)
+               n, err = r.Read(buf[:0])
+               c.Check(n, check.Equals, 0)
+               c.Check(err, check.IsNil)
+       }()
+       select {
+       case <-done:
+       case <-time.After(time.Second):
+               c.Error("timeout")
+       }
+
+       b.Close()
+
+       // Reading zero bytes after Close() returns EOF
+       n, err = r.Read(buf[:0])
+       c.Check(n, check.Equals, 0)
+       c.Check(err, check.Equals, io.EOF)
+
+       // Reading from start after Close() returns 3 bytes, then EOF
+       r = b.NewReader()
+       n, err = r.Read(buf)
+       c.Check(n, check.Equals, 3)
+       if err != nil {
+               c.Check(err, check.Equals, io.EOF)
+       }
+       n, err = r.Read(buf[:0])
+       c.Check(n, check.Equals, 0)
+       c.Check(err, check.Equals, io.EOF)
+       n, err = r.Read(buf)
+       c.Check(n, check.Equals, 0)
+       c.Check(err, check.Equals, io.EOF)
+}
+
+func (s *Suite) TestCloseWithError(c *check.C) {
+       errFake := errors.New("it's not even a real error")
+
+       done := make(chan bool, 1)
+       b := NewBuffer(nil)
+       r1 := b.NewReader()
+       go s.checkReader(c, r1, []byte("bazwazqux"), errFake, done)
+
+       b.Write([]byte("bazwaz"))
+
+       r2 := b.NewReader()
+       r2.Read(make([]byte, 3))
+
+       b.Write([]byte("qux"))
+       b.CloseWithError(errFake)
+
+       s.checkReader(c, r2, []byte("wazqux"), errFake, nil)
+       <-done
+}
+
+// Write n*n bytes, n at a time; read them into n goroutines using
+// varying buffer sizes; compare checksums.
+func (s *Suite) TestManyReaders(c *check.C) {
+       const n = 256
+
+       b := NewBuffer(nil)
+
+       expectSum := make(chan []byte)
+       go func() {
+               hash := md5.New()
+               buf := make([]byte, n)
+               for i := 0; i < n; i++ {
+                       time.Sleep(10 * time.Nanosecond)
+                       rand.Read(buf)
+                       b.Write(buf)
+                       hash.Write(buf)
+               }
+               expectSum <- hash.Sum(nil)
+               b.Close()
+       }()
+
+       gotSum := make(chan []byte)
+       for i := 0; i < n; i++ {
+               go func(bufSize int) {
+                       got := md5.New()
+                       io.CopyBuffer(got, b.NewReader(), make([]byte, bufSize))
+                       gotSum <- got.Sum(nil)
+               }(i + n/2)
+       }
+
+       expect := <-expectSum
+       for i := 0; i < n; i++ {
+               c.Check(expect, check.DeepEquals, <-gotSum)
+       }
+}
+
+func (s *Suite) BenchmarkOneReader(c *check.C) {
+       s.benchmarkReaders(c, 1)
+}
+
+func (s *Suite) BenchmarkManyReaders(c *check.C) {
+       s.benchmarkReaders(c, 100)
+}
+
+func (s *Suite) benchmarkReaders(c *check.C, readers int) {
+       var n int64
+       t0 := time.Now()
+
+       buf := make([]byte, 10000)
+       rand.Read(buf)
+       for i := 0; i < 10; i++ {
+               b := NewBuffer(nil)
+               go func() {
+                       for i := 0; i < c.N; i++ {
+                               b.Write(buf)
+                       }
+                       b.Close()
+               }()
+
+               var wg sync.WaitGroup
+               for i := 0; i < readers; i++ {
+                       wg.Add(1)
+                       go func() {
+                               defer wg.Done()
+                               nn, _ := io.Copy(ioutil.Discard, b.NewReader())
+                               atomic.AddInt64(&n, int64(nn))
+                       }()
+               }
+               wg.Wait()
+       }
+       c.Logf("%d bytes, %.0f MB/s", n, float64(n)/time.Since(t0).Seconds()/1000000)
+}
+
+func (s *Suite) checkReader(c *check.C, r io.Reader, expectData []byte, expectError error, done chan bool) {
+       buf, err := ioutil.ReadAll(r)
+       c.Check(err, check.Equals, expectError)
+       c.Check(buf, check.DeepEquals, expectData)
+       if done != nil {
+               done <- true
+       }
+}
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
index 726b81362ca6d4e9b1db43f6f8677111039911b3..9295c14cc24a47cd38479e19a8aa57dc91c1c42a 100644 (file)
@@ -72,19 +72,16 @@ func (this HashCheckingReader) Close() (err error) {
        _, err = io.Copy(this.Hash, this.Reader)
 
        if closer, ok := this.Reader.(io.Closer); ok {
-               err2 := closer.Close()
-               if err2 != nil && err == nil {
-                       return err2
+               closeErr := closer.Close()
+               if err == nil {
+                       err = closeErr
                }
        }
        if err != nil {
                return err
        }
-
-       sum := this.Hash.Sum(nil)
-       if fmt.Sprintf("%x", sum) != this.Check {
-               err = BadChecksum
+       if fmt.Sprintf("%x", this.Hash.Sum(nil)) != this.Check {
+               return BadChecksum
        }
-
-       return err
+       return nil
 }
index cbfad8177da775337bf2b528a99ff9a0757cbaa0..37d651e31fbd971defa3217d6c11e883c4a073cc 100644 (file)
@@ -21,7 +21,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/streamer"
+       "git.curoverse.com/arvados.git/sdk/go/asyncbuf"
 )
 
 // A Keep "block" is 64MB.
@@ -156,10 +156,12 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string,
                bufsize = BLOCKSIZE
        }
 
-       t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
-       defer t.Close()
-
-       return kc.putReplicas(hash, t, dataBytes)
+       buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize))
+       go func() {
+               _, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash})
+               buf.CloseWithError(err)
+       }()
+       return kc.putReplicas(hash, buf.NewReader, dataBytes)
 }
 
 // PutHB writes a block to Keep. The hash of the bytes is given in
@@ -167,9 +169,8 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string,
 //
 // Return values are the same as for PutHR.
 func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
-       t := streamer.AsyncStreamFromSlice(buf)
-       defer t.Close()
-       return kc.putReplicas(hash, t, int64(len(buf)))
+       newReader := func() io.Reader { return bytes.NewBuffer(buf) }
+       return kc.putReplicas(hash, newReader, int64(len(buf)))
 }
 
 // PutB writes a block to Keep. It computes the hash itself.
index 3ce4e7425aa273e2753ddcaab516a86ecd34d59a..055141cbe88165cbe93a79d903d0b69a3731c672 100644 (file)
@@ -5,6 +5,7 @@
 package keepclient
 
 import (
+       "bytes"
        "crypto/md5"
        "errors"
        "fmt"
@@ -20,7 +21,6 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
-       "git.curoverse.com/arvados.git/sdk/go/streamer"
        . "gopkg.in/check.v1"
 )
 
@@ -172,18 +172,8 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
                make(chan string)}
 
        UploadToStubHelper(c, st,
-               func(kc *KeepClient, url string, reader io.ReadCloser,
-                       writer io.WriteCloser, upload_status chan uploadStatus) {
-
-                       tr := streamer.AsyncStreamFromReader(512, reader)
-                       defer tr.Close()
-
-                       br1 := tr.MakeStreamReader()
-
-                       go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, 0)
-
-                       writer.Write([]byte("foo"))
-                       writer.Close()
+               func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, upload_status chan uploadStatus) {
+                       go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), upload_status, 3, 0)
 
                        <-st.handled
 
index 49ef543d872f94d169c2e76b422cffee76ef86ed..37912506a2cb6ab7c014a0edac13e922c20526d6 100644 (file)
@@ -17,7 +17,6 @@ import (
        "strings"
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/streamer"
 )
 
 // Function used to emit debug messages. The easiest way to enable
@@ -57,7 +56,7 @@ type uploadStatus struct {
        response        string
 }
 
-func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
+func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader,
        upload_status chan<- uploadStatus, expectedLength int64, requestID int32) {
 
        var req *http.Request
@@ -66,21 +65,16 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea
        if req, err = http.NewRequest("PUT", url, nil); err != nil {
                DebugPrintf("DEBUG: [%08x] Error creating request PUT %v error: %v", requestID, url, err.Error())
                upload_status <- uploadStatus{err, url, 0, 0, ""}
-               body.Close()
                return
        }
 
        req.ContentLength = expectedLength
        if expectedLength > 0 {
-               // Do() will close the body ReadCloser when it is done
-               // with it.
-               req.Body = body
+               req.Body = ioutil.NopCloser(body)
        } else {
-               // "For client requests, a value of 0 means unknown if Body is
-               // not nil."  In this case we do want the body to be empty, so
-               // don't set req.Body.  However, we still need to close the
-               // body ReadCloser.
-               body.Close()
+               // "For client requests, a value of 0 means unknown if
+               // Body is not nil."  In this case we do want the body
+               // to be empty, so don't set req.Body.
        }
 
        req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
@@ -121,7 +115,7 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea
 
 func (this *KeepClient) putReplicas(
        hash string,
-       tr *streamer.AsyncStream,
+       getReader func() io.Reader,
        expectedLength int64) (locator string, replicas int, err error) {
 
        // Generate an arbitrary ID to identify this specific
@@ -174,7 +168,7 @@ func (this *KeepClient) putReplicas(
                                // Start some upload requests
                                if next_server < len(sv) {
                                        DebugPrintf("DEBUG: [%08x] Begin upload %s to %s", requestID, hash, sv[next_server])
-                                       go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestID)
+                                       go this.uploadToKeepServer(sv[next_server], hash, getReader(), upload_status, expectedLength, requestID)
                                        next_server += 1
                                        active += 1
                                } else {
diff --git a/sdk/go/streamer/streamer.go b/sdk/go/streamer/streamer.go
deleted file mode 100644 (file)
index 396e311..0000000
+++ /dev/null
@@ -1,158 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-/* AsyncStream pulls data in from a io.Reader source (such as a file or network
-socket) and fans out to any number of StreamReader sinks.
-
-Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
-any point in the lifetime of the AsyncStream, and each StreamReader will read
-the contents of the buffer up to the "frontier" of the buffer, at which point
-the StreamReader blocks until new data is read from the source.
-
-This is useful for minimizing readthrough latency as sinks can read and act on
-data from the source without waiting for the source to be completely buffered.
-It is also useful as a cache in situations where re-reading the original source
-potentially is costly, since the buffer retains a copy of the source data.
-
-Usage:
-
-Begin reading into a buffer with maximum size 'buffersize' from 'source':
-  stream := AsyncStreamFromReader(buffersize, source)
-
-To create a new reader (this can be called multiple times, each reader starts
-at the beginning of the buffer):
-  reader := tr.MakeStreamReader()
-
-Make sure to close the reader when you're done with it.
-  reader.Close()
-
-When you're done with the stream:
-  stream.Close()
-
-Alternately, if you already have a filled buffer and just want to read out from it:
-  stream := AsyncStreamFromSlice(buf)
-
-  r := tr.MakeStreamReader()
-
-*/
-
-package streamer
-
-import (
-       "errors"
-       "io"
-)
-
-var ErrAlreadyClosed = errors.New("cannot close a stream twice")
-
-type AsyncStream struct {
-       buffer            []byte
-       requests          chan sliceRequest
-       add_reader        chan bool
-       subtract_reader   chan bool
-       wait_zero_readers chan bool
-       closed            bool
-}
-
-// Reads from the buffer managed by the Transfer()
-type StreamReader struct {
-       offset    int
-       stream    *AsyncStream
-       responses chan sliceResult
-}
-
-func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
-       t := &AsyncStream{
-               buffer:            make([]byte, buffersize),
-               requests:          make(chan sliceRequest),
-               add_reader:        make(chan bool),
-               subtract_reader:   make(chan bool),
-               wait_zero_readers: make(chan bool),
-       }
-
-       go t.transfer(source)
-       go t.readersMonitor()
-
-       return t
-}
-
-func AsyncStreamFromSlice(buf []byte) *AsyncStream {
-       t := &AsyncStream{
-               buffer:            buf,
-               requests:          make(chan sliceRequest),
-               add_reader:        make(chan bool),
-               subtract_reader:   make(chan bool),
-               wait_zero_readers: make(chan bool),
-       }
-
-       go t.transfer(nil)
-       go t.readersMonitor()
-
-       return t
-}
-
-func (this *AsyncStream) MakeStreamReader() *StreamReader {
-       this.add_reader <- true
-       return &StreamReader{0, this, make(chan sliceResult)}
-}
-
-// Reads from the buffer managed by the Transfer()
-func (this *StreamReader) Read(p []byte) (n int, err error) {
-       this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
-       rr, valid := <-this.responses
-       if valid {
-               this.offset += len(rr.slice)
-               return copy(p, rr.slice), rr.err
-       } else {
-               return 0, io.ErrUnexpectedEOF
-       }
-}
-
-func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
-       // Record starting offset in order to correctly report the number of bytes sent
-       starting_offset := this.offset
-       for {
-               this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
-               rr, valid := <-this.responses
-               if valid {
-                       this.offset += len(rr.slice)
-                       if rr.err != nil {
-                               if rr.err == io.EOF {
-                                       // EOF is not an error.
-                                       return int64(this.offset - starting_offset), nil
-                               } else {
-                                       return int64(this.offset - starting_offset), rr.err
-                               }
-                       } else {
-                               dest.Write(rr.slice)
-                       }
-               } else {
-                       return int64(this.offset), io.ErrUnexpectedEOF
-               }
-       }
-}
-
-// Close the responses channel
-func (this *StreamReader) Close() error {
-       if this.stream == nil {
-               return ErrAlreadyClosed
-       }
-       this.stream.subtract_reader <- true
-       close(this.responses)
-       this.stream = nil
-       return nil
-}
-
-func (this *AsyncStream) Close() error {
-       if this.closed {
-               return ErrAlreadyClosed
-       }
-       this.closed = true
-       this.wait_zero_readers <- true
-       close(this.requests)
-       close(this.add_reader)
-       close(this.subtract_reader)
-       close(this.wait_zero_readers)
-       return nil
-}
diff --git a/sdk/go/streamer/streamer_test.go b/sdk/go/streamer/streamer_test.go
deleted file mode 100644 (file)
index f8ddbf5..0000000
+++ /dev/null
@@ -1,381 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-package streamer
-
-import (
-       . "gopkg.in/check.v1"
-       "io"
-       "testing"
-       "time"
-)
-
-// Gocheck boilerplate
-func Test(t *testing.T) { TestingT(t) }
-
-var _ = Suite(&StandaloneSuite{})
-
-// Standalone tests
-type StandaloneSuite struct{}
-
-func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
-       ReadIntoBufferHelper(c, 225)
-       ReadIntoBufferHelper(c, 224)
-}
-
-func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
-       out := make([]byte, 128)
-       for i := 0; i < 128; i += 1 {
-               out[i] = byte(i)
-       }
-       writer.Write(out)
-       s1 := <-slices
-       c.Check(len(s1.slice), Equals, 128)
-       c.Check(s1.reader_error, Equals, nil)
-       for i := 0; i < 128; i += 1 {
-               c.Check(s1.slice[i], Equals, byte(i))
-       }
-       for i := 0; i < len(buffer); i += 1 {
-               if i < 128 {
-                       c.Check(buffer[i], Equals, byte(i))
-               } else {
-                       c.Check(buffer[i], Equals, byte(0))
-               }
-       }
-}
-
-func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
-       out := make([]byte, 96)
-       for i := 0; i < 96; i += 1 {
-               out[i] = byte(i / 2)
-       }
-       writer.Write(out)
-       s1 := <-slices
-       c.Check(len(s1.slice), Equals, 96)
-       c.Check(s1.reader_error, Equals, nil)
-       for i := 0; i < 96; i += 1 {
-               c.Check(s1.slice[i], Equals, byte(i/2))
-       }
-       for i := 0; i < len(buffer); i += 1 {
-               if i < 128 {
-                       c.Check(buffer[i], Equals, byte(i))
-               } else if i < (128 + 96) {
-                       c.Check(buffer[i], Equals, byte((i-128)/2))
-               } else {
-                       c.Check(buffer[i], Equals, byte(0))
-               }
-       }
-}
-
-func ReadIntoBufferHelper(c *C, bufsize int) {
-       buffer := make([]byte, bufsize)
-
-       reader, writer := io.Pipe()
-       slices := make(chan nextSlice)
-
-       go readIntoBuffer(buffer, reader, slices)
-
-       HelperWrite128andCheck(c, buffer, writer, slices)
-       HelperWrite96andCheck(c, buffer, writer, slices)
-
-       writer.Close()
-       s1 := <-slices
-       c.Check(len(s1.slice), Equals, 0)
-       c.Check(s1.reader_error, Equals, io.EOF)
-}
-
-func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
-       buffer := make([]byte, 223)
-       reader, writer := io.Pipe()
-       slices := make(chan nextSlice)
-
-       go readIntoBuffer(buffer, reader, slices)
-
-       HelperWrite128andCheck(c, buffer, writer, slices)
-
-       out := make([]byte, 96)
-       for i := 0; i < 96; i += 1 {
-               out[i] = byte(i / 2)
-       }
-
-       // Write will deadlock because it can't write all the data, so
-       // spin it off to a goroutine
-       go writer.Write(out)
-       s1 := <-slices
-
-       c.Check(len(s1.slice), Equals, 95)
-       c.Check(s1.reader_error, Equals, nil)
-       for i := 0; i < 95; i += 1 {
-               c.Check(s1.slice[i], Equals, byte(i/2))
-       }
-       for i := 0; i < len(buffer); i += 1 {
-               if i < 128 {
-                       c.Check(buffer[i], Equals, byte(i))
-               } else if i < (128 + 95) {
-                       c.Check(buffer[i], Equals, byte((i-128)/2))
-               } else {
-                       c.Check(buffer[i], Equals, byte(0))
-               }
-       }
-
-       writer.Close()
-       s1 = <-slices
-       c.Check(len(s1.slice), Equals, 0)
-       c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransfer(c *C) {
-       reader, writer := io.Pipe()
-
-       tr := AsyncStreamFromReader(512, reader)
-
-       br1 := tr.MakeStreamReader()
-       out := make([]byte, 128)
-
-       {
-               // Write some data, and read into a buffer shorter than
-               // available data
-               for i := 0; i < 128; i += 1 {
-                       out[i] = byte(i)
-               }
-
-               writer.Write(out[:100])
-
-               in := make([]byte, 64)
-               n, err := br1.Read(in)
-
-               c.Check(n, Equals, 64)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 64; i += 1 {
-                       c.Check(in[i], Equals, out[i])
-               }
-       }
-
-       {
-               // Write some more data, and read into buffer longer than
-               // available data
-               in := make([]byte, 64)
-               n, err := br1.Read(in)
-               c.Check(n, Equals, 36)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 36; i += 1 {
-                       c.Check(in[i], Equals, out[64+i])
-               }
-
-       }
-
-       {
-               // Test read before write
-               type Rd struct {
-                       n   int
-                       err error
-               }
-               rd := make(chan Rd)
-               in := make([]byte, 64)
-
-               go func() {
-                       n, err := br1.Read(in)
-                       rd <- Rd{n, err}
-               }()
-
-               time.Sleep(100 * time.Millisecond)
-               writer.Write(out[100:])
-
-               got := <-rd
-
-               c.Check(got.n, Equals, 28)
-               c.Check(got.err, Equals, nil)
-
-               for i := 0; i < 28; i += 1 {
-                       c.Check(in[i], Equals, out[100+i])
-               }
-       }
-
-       br2 := tr.MakeStreamReader()
-       {
-               // Test 'catch up' reader
-               in := make([]byte, 256)
-               n, err := br2.Read(in)
-
-               c.Check(n, Equals, 128)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 128; i += 1 {
-                       c.Check(in[i], Equals, out[i])
-               }
-       }
-
-       {
-               // Test closing the reader
-               writer.Close()
-
-               in := make([]byte, 256)
-               n1, err1 := br1.Read(in)
-               n2, err2 := br2.Read(in)
-               c.Check(n1, Equals, 0)
-               c.Check(err1, Equals, io.EOF)
-               c.Check(n2, Equals, 0)
-               c.Check(err2, Equals, io.EOF)
-       }
-
-       {
-               // Test 'catch up' reader after closing
-               br3 := tr.MakeStreamReader()
-               in := make([]byte, 256)
-               n, err := br3.Read(in)
-
-               c.Check(n, Equals, 128)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 128; i += 1 {
-                       c.Check(in[i], Equals, out[i])
-               }
-
-               n, err = br3.Read(in)
-
-               c.Check(n, Equals, 0)
-               c.Check(err, Equals, io.EOF)
-       }
-}
-
-func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
-       reader, writer := io.Pipe()
-
-       tr := AsyncStreamFromReader(100, reader)
-       defer tr.Close()
-
-       sr := tr.MakeStreamReader()
-       defer sr.Close()
-
-       out := make([]byte, 101)
-       go writer.Write(out)
-
-       n, err := sr.Read(out)
-       c.Check(n, Equals, 100)
-       c.Check(err, IsNil)
-
-       n, err = sr.Read(out)
-       c.Check(n, Equals, 0)
-       c.Check(err, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
-       // Buffer for reads from 'r'
-       buffer := make([]byte, 100)
-       for i := 0; i < 100; i += 1 {
-               buffer[i] = byte(i)
-       }
-
-       tr := AsyncStreamFromSlice(buffer)
-
-       br1 := tr.MakeStreamReader()
-
-       in := make([]byte, 64)
-       {
-               n, err := br1.Read(in)
-
-               c.Check(n, Equals, 64)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 64; i += 1 {
-                       c.Check(in[i], Equals, buffer[i])
-               }
-       }
-       {
-               n, err := br1.Read(in)
-
-               c.Check(n, Equals, 36)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 36; i += 1 {
-                       c.Check(in[i], Equals, buffer[64+i])
-               }
-       }
-       {
-               n, err := br1.Read(in)
-
-               c.Check(n, Equals, 0)
-               c.Check(err, Equals, io.EOF)
-       }
-}
-
-func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
-       // Buffer for reads from 'r'
-       buffer := make([]byte, 100)
-       for i := 0; i < 100; i += 1 {
-               buffer[i] = byte(i)
-       }
-
-       tr := AsyncStreamFromSlice(buffer)
-       defer tr.Close()
-
-       br1 := tr.MakeStreamReader()
-       defer br1.Close()
-
-       reader, writer := io.Pipe()
-
-       go func() {
-               p := make([]byte, 100)
-               n, err := reader.Read(p)
-               c.Check(n, Equals, 100)
-               c.Check(err, Equals, nil)
-               c.Check(p, DeepEquals, buffer)
-       }()
-
-       io.Copy(writer, br1)
-}
-
-func (s *StandaloneSuite) TestManyReaders(c *C) {
-       reader, writer := io.Pipe()
-
-       tr := AsyncStreamFromReader(512, reader)
-       defer tr.Close()
-
-       sr := tr.MakeStreamReader()
-       go func() {
-               time.Sleep(100 * time.Millisecond)
-               sr.Close()
-       }()
-
-       for i := 0; i < 200; i += 1 {
-               go func() {
-                       br1 := tr.MakeStreamReader()
-                       defer br1.Close()
-
-                       p := make([]byte, 3)
-                       n, err := br1.Read(p)
-                       c.Check(n, Equals, 3)
-                       c.Check(p[0:3], DeepEquals, []byte("foo"))
-
-                       n, err = br1.Read(p)
-                       c.Check(n, Equals, 3)
-                       c.Check(p[0:3], DeepEquals, []byte("bar"))
-
-                       n, err = br1.Read(p)
-                       c.Check(n, Equals, 3)
-                       c.Check(p[0:3], DeepEquals, []byte("baz"))
-
-                       n, err = br1.Read(p)
-                       c.Check(n, Equals, 0)
-                       c.Check(err, Equals, io.EOF)
-               }()
-       }
-
-       writer.Write([]byte("foo"))
-       writer.Write([]byte("bar"))
-       writer.Write([]byte("baz"))
-       writer.Close()
-}
-
-func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) {
-       buffer := make([]byte, 100)
-       tr := AsyncStreamFromSlice(buffer)
-       sr := tr.MakeStreamReader()
-       c.Check(sr.Close(), IsNil)
-       c.Check(sr.Close(), Equals, ErrAlreadyClosed)
-       c.Check(tr.Close(), IsNil)
-       c.Check(tr.Close(), Equals, ErrAlreadyClosed)
-}
diff --git a/sdk/go/streamer/transfer.go b/sdk/go/streamer/transfer.go
deleted file mode 100644 (file)
index bea27f8..0000000
+++ /dev/null
@@ -1,310 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-/* Internal implementation of AsyncStream.
-Outline of operation:
-
-The kernel is the transfer() goroutine.  It manages concurrent reads and
-appends to the "body" slice.  "body" is a slice of "source_buffer" that
-represents the segment of the buffer that is already filled in and available
-for reading.
-
-To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
-from the io.Reader source directly into source_buffer.  Each read goes into a
-slice of buffer which spans the section immediately following the end of the
-current "body".  Each time a Read completes, a slice representing the
-section just filled in (or any read errors/EOF) is sent over the "slices"
-channel back to the transfer() function.
-
-Meanwhile, the transfer() function selects() on two channels, the "requests"
-channel and the "slices" channel.
-
-When a message is received on the "slices" channel, this means a new
-section of the buffer has data, or an error is signaled.  Since the data has
-been read directly into the source_buffer, it is able to simply increase the
-size of the body slice to encompass the newly filled in section.  Then any
-pending reads are serviced with handleReadRequest (described below).
-
-When a message is received on the "requests" channel, it means a StreamReader
-wants access to a slice of the buffer.  This is passed to handleReadRequest().
-
-The handleReadRequest() function takes a sliceRequest consisting of a buffer
-offset, maximum size, and channel to send the response.  If there was an error
-reported from the source reader, it is returned.  If the offset is less than
-the size of the body, the request can proceed, and it sends a body slice
-spanning the segment from offset to min(offset+maxsize, end of the body).  If
-source reader status is EOF (done filling the buffer) and the read request
-offset is beyond end of the body, it responds with EOF.  Otherwise, the read
-request is for a slice beyond the current size of "body" but we expect the body
-to expand as more data is added, so the request gets added to a wait list.
-
-The transfer() runs until the requests channel is closed by AsyncStream.Close()
-
-To track readers, streamer uses the readersMonitor() goroutine.  This goroutine
-chooses which channels to receive from based on the number of outstanding
-readers.  When a new reader is created, it sends a message on the add_reader
-channel.  If the number of readers is already at MAX_READERS, this blocks the
-sender until an existing reader is closed.  When a reader is closed, it sends a
-message on the subtract_reader channel.  Finally, when AsyncStream.Close() is
-called, it sends a message on the wait_zero_readers channel, which will block
-the sender unless there are zero readers and it is safe to shut down the
-AsyncStream.
-*/
-
-package streamer
-
-import (
-       "io"
-)
-
-const MAX_READERS = 100
-
-// A slice passed from readIntoBuffer() to transfer()
-type nextSlice struct {
-       slice        []byte
-       reader_error error
-}
-
-// A read request to the Transfer() function
-type sliceRequest struct {
-       offset  int
-       maxsize int
-       result  chan<- sliceResult
-}
-
-// A read result from the Transfer() function
-type sliceResult struct {
-       slice []byte
-       err   error
-}
-
-// Supports writing into a buffer
-type bufferWriter struct {
-       buf []byte
-       ptr int
-}
-
-// Copy p into this.buf, increment pointer and return number of bytes read.
-func (this *bufferWriter) Write(p []byte) (n int, err error) {
-       n = copy(this.buf[this.ptr:], p)
-       this.ptr += n
-       return n, nil
-}
-
-// Read repeatedly from the reader and write sequentially into the specified
-// buffer, and report each read to channel 'c'.  Completes when Reader 'r'
-// reports on the error channel and closes channel 'c'.
-func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
-       defer close(slices)
-
-       if writeto, ok := r.(io.WriterTo); ok {
-               n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
-               if err != nil {
-                       slices <- nextSlice{nil, err}
-               } else {
-                       slices <- nextSlice{buffer[:n], nil}
-                       slices <- nextSlice{nil, io.EOF}
-               }
-               return
-       } else {
-               // Initially entire buffer is available
-               ptr := buffer[:]
-               for {
-                       var n int
-                       var err error
-                       if len(ptr) > 0 {
-                               const readblock = 64 * 1024
-                               // Read 64KiB into the next part of the buffer
-                               if len(ptr) > readblock {
-                                       n, err = r.Read(ptr[:readblock])
-                               } else {
-                                       n, err = r.Read(ptr)
-                               }
-                       } else {
-                               // Ran out of buffer space, try reading one more byte
-                               var b [1]byte
-                               n, err = r.Read(b[:])
-
-                               if n > 0 {
-                                       // Reader has more data but we have nowhere to
-                                       // put it, so we're stuffed
-                                       slices <- nextSlice{nil, io.ErrShortBuffer}
-                               } else {
-                                       // Return some other error (hopefully EOF)
-                                       slices <- nextSlice{nil, err}
-                               }
-                               return
-                       }
-
-                       // End on error (includes EOF)
-                       if err != nil {
-                               slices <- nextSlice{nil, err}
-                               return
-                       }
-
-                       if n > 0 {
-                               // Make a slice with the contents of the read
-                               slices <- nextSlice{ptr[:n], nil}
-
-                               // Adjust the scratch space slice
-                               ptr = ptr[n:]
-                       }
-               }
-       }
-}
-
-// Handle a read request.  Returns true if a response was sent, and false if
-// the request should be queued.
-func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
-       if (reader_status != nil) && (reader_status != io.EOF) {
-               req.result <- sliceResult{nil, reader_status}
-               return true
-       } else if req.offset < len(body) {
-               var end int
-               if req.offset+req.maxsize < len(body) {
-                       end = req.offset + req.maxsize
-               } else {
-                       end = len(body)
-               }
-               req.result <- sliceResult{body[req.offset:end], nil}
-               return true
-       } else if (reader_status == io.EOF) && (req.offset >= len(body)) {
-               req.result <- sliceResult{nil, io.EOF}
-               return true
-       } else {
-               return false
-       }
-}
-
-// Mediates between reads and appends.
-// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
-// in the provided buffer.  Otherwise, use the contents of 'buffer' as is.
-// Accepts read requests on the buffer on the 'requests' channel.  Completes
-// when 'requests' channel is closed.
-func (this *AsyncStream) transfer(source_reader io.Reader) {
-       source_buffer := this.buffer
-       requests := this.requests
-
-       // currently buffered data
-       var body []byte
-
-       // for receiving slices from readIntoBuffer
-       var slices chan nextSlice = nil
-
-       // indicates the status of the underlying reader
-       var reader_status error = nil
-
-       if source_reader != nil {
-               // 'body' is the buffer slice representing the body content read so far
-               body = source_buffer[:0]
-
-               // used to communicate slices of the buffer as they are
-               // readIntoBuffer will close 'slices' when it is done with it
-               slices = make(chan nextSlice)
-
-               // Spin it off
-               go readIntoBuffer(source_buffer, source_reader, slices)
-       } else {
-               // use the whole buffer
-               body = source_buffer[:]
-
-               // buffer is complete
-               reader_status = io.EOF
-       }
-
-       pending_requests := make([]sliceRequest, 0)
-
-       for {
-               select {
-               case req, valid := <-requests:
-                       // Handle a buffer read request
-                       if valid {
-                               if !handleReadRequest(req, body, reader_status) {
-                                       pending_requests = append(pending_requests, req)
-                               }
-                       } else {
-                               // closed 'requests' channel indicates we're done
-                               return
-                       }
-
-               case bk, valid := <-slices:
-                       // Got a new slice from the reader
-                       if valid {
-                               reader_status = bk.reader_error
-
-                               if bk.slice != nil {
-                                       // adjust body bounds now that another slice has been read
-                                       body = source_buffer[0 : len(body)+len(bk.slice)]
-                               }
-
-                               // handle pending reads
-                               n := 0
-                               for n < len(pending_requests) {
-                                       if handleReadRequest(pending_requests[n], body, reader_status) {
-                                               // move the element from the back of the slice to
-                                               // position 'n', then shorten the slice by one element
-                                               pending_requests[n] = pending_requests[len(pending_requests)-1]
-                                               pending_requests = pending_requests[0 : len(pending_requests)-1]
-                                       } else {
-
-                                               // Request wasn't handled, so keep it in the request slice
-                                               n += 1
-                                       }
-                               }
-                       } else {
-                               if reader_status == nil {
-                                       // slices channel closed without signaling EOF
-                                       reader_status = io.ErrUnexpectedEOF
-                               }
-                               slices = nil
-                       }
-               }
-       }
-}
-
-func (this *AsyncStream) readersMonitor() {
-       var readers int = 0
-
-       for {
-               if readers == 0 {
-                       select {
-                       case _, ok := <-this.wait_zero_readers:
-                               if ok {
-                                       // nothing, just implicitly unblock the sender
-                               } else {
-                                       return
-                               }
-                       case _, ok := <-this.add_reader:
-                               if ok {
-                                       readers += 1
-                               } else {
-                                       return
-                               }
-                       }
-               } else if readers > 0 && readers < MAX_READERS {
-                       select {
-                       case _, ok := <-this.add_reader:
-                               if ok {
-                                       readers += 1
-                               } else {
-                                       return
-                               }
-
-                       case _, ok := <-this.subtract_reader:
-                               if ok {
-                                       readers -= 1
-                               } else {
-                                       return
-                               }
-                       }
-               } else if readers == MAX_READERS {
-                       _, ok := <-this.subtract_reader
-                       if ok {
-                               readers -= 1
-                       } else {
-                               return
-                       }
-               }
-       }
-}
index 9826cf2f906f5a7ecd532b4522d2940f41ca5457..6bdba7af89d803975faa40f50d6508ab0b25d953 100644 (file)
@@ -365,7 +365,7 @@ class ApplicationController < ActionController::Base
   end
 
   def require_auth_scope
-    if @read_auths.empty?
+    unless current_user && @read_auths.any? { |auth| auth.user.andand.uuid == current_user.uuid }
       if require_login != false
         send_error("Forbidden", status: 403)
       end
index 5eb756b9fa3609519a1fa63f8c3a1a9021ed190c..6a376318271472db857db6b926ba90d4d8262244 100644 (file)
@@ -32,21 +32,33 @@ class ArvadosApiToken
     user = nil
     api_client = nil
     api_client_auth = nil
-    supplied_token =
-      params["api_token"] ||
-      params["oauth_token"] ||
-      env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-zA-Z0-9]+)/).andand[1]
-    if supplied_token
-      api_client_auth = ApiClientAuthorization.
+    if request.get? || params["_method"] == 'GET'
+      reader_tokens = params["reader_tokens"]
+      if reader_tokens.is_a? String
+        reader_tokens = SafeJSON.load(reader_tokens)
+      end
+    else
+      reader_tokens = nil
+    end
+
+    # Set current_user etc. based on the primary session token if a
+    # valid one is present. Otherwise, use the first valid token in
+    # reader_tokens.
+    [params["api_token"],
+     params["oauth_token"],
+     env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-zA-Z0-9]+)/).andand[1],
+     *reader_tokens,
+    ].each do |supplied|
+      next if !supplied
+      try_auth = ApiClientAuthorization.
         includes(:api_client, :user).
-        where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied_token).
+        where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied).
         first
-      if api_client_auth.andand.user
+      if try_auth.andand.user
+        api_client_auth = try_auth
         user = api_client_auth.user
         api_client = api_client_auth.api_client
-      else
-        # Token seems valid, but points to a non-existent (deleted?) user.
-        api_client_auth = nil
+        break
       end
     end
     Thread.current[:api_client_ip_address] = remote_ip
index dba801920c7ed9365857db759dc6b4ac87be60aa..dfb57496a7704f7d0094f960fd3ec425ff3e55c1 100644 (file)
@@ -27,6 +27,20 @@ class ApiTokensScopeTest < ActionDispatch::IntegrationTest
     assert_response 403
   end
 
+  test "narrow + wide scoped tokens for different users" do
+    get_args = [{
+                  reader_tokens: [api_client_authorizations(:anonymous).api_token]
+                }, auth(:active_userlist)]
+    get(v1_url('users'), *get_args)
+    assert_response :success
+    get(v1_url('users', ''), *get_args)  # Add trailing slash.
+    assert_response :success
+    get(v1_url('users', 'current'), *get_args)
+    assert_response 403
+    get(v1_url('virtual_machines'), *get_args)
+    assert_response 403
+   end
+
   test "specimens token can see exactly owned specimens" do
     get_args = [{}, auth(:active_specimens)]
     get(v1_url('specimens'), *get_args)
index dd59f74eb4784ccbf857a6b6d07b16a283828e66..a60be093a31f033f15d4dbb96a22d4f35da0dda2 100644 (file)
@@ -50,22 +50,15 @@ class ReaderTokensTest < ActionDispatch::IntegrationTest
 
   [nil, :active_noscope].each do |main_auth|
     [:spectator, :spectator_specimens].each do |read_auth|
-      test "#{main_auth} auth with reader token #{read_auth} can read" do
-        assert_includes(get_specimen_uuids(main_auth, read_auth),
-                        spectator_specimen, "did not find spectator specimen")
-      end
-
-      test "#{main_auth} auth with JSON read token #{read_auth} can read" do
-        assert_includes(get_specimen_uuids(main_auth, read_auth, :to_json),
-                        spectator_specimen, "did not find spectator specimen")
-      end
-
-      test "#{main_auth} auth with reader token #{read_auth} can't write" do
-        assert_post_denied(main_auth, read_auth)
-      end
+      [:to_a, :to_json].each do |formatter|
+        test "#{main_auth.inspect} auth with #{formatter} reader token #{read_auth} can#{"'t" if main_auth} read" do
+          get_specimens(main_auth, read_auth)
+          assert_response(if main_auth then 403 else 200 end)
+        end
 
-      test "#{main_auth} auth with JSON read token #{read_auth} can't write" do
-        assert_post_denied(main_auth, read_auth, :to_json)
+        test "#{main_auth.inspect} auth with #{formatter} reader token #{read_auth} can't write" do
+          assert_post_denied(main_auth, read_auth, formatter)
+        end
       end
     end
   end
index 27a548aa61e94c5bb9e555a6737cd5bcdf6b1c9f..fc0dda718ceda7fddd2e1f41c704fa434fdb0034 100644 (file)
@@ -228,6 +228,32 @@ func (runner *ContainerRunner) stopSignals() {
        }
 }
 
+var errorBlacklist = []string{"Cannot connect to the Docker daemon"}
+var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
+
+func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
+       for _, d := range errorBlacklist {
+               if strings.Index(goterr.Error(), d) != -1 {
+                       runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr)
+                       if *brokenNodeHook == "" {
+                               runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+                       } else {
+                               runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+                               // run killme script
+                               c := exec.Command(*brokenNodeHook)
+                               c.Stdout = runner.CrunchLog
+                               c.Stderr = runner.CrunchLog
+                               err := c.Run()
+                               if err != nil {
+                                       runner.CrunchLog.Printf("Error running broken node hook: %v", err)
+                               }
+                       }
+                       return true
+               }
+       }
+       return false
+}
+
 // LoadImage determines the docker image id from the container record and
 // checks if it is available in the local Docker image store.  If not, it loads
 // the image from Keep.
@@ -1487,7 +1513,11 @@ func (runner *ContainerRunner) Run() (err error) {
        // check for and/or load image
        err = runner.LoadImage()
        if err != nil {
-               runner.finalState = "Cancelled"
+               if !runner.checkBrokenNode(err) {
+                       // Failed to load image but not due to a "broken node"
+                       // condition, probably user error.
+                       runner.finalState = "Cancelled"
+               }
                err = fmt.Errorf("While loading container image: %v", err)
                return
        }
@@ -1516,8 +1546,6 @@ func (runner *ContainerRunner) Run() (err error) {
                return
        }
 
-       runner.StartCrunchstat()
-
        if runner.IsCancelled() {
                return
        }
@@ -1528,8 +1556,11 @@ func (runner *ContainerRunner) Run() (err error) {
        }
        runner.finalState = "Cancelled"
 
+       runner.StartCrunchstat()
+
        err = runner.StartContainer()
        if err != nil {
+               runner.checkBrokenNode(err)
                return
        }
 
@@ -1607,25 +1638,27 @@ func main() {
        }
        api.Retries = 8
 
-       var kc *keepclient.KeepClient
-       kc, err = keepclient.MakeKeepClient(api)
-       if err != nil {
-               log.Fatalf("%s: %v", containerId, err)
+       kc, kcerr := keepclient.MakeKeepClient(api)
+       if kcerr != nil {
+               log.Fatalf("%s: %v", containerId, kcerr)
        }
        kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
        kc.Retries = 4
 
-       var docker *dockerclient.Client
        // API version 1.21 corresponds to Docker 1.9, which is currently the
        // minimum version we want to support.
-       docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
-       if err != nil {
-               log.Fatalf("%s: %v", containerId, err)
-       }
-
+       docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
        dockerClientProxy := ThinDockerClientProxy{Docker: docker}
 
        cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
+
+       if dockererr != nil {
+               cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
+               cr.checkBrokenNode(dockererr)
+               cr.CrunchLog.Close()
+               os.Exit(1)
+       }
+
        cr.statInterval = *statInterval
        cr.cgroupRoot = *cgroupRoot
        cr.expectCgroupParent = *cgroupParent
index bc0b3125c9e699414a3d08f32cb88868769a346d..4702838362c04b1237593f66a789ea96848bb3c3 100644 (file)
@@ -156,6 +156,10 @@ func (t *TestDockerClient) ContainerWait(ctx context.Context, container string,
 }
 
 func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
+       if t.finish == 2 {
+               return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
+       }
+
        if t.imageLoaded == image {
                return dockertypes.ImageInspect{}, nil, nil
        } else {
@@ -164,6 +168,9 @@ func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string
 }
 
 func (t *TestDockerClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
+       if t.finish == 2 {
+               return dockertypes.ImageLoadResponse{}, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
+       }
        _, err := io.Copy(ioutil.Discard, input)
        if err != nil {
                return dockertypes.ImageLoadResponse{}, err
@@ -668,9 +675,10 @@ func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn f
        if api.CalledWith("container.state", "Complete") != nil {
                c.Check(err, IsNil)
        }
-       c.Check(api.WasSetRunning, Equals, true)
-
-       c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil)
+       if exitCode != 2 {
+               c.Check(api.WasSetRunning, Equals, true)
+               c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil)
+       }
 
        if err != nil {
                for k, v := range api.Logs {
@@ -1759,3 +1767,61 @@ func (s *TestSuite) TestEvalSymlinkDir(c *C) {
        _, err = cr.UploadOutputFile(realTemp+"/"+v, info, err, []string{}, nil, "", "", 0)
        c.Assert(err, NotNil)
 }
+
+func (s *TestSuite) TestFullBrokenDocker1(c *C) {
+       tf, err := ioutil.TempFile("", "brokenNodeHook-")
+       c.Assert(err, IsNil)
+       defer os.Remove(tf.Name())
+
+       tf.Write([]byte(`#!/bin/sh
+exec echo killme
+`))
+       tf.Close()
+       os.Chmod(tf.Name(), 0700)
+
+       ech := tf.Name()
+       brokenNodeHook = &ech
+
+       api, _, _ := FullRunHelper(c, `{
+    "command": ["echo", "hello world"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`, nil, 2, func(t *TestDockerClient) {
+               t.logWriter.Write(dockerLog(1, "hello world\n"))
+               t.logWriter.Close()
+       })
+
+       c.Check(api.CalledWith("container.state", "Queued"), NotNil)
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
+
+}
+
+func (s *TestSuite) TestFullBrokenDocker2(c *C) {
+       ech := ""
+       brokenNodeHook = &ech
+
+       api, _, _ := FullRunHelper(c, `{
+    "command": ["echo", "hello world"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`, nil, 2, func(t *TestDockerClient) {
+               t.logWriter.Write(dockerLog(1, "hello world\n"))
+               t.logWriter.Close()
+       })
+
+       c.Check(api.CalledWith("container.state", "Queued"), NotNil)
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*No broken node hook.*")
+}
index 25619bbb91a3629d8d90563d051b8a31578ed121..23bb2bd216c1f1080d246e63a47252564e4d9bf4 100644 (file)
@@ -11,10 +11,12 @@ import (
        "fmt"
        "io/ioutil"
        "log"
+       "math/rand"
        "net/http"
        "net/http/httptest"
        "os"
        "strings"
+       "sync"
        "testing"
        "time"
 
@@ -216,6 +218,33 @@ func (s *ServerRequiredSuite) TestPutWrongContentLength(c *C) {
        }
 }
 
+func (s *ServerRequiredSuite) TestManyFailedPuts(c *C) {
+       kc := runProxy(c, nil, false)
+       defer closeListener()
+       router.(*proxyHandler).timeout = time.Nanosecond
+
+       buf := make([]byte, 1<<20)
+       rand.Read(buf)
+       var wg sync.WaitGroup
+       for i := 0; i < 128; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       kc.PutB(buf)
+               }()
+       }
+       done := make(chan bool)
+       go func() {
+               wg.Wait()
+               close(done)
+       }()
+       select {
+       case <-done:
+       case <-time.After(10 * time.Second):
+               c.Error("timeout")
+       }
+}
+
 func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
        kc := runProxy(c, nil, false)
        defer closeListener()
index cf4608733dc2ca3133411dbe2f617c059770f044..f29066ec9fcb71de007837e295de61bcefc85ffa 100644 (file)
@@ -77,7 +77,7 @@ RUN set -e && \
 
 RUN pip install -U setuptools
 
-ENV NODEVERSION v6.11.2
+ENV NODEVERSION v6.11.4
 
 # Install nodejs binary
 RUN curl -L -f https://nodejs.org/dist/${NODEVERSION}/node-${NODEVERSION}-linux-x64.tar.xz | tar -C /usr/local -xJf - && \
index 39bc21c3ae4f3c7a39f573ee1ee76f4d01eb2991..80344c16f2ef9bfcf0f97bf14f07a8c1cb97ce73 100644 (file)
@@ -5,12 +5,15 @@
 FROM arvados/arvbox-base
 ARG arvados_version
 ARG sso_version=master
+ARG composer_version=master
 
 RUN cd /usr/src && \
     git clone --no-checkout https://github.com/curoverse/arvados.git && \
     git -C arvados checkout ${arvados_version} && \
     git clone --no-checkout https://github.com/curoverse/sso-devise-omniauth-provider.git sso && \
-    git -C sso checkout ${sso_version}
+    git -C sso checkout ${sso_version} && \
+    git clone --no-checkout https://github.com/curoverse/composer.git && \
+    git -C composer checkout ${composer_version}
 
 ADD service/ /var/lib/arvbox/service
 RUN ln -sf /var/lib/arvbox/service /etc
index 1dfffaf59ef3e77bd4670f9cd5de664d64d55cf7..ab20d5758c96a5f298e3ce25e5248611a3446e21 100755 (executable)
@@ -83,6 +83,7 @@ fi
 rm -rf tmp
 mkdir -p tmp/cache
 
+bundle exec rake assets:precompile
 bundle exec rake db:migrate
 
 set +u
index 1bd89f9f4d77f07588960fbf8c7a23bac96b5b55..a41922bb343948656d838f3d958c0b8131fa9d26 100755 (executable)
@@ -13,6 +13,12 @@ rm -rf tmp
 mkdir tmp
 chown arvbox:arvbox tmp
 
+if test -s /var/lib/arvados/workbench_rails_env ; then
+  export RAILS_ENV=$(cat /var/lib/arvados/workbench_rails_env)
+else
+  export RAILS_ENV=development
+fi
+
 if test "$1" != "--only-deps" ; then
     exec bundle exec passenger start --port 80 \
          --user arvbox --runtime-dir=/var/lib/passenger
index 8382a1cf30e1e17ee811e93b2b8c72ccf1139c7e..885385aeef971816b08f5f32f17b51ebf23ff2eb 100755 (executable)
@@ -45,6 +45,9 @@ $RAILS_ENV:
   keep_web_download_url: http://$localip:${services[keep-web]}/c=%{uuid_or_pdh}
   keep_web_url: http://$localip:${services[keep-web]}/c=%{uuid_or_pdh}
   arvados_docsite: http://$localip:${services[doc]}/
+  force_ssl: false
 EOF
 
+bundle exec rake assets:precompile
+
 (cd config && /usr/local/lib/arvbox/application_yml_override.py)