From: Peter Amstutz Date: Mon, 27 Nov 2017 19:37:11 +0000 (-0500) Subject: Merge branch '12616-acr-validate' closes #12616 X-Git-Tag: 1.1.2~46 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/0d06a2984420d9d48e16ccb6d85982b3dce05644?hp=36e2a9db3259bc73d09176d7e6b86bd448b724e9 Merge branch '12616-acr-validate' closes #12616 Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- diff --git a/apps/workbench/app/assets/javascripts/components/collections.js b/apps/workbench/app/assets/javascripts/components/search.js similarity index 68% rename from apps/workbench/app/assets/javascripts/components/collections.js rename to apps/workbench/app/assets/javascripts/components/search.js index 591bf38aa7..2fe73193e7 100644 --- a/apps/workbench/app/assets/javascripts/components/collections.js +++ b/apps/workbench/app/assets/javascripts/components/search.js @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: AGPL-3.0 -window.CollectionsTable = { +window.SearchResultsTable = { maybeLoadMore: function(dom) { var loader = this.loader if (loader.state != loader.READY) @@ -37,6 +37,10 @@ window.CollectionsTable = { }, view: function(vnode) { var loader = vnode.attrs.loader + var iconsMap = { + collections: m('i.fa.fa-fw.fa-archive'), + projects: m('i.fa.fa-fw.fa-folder'), + } return m('table.table.table-condensed', [ m('thead', m('tr', [ m('th'), @@ -50,8 +54,13 @@ window.CollectionsTable = { m('td', [ item.workbenchBaseURL() && m('a.btn.btn-xs.btn-default', { - href: item.workbenchBaseURL()+'collections/'+item.uuid, - }, 'Show'), + 'data-original-title': 'show '+item.objectType.description, + 'data-placement': 'top', + 'data-toggle': 'tooltip', + href: item.workbenchBaseURL()+'/'+item.objectType.wb_path+'/'+item.uuid, + // Bootstrap's tooltip feature + oncreate: function(vnode) { $(vnode.dom).tooltip() }, + }, iconsMap[item.objectType.wb_path]), ]), m('td.arvados-uuid', item.uuid), m('td', item.name || '(unnamed)'), @@ -79,7 +88,7 @@ window.CollectionsTable = { }, } -window.CollectionsSearch = { +window.Search = { oninit: function(vnode) { vnode.state.sessionDB = new SessionDB() vnode.state.searchEntered = m.stream() @@ -97,28 +106,50 @@ window.CollectionsSearch = { var workbenchBaseURL = function() { return vnode.state.sessionDB.workbenchBaseURL(session) } - return new MultipageLoader({ + var searchable_objects = [ + { + wb_path: 'projects', + api_path: 'arvados/v1/groups', + filters: [['group_class', '=', 'project']], + description: 'project', + }, + { + wb_path: 'collections', + api_path: 'arvados/v1/collections', + filters: [], + description: 'collection', + }, + ] + return new MergingLoader({ sessionKey: key, - loadFunc: function(filters) { - var tsquery = to_tsquery(q) - if (tsquery) { - filters = filters.slice(0) - filters.push(['any', '@@', tsquery]) - } - return vnode.state.sessionDB.request(session, 'arvados/v1/collections', { - data: { - filters: JSON.stringify(filters), - count: 'none', + // For every session, search for every object type + children: searchable_objects.map(function(obj_type) { + return new MultipageLoader({ + sessionKey: key, + loadFunc: function(filters) { + // Apply additional type dependant filters + filters = filters.concat(obj_type.filters) + var tsquery = to_tsquery(q) + if (tsquery) { + filters.push(['any', '@@', tsquery]) + } + return vnode.state.sessionDB.request(session, obj_type.api_path, { + data: { + filters: JSON.stringify(filters), + count: 'none', + }, + }).then(function(resp) { + resp.items.map(function(item) { + item.workbenchBaseURL = workbenchBaseURL + item.objectType = obj_type + }) + return resp + }) }, - }).then(function(resp) { - resp.items.map(function(item) { - item.workbenchBaseURL = workbenchBaseURL - }) - return resp }) - }, + }), }) - }) + }), }) }) }, @@ -141,7 +172,7 @@ window.CollectionsSearch = { m('.row', [ m('.col-md-6', [ m('.input-group', [ - m('input#search.form-control[placeholder=Search]', { + m('input#search.form-control[placeholder=Search collections and projects]', { oninput: m.withAttr('value', vnode.state.searchEntered), value: vnode.state.searchEntered(), }), @@ -163,7 +194,7 @@ window.CollectionsSearch = { m('a[href="/sessions"]', 'Add/remove sites'), ]), ]), - m(CollectionsTable, { + m(SearchResultsTable, { loader: vnode.state.loader, }), ], diff --git a/apps/workbench/app/assets/javascripts/components/sessions.js b/apps/workbench/app/assets/javascripts/components/sessions.js index 3d127f1714..e7cc505573 100644 --- a/apps/workbench/app/assets/javascripts/components/sessions.js +++ b/apps/workbench/app/assets/javascripts/components/sessions.js @@ -21,8 +21,8 @@ window.SessionsTable = { return m('.container', [ m('p', [ 'You can log in to multiple Arvados sites here, then use the ', - m('a[href="/collections/multisite"]', 'multi-site search'), - ' page to search collections on all sites at once.', + m('a[href="/search"]', 'multi-site search'), + ' page to search collections and projects on all sites at once.', ]), m('table.table.table-condensed.table-hover', [ m('thead', m('tr', [ diff --git a/apps/workbench/app/assets/javascripts/models/session_db.js b/apps/workbench/app/assets/javascripts/models/session_db.js index 01b0d72728..ad9ad18784 100644 --- a/apps/workbench/app/assets/javascripts/models/session_db.js +++ b/apps/workbench/app/assets/javascripts/models/session_db.js @@ -135,7 +135,9 @@ window.SessionDB = function() { // the host part of apihostport is an IPv4 or [IPv6] // address. if (!session.baseURL.match('://(\\[|\\d+\\.\\d+\\.\\d+\\.\\d+[:/])')) - return session.baseURL.replace('://', '://workbench.') + var wbUrl = session.baseURL.replace('://', '://workbench.') + // Remove the trailing slash, if it's there. + return wbUrl.slice(-1) == '/' ? wbUrl.slice(0, -1) : wbUrl return null }, // Return a m.stream that will get fulfilled with the diff --git a/apps/workbench/app/controllers/collections_controller.rb b/apps/workbench/app/controllers/collections_controller.rb index 779d95c45b..5fcb2dc569 100644 --- a/apps/workbench/app/controllers/collections_controller.rb +++ b/apps/workbench/app/controllers/collections_controller.rb @@ -16,7 +16,7 @@ class CollectionsController < ApplicationController skip_around_filter(:require_thread_api_token, only: [:show_file, :show_file_links]) skip_before_filter(:find_object_by_uuid, - only: [:provenance, :show_file, :show_file_links, :multisite]) + only: [:provenance, :show_file, :show_file_links]) # We depend on show_file to display the user agreement: skip_before_filter :check_user_agreements, only: :show_file skip_before_filter :check_user_profile, only: :show_file diff --git a/apps/workbench/app/controllers/search_controller.rb b/apps/workbench/app/controllers/search_controller.rb index 40e484ea06..3775abd1ae 100644 --- a/apps/workbench/app/controllers/search_controller.rb +++ b/apps/workbench/app/controllers/search_controller.rb @@ -3,6 +3,8 @@ # SPDX-License-Identifier: AGPL-3.0 class SearchController < ApplicationController + skip_before_filter :ensure_arvados_api_exists + def find_objects_for_index search_what = Group if params[:project_uuid] diff --git a/apps/workbench/app/views/layouts/body.html.erb b/apps/workbench/app/views/layouts/body.html.erb index 15f654596c..c1399f2602 100644 --- a/apps/workbench/app/views/layouts/body.html.erb +++ b/apps/workbench/app/views/layouts/body.html.erb @@ -34,7 +34,7 @@ SPDX-License-Identifier: AGPL-3.0 %> <%= target = Rails.configuration.multi_site_search if target == true - target = {controller: 'collections', action: 'multisite'} + target = {controller: 'search', action: 'index'} end link_to("Multi-site search", target, {class: 'btn btn-default'}) %> diff --git a/apps/workbench/app/views/collections/multisite.html b/apps/workbench/app/views/search/index.html similarity index 66% rename from apps/workbench/app/views/collections/multisite.html rename to apps/workbench/app/views/search/index.html index 9b03f10f3d..6bcad0b1ae 100644 --- a/apps/workbench/app/views/collections/multisite.html +++ b/apps/workbench/app/views/search/index.html @@ -2,4 +2,4 @@ SPDX-License-Identifier: AGPL-3.0 --> -
+
diff --git a/apps/workbench/config/routes.rb b/apps/workbench/config/routes.rb index fee49c14ce..d969abd78c 100644 --- a/apps/workbench/config/routes.rb +++ b/apps/workbench/config/routes.rb @@ -95,7 +95,7 @@ ArvadosWorkbench::Application.routes.draw do post 'remove_selected_files', on: :member get 'tags', on: :member post 'save_tags', on: :member - get 'multisite', on: :collection + get 'multisite', on: :collection, to: redirect('/search') end get('/collections/download/:uuid/:reader_token/*file' => 'collections#show_file', format: false) @@ -109,7 +109,7 @@ ArvadosWorkbench::Application.routes.draw do get 'tab_counts', on: :member get 'public', on: :collection end - + resources :search do get 'choose', :on => :collection end @@ -131,9 +131,9 @@ ArvadosWorkbench::Application.routes.draw do match '/_health/ping', to: 'healthcheck#ping', via: [:get] get '/tests/mithril', to: 'tests#mithril' - + get '/status', to: 'status#status' - + # Send unroutable requests to an arbitrary controller # (ends up at ApplicationController#render_not_found) match '*a', to: 'links#render_not_found', via: [:get, :post] diff --git a/build/libcloud-pin.sh b/build/libcloud-pin.sh index f9a0b551e9..4182575651 100644 --- a/build/libcloud-pin.sh +++ b/build/libcloud-pin.sh @@ -2,4 +2,4 @@ # # SPDX-License-Identifier: AGPL-3.0 -LIBCLOUD_PIN=2.2.2.dev2 +LIBCLOUD_PIN=2.2.2.dev3 diff --git a/build/package-build-dockerfiles/ubuntu1204/Dockerfile b/build/package-build-dockerfiles/ubuntu1204/Dockerfile deleted file mode 100644 index 1d07db744c..0000000000 --- a/build/package-build-dockerfiles/ubuntu1204/Dockerfile +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: AGPL-3.0 - -FROM ubuntu:precise -MAINTAINER Ward Vandewege - -ENV DEBIAN_FRONTEND noninteractive - -# Install dependencies. -RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip build-essential unzip - -# Install RVM -RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \ - curl -L https://get.rvm.io | bash -s stable && \ - /usr/local/rvm/bin/rvm install 2.3 && \ - /usr/local/rvm/bin/rvm alias create default ruby-2.3 && \ - /usr/local/rvm/bin/rvm-exec default gem install bundler && \ - /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b - -# Install golang binary -ADD generated/go1.8.3.linux-amd64.tar.gz /usr/local/ -RUN ln -s /usr/local/go/bin/go /usr/local/bin/ - -# Install nodejs and npm -ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/ -RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/ - -# Old versions of setuptools cannot build a schema-salad package. -RUN pip install --upgrade setuptools - -ENV WORKSPACE /arvados -CMD ["/usr/local/rvm/bin/rvm-exec", "default", "bash", "/jenkins/run-build-packages.sh", "--target", "ubuntu1204"] diff --git a/build/package-test-dockerfiles/ubuntu1204/Dockerfile b/build/package-test-dockerfiles/ubuntu1204/Dockerfile deleted file mode 100644 index 75c0cea1de..0000000000 --- a/build/package-test-dockerfiles/ubuntu1204/Dockerfile +++ /dev/null @@ -1,21 +0,0 @@ -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: AGPL-3.0 - -FROM ubuntu:precise -MAINTAINER Ward Vandewege - -ENV DEBIAN_FRONTEND noninteractive - -# Install RVM -RUN apt-get update && \ - apt-get -y install --no-install-recommends curl ca-certificates g++ && \ - gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \ - curl -L https://get.rvm.io | bash -s stable && \ - /usr/local/rvm/bin/rvm install 2.3 && \ - /usr/local/rvm/bin/rvm alias create default ruby-2.3 - -# udev daemon can't start in a container, so don't try. -RUN mkdir -p /etc/udev/disabled - -RUN echo "deb file:///arvados/packages/ubuntu1204/ /" >>/etc/apt/sources.list diff --git a/build/run-build-packages-all-targets.sh b/build/run-build-packages-all-targets.sh index 7dd21a363d..4cba3e9a62 100755 --- a/build/run-build-packages-all-targets.sh +++ b/build/run-build-packages-all-targets.sh @@ -91,11 +91,16 @@ for dockerfile_path in $(find -name Dockerfile | grep package-build-dockerfiles) true else FINAL_EXITCODE=$? + echo + echo "Build packages failed for $(basename $(dirname "$dockerfile_path"))" + echo fi done if test $FINAL_EXITCODE != 0 ; then + echo echo "Build packages failed with code $FINAL_EXITCODE" >&2 + echo fi exit $FINAL_EXITCODE diff --git a/build/run-tests.sh b/build/run-tests.sh index 433685c5a8..3cfc692aae 100755 --- a/build/run-tests.sh +++ b/build/run-tests.sh @@ -100,7 +100,7 @@ sdk/go/health sdk/go/httpserver sdk/go/manifest sdk/go/blockdigest -sdk/go/streamer +sdk/go/asyncbuf sdk/go/stats sdk/go/crunchrunner sdk/cwl @@ -829,7 +829,7 @@ gostuff=( sdk/go/health sdk/go/httpserver sdk/go/manifest - sdk/go/streamer + sdk/go/asyncbuf sdk/go/crunchrunner sdk/go/stats lib/crunchstat diff --git a/sdk/go/asyncbuf/buf.go b/sdk/go/asyncbuf/buf.go new file mode 100644 index 0000000000..05af02f5f5 --- /dev/null +++ b/sdk/go/asyncbuf/buf.go @@ -0,0 +1,108 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +package asyncbuf + +import ( + "bytes" + "io" + "sync" +) + +// A Buffer is an io.Writer that distributes written data +// asynchronously to multiple concurrent readers. +// +// NewReader() can be called at any time. In all cases, every returned +// io.Reader reads all data written to the Buffer. +// +// Behavior is undefined if Write is called after Close or +// CloseWithError. +type Buffer interface { + io.WriteCloser + + // NewReader() returns an io.Reader that reads all data + // written to the Buffer. + NewReader() io.Reader + + // Close, but return the given error (instead of io.EOF) to + // all readers when they reach the end of the buffer. + // + // CloseWithError(nil) is equivalent to + // CloseWithError(io.EOF). + CloseWithError(error) error +} + +type buffer struct { + data *bytes.Buffer + cond sync.Cond + err error // nil if there might be more writes +} + +// NewBuffer creates a new Buffer using buf as its initial +// contents. The new Buffer takes ownership of buf, and the caller +// should not use buf after this call. +func NewBuffer(buf []byte) Buffer { + return &buffer{ + data: bytes.NewBuffer(buf), + cond: sync.Cond{L: &sync.Mutex{}}, + } +} + +func (b *buffer) Write(p []byte) (int, error) { + defer b.cond.Broadcast() + b.cond.L.Lock() + defer b.cond.L.Unlock() + if b.err != nil { + return 0, b.err + } + return b.data.Write(p) +} + +func (b *buffer) Close() error { + return b.CloseWithError(nil) +} + +func (b *buffer) CloseWithError(err error) error { + defer b.cond.Broadcast() + b.cond.L.Lock() + defer b.cond.L.Unlock() + if err == nil { + b.err = io.EOF + } else { + b.err = err + } + return nil +} + +func (b *buffer) NewReader() io.Reader { + return &reader{b: b} +} + +type reader struct { + b *buffer + read int // # bytes already read +} + +func (r *reader) Read(p []byte) (int, error) { + r.b.cond.L.Lock() + for { + switch { + case r.read < r.b.data.Len(): + buf := r.b.data.Bytes() + r.b.cond.L.Unlock() + n := copy(p, buf[r.read:]) + r.read += n + return n, nil + case r.b.err != nil || len(p) == 0: + // r.b.err != nil means we reached EOF. And + // even if we're not at EOF, there's no need + // to block if len(p)==0. + err := r.b.err + r.b.cond.L.Unlock() + return 0, err + default: + r.b.cond.Wait() + } + } +} diff --git a/sdk/go/asyncbuf/buf_test.go b/sdk/go/asyncbuf/buf_test.go new file mode 100644 index 0000000000..cc742a8cbe --- /dev/null +++ b/sdk/go/asyncbuf/buf_test.go @@ -0,0 +1,245 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +package asyncbuf + +import ( + "crypto/md5" + "errors" + "io" + "io/ioutil" + "math/rand" + "sync" + "sync/atomic" + "testing" + "time" + + check "gopkg.in/check.v1" +) + +var _ = check.Suite(&Suite{}) + +type Suite struct{} + +func (s *Suite) TestNoWrites(c *check.C) { + b := NewBuffer(nil) + r1 := b.NewReader() + r2 := b.NewReader() + b.Close() + s.checkReader(c, r1, []byte{}, nil, nil) + s.checkReader(c, r2, []byte{}, nil, nil) +} + +func (s *Suite) TestNoReaders(c *check.C) { + b := NewBuffer(nil) + n, err := b.Write([]byte("foobar")) + err2 := b.Close() + c.Check(n, check.Equals, 6) + c.Check(err, check.IsNil) + c.Check(err2, check.IsNil) +} + +func (s *Suite) TestWriteReadClose(c *check.C) { + done := make(chan bool, 2) + b := NewBuffer(nil) + n, err := b.Write([]byte("foobar")) + c.Check(n, check.Equals, 6) + c.Check(err, check.IsNil) + r1 := b.NewReader() + r2 := b.NewReader() + go s.checkReader(c, r1, []byte("foobar"), nil, done) + go s.checkReader(c, r2, []byte("foobar"), nil, done) + time.Sleep(time.Millisecond) + c.Check(len(done), check.Equals, 0) + b.Close() + <-done + <-done +} + +func (s *Suite) TestPrefillWriteCloseRead(c *check.C) { + done := make(chan bool, 2) + b := NewBuffer([]byte("baz")) + n, err := b.Write([]byte("waz")) + c.Check(n, check.Equals, 3) + c.Check(err, check.IsNil) + b.Close() + r1 := b.NewReader() + go s.checkReader(c, r1, []byte("bazwaz"), nil, done) + r2 := b.NewReader() + go s.checkReader(c, r2, []byte("bazwaz"), nil, done) + <-done + <-done +} + +func (s *Suite) TestWriteReadCloseRead(c *check.C) { + done := make(chan bool, 1) + b := NewBuffer(nil) + r1 := b.NewReader() + go s.checkReader(c, r1, []byte("bazwazqux"), nil, done) + + b.Write([]byte("bazwaz")) + + r2 := b.NewReader() + r2.Read(make([]byte, 3)) + + b.Write([]byte("qux")) + b.Close() + + s.checkReader(c, r2, []byte("wazqux"), nil, nil) + <-done +} + +func (s *Suite) TestReadAtEOF(c *check.C) { + buf := make([]byte, 8) + + b := NewBuffer([]byte{1, 2, 3}) + + r := b.NewReader() + n, err := r.Read(buf) + c.Check(n, check.Equals, 3) + c.Check(err, check.IsNil) + + // Reading zero bytes at EOF, but before Close(), doesn't + // block or error + done := make(chan bool) + go func() { + defer close(done) + n, err = r.Read(buf[:0]) + c.Check(n, check.Equals, 0) + c.Check(err, check.IsNil) + }() + select { + case <-done: + case <-time.After(time.Second): + c.Error("timeout") + } + + b.Close() + + // Reading zero bytes after Close() returns EOF + n, err = r.Read(buf[:0]) + c.Check(n, check.Equals, 0) + c.Check(err, check.Equals, io.EOF) + + // Reading from start after Close() returns 3 bytes, then EOF + r = b.NewReader() + n, err = r.Read(buf) + c.Check(n, check.Equals, 3) + if err != nil { + c.Check(err, check.Equals, io.EOF) + } + n, err = r.Read(buf[:0]) + c.Check(n, check.Equals, 0) + c.Check(err, check.Equals, io.EOF) + n, err = r.Read(buf) + c.Check(n, check.Equals, 0) + c.Check(err, check.Equals, io.EOF) +} + +func (s *Suite) TestCloseWithError(c *check.C) { + errFake := errors.New("it's not even a real error") + + done := make(chan bool, 1) + b := NewBuffer(nil) + r1 := b.NewReader() + go s.checkReader(c, r1, []byte("bazwazqux"), errFake, done) + + b.Write([]byte("bazwaz")) + + r2 := b.NewReader() + r2.Read(make([]byte, 3)) + + b.Write([]byte("qux")) + b.CloseWithError(errFake) + + s.checkReader(c, r2, []byte("wazqux"), errFake, nil) + <-done +} + +// Write n*n bytes, n at a time; read them into n goroutines using +// varying buffer sizes; compare checksums. +func (s *Suite) TestManyReaders(c *check.C) { + const n = 256 + + b := NewBuffer(nil) + + expectSum := make(chan []byte) + go func() { + hash := md5.New() + buf := make([]byte, n) + for i := 0; i < n; i++ { + time.Sleep(10 * time.Nanosecond) + rand.Read(buf) + b.Write(buf) + hash.Write(buf) + } + expectSum <- hash.Sum(nil) + b.Close() + }() + + gotSum := make(chan []byte) + for i := 0; i < n; i++ { + go func(bufSize int) { + got := md5.New() + io.CopyBuffer(got, b.NewReader(), make([]byte, bufSize)) + gotSum <- got.Sum(nil) + }(i + n/2) + } + + expect := <-expectSum + for i := 0; i < n; i++ { + c.Check(expect, check.DeepEquals, <-gotSum) + } +} + +func (s *Suite) BenchmarkOneReader(c *check.C) { + s.benchmarkReaders(c, 1) +} + +func (s *Suite) BenchmarkManyReaders(c *check.C) { + s.benchmarkReaders(c, 100) +} + +func (s *Suite) benchmarkReaders(c *check.C, readers int) { + var n int64 + t0 := time.Now() + + buf := make([]byte, 10000) + rand.Read(buf) + for i := 0; i < 10; i++ { + b := NewBuffer(nil) + go func() { + for i := 0; i < c.N; i++ { + b.Write(buf) + } + b.Close() + }() + + var wg sync.WaitGroup + for i := 0; i < readers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + nn, _ := io.Copy(ioutil.Discard, b.NewReader()) + atomic.AddInt64(&n, int64(nn)) + }() + } + wg.Wait() + } + c.Logf("%d bytes, %.0f MB/s", n, float64(n)/time.Since(t0).Seconds()/1000000) +} + +func (s *Suite) checkReader(c *check.C, r io.Reader, expectData []byte, expectError error, done chan bool) { + buf, err := ioutil.ReadAll(r) + c.Check(err, check.Equals, expectError) + c.Check(buf, check.DeepEquals, expectData) + if done != nil { + done <- true + } +} + +// Gocheck boilerplate +func Test(t *testing.T) { + check.TestingT(t) +} diff --git a/sdk/go/keepclient/hashcheck.go b/sdk/go/keepclient/hashcheck.go index 726b81362c..9295c14cc2 100644 --- a/sdk/go/keepclient/hashcheck.go +++ b/sdk/go/keepclient/hashcheck.go @@ -72,19 +72,16 @@ func (this HashCheckingReader) Close() (err error) { _, err = io.Copy(this.Hash, this.Reader) if closer, ok := this.Reader.(io.Closer); ok { - err2 := closer.Close() - if err2 != nil && err == nil { - return err2 + closeErr := closer.Close() + if err == nil { + err = closeErr } } if err != nil { return err } - - sum := this.Hash.Sum(nil) - if fmt.Sprintf("%x", sum) != this.Check { - err = BadChecksum + if fmt.Sprintf("%x", this.Hash.Sum(nil)) != this.Check { + return BadChecksum } - - return err + return nil } diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go index cbfad8177d..37d651e31f 100644 --- a/sdk/go/keepclient/keepclient.go +++ b/sdk/go/keepclient/keepclient.go @@ -21,7 +21,7 @@ import ( "time" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/streamer" + "git.curoverse.com/arvados.git/sdk/go/asyncbuf" ) // A Keep "block" is 64MB. @@ -156,10 +156,12 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, bufsize = BLOCKSIZE } - t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash}) - defer t.Close() - - return kc.putReplicas(hash, t, dataBytes) + buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize)) + go func() { + _, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash}) + buf.CloseWithError(err) + }() + return kc.putReplicas(hash, buf.NewReader, dataBytes) } // PutHB writes a block to Keep. The hash of the bytes is given in @@ -167,9 +169,8 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, // // Return values are the same as for PutHR. func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) { - t := streamer.AsyncStreamFromSlice(buf) - defer t.Close() - return kc.putReplicas(hash, t, int64(len(buf))) + newReader := func() io.Reader { return bytes.NewBuffer(buf) } + return kc.putReplicas(hash, newReader, int64(len(buf))) } // PutB writes a block to Keep. It computes the hash itself. diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go index 3ce4e7425a..055141cbe8 100644 --- a/sdk/go/keepclient/keepclient_test.go +++ b/sdk/go/keepclient/keepclient_test.go @@ -5,6 +5,7 @@ package keepclient import ( + "bytes" "crypto/md5" "errors" "fmt" @@ -20,7 +21,6 @@ import ( "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/arvadostest" - "git.curoverse.com/arvados.git/sdk/go/streamer" . "gopkg.in/check.v1" ) @@ -172,18 +172,8 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) { make(chan string)} UploadToStubHelper(c, st, - func(kc *KeepClient, url string, reader io.ReadCloser, - writer io.WriteCloser, upload_status chan uploadStatus) { - - tr := streamer.AsyncStreamFromReader(512, reader) - defer tr.Close() - - br1 := tr.MakeStreamReader() - - go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, 0) - - writer.Write([]byte("foo")) - writer.Close() + func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, upload_status chan uploadStatus) { + go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), upload_status, 3, 0) <-st.handled diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go index 49ef543d87..37912506a2 100644 --- a/sdk/go/keepclient/support.go +++ b/sdk/go/keepclient/support.go @@ -17,7 +17,6 @@ import ( "strings" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/streamer" ) // Function used to emit debug messages. The easiest way to enable @@ -57,7 +56,7 @@ type uploadStatus struct { response string } -func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser, +func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader, upload_status chan<- uploadStatus, expectedLength int64, requestID int32) { var req *http.Request @@ -66,21 +65,16 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea if req, err = http.NewRequest("PUT", url, nil); err != nil { DebugPrintf("DEBUG: [%08x] Error creating request PUT %v error: %v", requestID, url, err.Error()) upload_status <- uploadStatus{err, url, 0, 0, ""} - body.Close() return } req.ContentLength = expectedLength if expectedLength > 0 { - // Do() will close the body ReadCloser when it is done - // with it. - req.Body = body + req.Body = ioutil.NopCloser(body) } else { - // "For client requests, a value of 0 means unknown if Body is - // not nil." In this case we do want the body to be empty, so - // don't set req.Body. However, we still need to close the - // body ReadCloser. - body.Close() + // "For client requests, a value of 0 means unknown if + // Body is not nil." In this case we do want the body + // to be empty, so don't set req.Body. } req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken)) @@ -121,7 +115,7 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea func (this *KeepClient) putReplicas( hash string, - tr *streamer.AsyncStream, + getReader func() io.Reader, expectedLength int64) (locator string, replicas int, err error) { // Generate an arbitrary ID to identify this specific @@ -174,7 +168,7 @@ func (this *KeepClient) putReplicas( // Start some upload requests if next_server < len(sv) { DebugPrintf("DEBUG: [%08x] Begin upload %s to %s", requestID, hash, sv[next_server]) - go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestID) + go this.uploadToKeepServer(sv[next_server], hash, getReader(), upload_status, expectedLength, requestID) next_server += 1 active += 1 } else { diff --git a/sdk/go/streamer/streamer.go b/sdk/go/streamer/streamer.go deleted file mode 100644 index 396e311038..0000000000 --- a/sdk/go/streamer/streamer.go +++ /dev/null @@ -1,158 +0,0 @@ -// Copyright (C) The Arvados Authors. All rights reserved. -// -// SPDX-License-Identifier: Apache-2.0 - -/* AsyncStream pulls data in from a io.Reader source (such as a file or network -socket) and fans out to any number of StreamReader sinks. - -Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at -any point in the lifetime of the AsyncStream, and each StreamReader will read -the contents of the buffer up to the "frontier" of the buffer, at which point -the StreamReader blocks until new data is read from the source. - -This is useful for minimizing readthrough latency as sinks can read and act on -data from the source without waiting for the source to be completely buffered. -It is also useful as a cache in situations where re-reading the original source -potentially is costly, since the buffer retains a copy of the source data. - -Usage: - -Begin reading into a buffer with maximum size 'buffersize' from 'source': - stream := AsyncStreamFromReader(buffersize, source) - -To create a new reader (this can be called multiple times, each reader starts -at the beginning of the buffer): - reader := tr.MakeStreamReader() - -Make sure to close the reader when you're done with it. - reader.Close() - -When you're done with the stream: - stream.Close() - -Alternately, if you already have a filled buffer and just want to read out from it: - stream := AsyncStreamFromSlice(buf) - - r := tr.MakeStreamReader() - -*/ - -package streamer - -import ( - "errors" - "io" -) - -var ErrAlreadyClosed = errors.New("cannot close a stream twice") - -type AsyncStream struct { - buffer []byte - requests chan sliceRequest - add_reader chan bool - subtract_reader chan bool - wait_zero_readers chan bool - closed bool -} - -// Reads from the buffer managed by the Transfer() -type StreamReader struct { - offset int - stream *AsyncStream - responses chan sliceResult -} - -func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream { - t := &AsyncStream{ - buffer: make([]byte, buffersize), - requests: make(chan sliceRequest), - add_reader: make(chan bool), - subtract_reader: make(chan bool), - wait_zero_readers: make(chan bool), - } - - go t.transfer(source) - go t.readersMonitor() - - return t -} - -func AsyncStreamFromSlice(buf []byte) *AsyncStream { - t := &AsyncStream{ - buffer: buf, - requests: make(chan sliceRequest), - add_reader: make(chan bool), - subtract_reader: make(chan bool), - wait_zero_readers: make(chan bool), - } - - go t.transfer(nil) - go t.readersMonitor() - - return t -} - -func (this *AsyncStream) MakeStreamReader() *StreamReader { - this.add_reader <- true - return &StreamReader{0, this, make(chan sliceResult)} -} - -// Reads from the buffer managed by the Transfer() -func (this *StreamReader) Read(p []byte) (n int, err error) { - this.stream.requests <- sliceRequest{this.offset, len(p), this.responses} - rr, valid := <-this.responses - if valid { - this.offset += len(rr.slice) - return copy(p, rr.slice), rr.err - } else { - return 0, io.ErrUnexpectedEOF - } -} - -func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) { - // Record starting offset in order to correctly report the number of bytes sent - starting_offset := this.offset - for { - this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses} - rr, valid := <-this.responses - if valid { - this.offset += len(rr.slice) - if rr.err != nil { - if rr.err == io.EOF { - // EOF is not an error. - return int64(this.offset - starting_offset), nil - } else { - return int64(this.offset - starting_offset), rr.err - } - } else { - dest.Write(rr.slice) - } - } else { - return int64(this.offset), io.ErrUnexpectedEOF - } - } -} - -// Close the responses channel -func (this *StreamReader) Close() error { - if this.stream == nil { - return ErrAlreadyClosed - } - this.stream.subtract_reader <- true - close(this.responses) - this.stream = nil - return nil -} - -func (this *AsyncStream) Close() error { - if this.closed { - return ErrAlreadyClosed - } - this.closed = true - this.wait_zero_readers <- true - close(this.requests) - close(this.add_reader) - close(this.subtract_reader) - close(this.wait_zero_readers) - return nil -} diff --git a/sdk/go/streamer/streamer_test.go b/sdk/go/streamer/streamer_test.go deleted file mode 100644 index f8ddbf5a4c..0000000000 --- a/sdk/go/streamer/streamer_test.go +++ /dev/null @@ -1,381 +0,0 @@ -// Copyright (C) The Arvados Authors. All rights reserved. -// -// SPDX-License-Identifier: Apache-2.0 - -package streamer - -import ( - . "gopkg.in/check.v1" - "io" - "testing" - "time" -) - -// Gocheck boilerplate -func Test(t *testing.T) { TestingT(t) } - -var _ = Suite(&StandaloneSuite{}) - -// Standalone tests -type StandaloneSuite struct{} - -func (s *StandaloneSuite) TestReadIntoBuffer(c *C) { - ReadIntoBufferHelper(c, 225) - ReadIntoBufferHelper(c, 224) -} - -func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) { - out := make([]byte, 128) - for i := 0; i < 128; i += 1 { - out[i] = byte(i) - } - writer.Write(out) - s1 := <-slices - c.Check(len(s1.slice), Equals, 128) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 128; i += 1 { - c.Check(s1.slice[i], Equals, byte(i)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } -} - -func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) { - out := make([]byte, 96) - for i := 0; i < 96; i += 1 { - out[i] = byte(i / 2) - } - writer.Write(out) - s1 := <-slices - c.Check(len(s1.slice), Equals, 96) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 96; i += 1 { - c.Check(s1.slice[i], Equals, byte(i/2)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else if i < (128 + 96) { - c.Check(buffer[i], Equals, byte((i-128)/2)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } -} - -func ReadIntoBufferHelper(c *C, bufsize int) { - buffer := make([]byte, bufsize) - - reader, writer := io.Pipe() - slices := make(chan nextSlice) - - go readIntoBuffer(buffer, reader, slices) - - HelperWrite128andCheck(c, buffer, writer, slices) - HelperWrite96andCheck(c, buffer, writer, slices) - - writer.Close() - s1 := <-slices - c.Check(len(s1.slice), Equals, 0) - c.Check(s1.reader_error, Equals, io.EOF) -} - -func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) { - buffer := make([]byte, 223) - reader, writer := io.Pipe() - slices := make(chan nextSlice) - - go readIntoBuffer(buffer, reader, slices) - - HelperWrite128andCheck(c, buffer, writer, slices) - - out := make([]byte, 96) - for i := 0; i < 96; i += 1 { - out[i] = byte(i / 2) - } - - // Write will deadlock because it can't write all the data, so - // spin it off to a goroutine - go writer.Write(out) - s1 := <-slices - - c.Check(len(s1.slice), Equals, 95) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 95; i += 1 { - c.Check(s1.slice[i], Equals, byte(i/2)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else if i < (128 + 95) { - c.Check(buffer[i], Equals, byte((i-128)/2)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } - - writer.Close() - s1 = <-slices - c.Check(len(s1.slice), Equals, 0) - c.Check(s1.reader_error, Equals, io.ErrShortBuffer) -} - -func (s *StandaloneSuite) TestTransfer(c *C) { - reader, writer := io.Pipe() - - tr := AsyncStreamFromReader(512, reader) - - br1 := tr.MakeStreamReader() - out := make([]byte, 128) - - { - // Write some data, and read into a buffer shorter than - // available data - for i := 0; i < 128; i += 1 { - out[i] = byte(i) - } - - writer.Write(out[:100]) - - in := make([]byte, 64) - n, err := br1.Read(in) - - c.Check(n, Equals, 64) - c.Check(err, Equals, nil) - - for i := 0; i < 64; i += 1 { - c.Check(in[i], Equals, out[i]) - } - } - - { - // Write some more data, and read into buffer longer than - // available data - in := make([]byte, 64) - n, err := br1.Read(in) - c.Check(n, Equals, 36) - c.Check(err, Equals, nil) - - for i := 0; i < 36; i += 1 { - c.Check(in[i], Equals, out[64+i]) - } - - } - - { - // Test read before write - type Rd struct { - n int - err error - } - rd := make(chan Rd) - in := make([]byte, 64) - - go func() { - n, err := br1.Read(in) - rd <- Rd{n, err} - }() - - time.Sleep(100 * time.Millisecond) - writer.Write(out[100:]) - - got := <-rd - - c.Check(got.n, Equals, 28) - c.Check(got.err, Equals, nil) - - for i := 0; i < 28; i += 1 { - c.Check(in[i], Equals, out[100+i]) - } - } - - br2 := tr.MakeStreamReader() - { - // Test 'catch up' reader - in := make([]byte, 256) - n, err := br2.Read(in) - - c.Check(n, Equals, 128) - c.Check(err, Equals, nil) - - for i := 0; i < 128; i += 1 { - c.Check(in[i], Equals, out[i]) - } - } - - { - // Test closing the reader - writer.Close() - - in := make([]byte, 256) - n1, err1 := br1.Read(in) - n2, err2 := br2.Read(in) - c.Check(n1, Equals, 0) - c.Check(err1, Equals, io.EOF) - c.Check(n2, Equals, 0) - c.Check(err2, Equals, io.EOF) - } - - { - // Test 'catch up' reader after closing - br3 := tr.MakeStreamReader() - in := make([]byte, 256) - n, err := br3.Read(in) - - c.Check(n, Equals, 128) - c.Check(err, Equals, nil) - - for i := 0; i < 128; i += 1 { - c.Check(in[i], Equals, out[i]) - } - - n, err = br3.Read(in) - - c.Check(n, Equals, 0) - c.Check(err, Equals, io.EOF) - } -} - -func (s *StandaloneSuite) TestTransferShortBuffer(c *C) { - reader, writer := io.Pipe() - - tr := AsyncStreamFromReader(100, reader) - defer tr.Close() - - sr := tr.MakeStreamReader() - defer sr.Close() - - out := make([]byte, 101) - go writer.Write(out) - - n, err := sr.Read(out) - c.Check(n, Equals, 100) - c.Check(err, IsNil) - - n, err = sr.Read(out) - c.Check(n, Equals, 0) - c.Check(err, Equals, io.ErrShortBuffer) -} - -func (s *StandaloneSuite) TestTransferFromBuffer(c *C) { - // Buffer for reads from 'r' - buffer := make([]byte, 100) - for i := 0; i < 100; i += 1 { - buffer[i] = byte(i) - } - - tr := AsyncStreamFromSlice(buffer) - - br1 := tr.MakeStreamReader() - - in := make([]byte, 64) - { - n, err := br1.Read(in) - - c.Check(n, Equals, 64) - c.Check(err, Equals, nil) - - for i := 0; i < 64; i += 1 { - c.Check(in[i], Equals, buffer[i]) - } - } - { - n, err := br1.Read(in) - - c.Check(n, Equals, 36) - c.Check(err, Equals, nil) - - for i := 0; i < 36; i += 1 { - c.Check(in[i], Equals, buffer[64+i]) - } - } - { - n, err := br1.Read(in) - - c.Check(n, Equals, 0) - c.Check(err, Equals, io.EOF) - } -} - -func (s *StandaloneSuite) TestTransferIoCopy(c *C) { - // Buffer for reads from 'r' - buffer := make([]byte, 100) - for i := 0; i < 100; i += 1 { - buffer[i] = byte(i) - } - - tr := AsyncStreamFromSlice(buffer) - defer tr.Close() - - br1 := tr.MakeStreamReader() - defer br1.Close() - - reader, writer := io.Pipe() - - go func() { - p := make([]byte, 100) - n, err := reader.Read(p) - c.Check(n, Equals, 100) - c.Check(err, Equals, nil) - c.Check(p, DeepEquals, buffer) - }() - - io.Copy(writer, br1) -} - -func (s *StandaloneSuite) TestManyReaders(c *C) { - reader, writer := io.Pipe() - - tr := AsyncStreamFromReader(512, reader) - defer tr.Close() - - sr := tr.MakeStreamReader() - go func() { - time.Sleep(100 * time.Millisecond) - sr.Close() - }() - - for i := 0; i < 200; i += 1 { - go func() { - br1 := tr.MakeStreamReader() - defer br1.Close() - - p := make([]byte, 3) - n, err := br1.Read(p) - c.Check(n, Equals, 3) - c.Check(p[0:3], DeepEquals, []byte("foo")) - - n, err = br1.Read(p) - c.Check(n, Equals, 3) - c.Check(p[0:3], DeepEquals, []byte("bar")) - - n, err = br1.Read(p) - c.Check(n, Equals, 3) - c.Check(p[0:3], DeepEquals, []byte("baz")) - - n, err = br1.Read(p) - c.Check(n, Equals, 0) - c.Check(err, Equals, io.EOF) - }() - } - - writer.Write([]byte("foo")) - writer.Write([]byte("bar")) - writer.Write([]byte("baz")) - writer.Close() -} - -func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) { - buffer := make([]byte, 100) - tr := AsyncStreamFromSlice(buffer) - sr := tr.MakeStreamReader() - c.Check(sr.Close(), IsNil) - c.Check(sr.Close(), Equals, ErrAlreadyClosed) - c.Check(tr.Close(), IsNil) - c.Check(tr.Close(), Equals, ErrAlreadyClosed) -} diff --git a/sdk/go/streamer/transfer.go b/sdk/go/streamer/transfer.go deleted file mode 100644 index bea27f8f81..0000000000 --- a/sdk/go/streamer/transfer.go +++ /dev/null @@ -1,310 +0,0 @@ -// Copyright (C) The Arvados Authors. All rights reserved. -// -// SPDX-License-Identifier: Apache-2.0 - -/* Internal implementation of AsyncStream. -Outline of operation: - -The kernel is the transfer() goroutine. It manages concurrent reads and -appends to the "body" slice. "body" is a slice of "source_buffer" that -represents the segment of the buffer that is already filled in and available -for reading. - -To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read -from the io.Reader source directly into source_buffer. Each read goes into a -slice of buffer which spans the section immediately following the end of the -current "body". Each time a Read completes, a slice representing the the -section just filled in (or any read errors/EOF) is sent over the "slices" -channel back to the transfer() function. - -Meanwhile, the transfer() function selects() on two channels, the "requests" -channel and the "slices" channel. - -When a message is received on the "slices" channel, this means the a new -section of the buffer has data, or an error is signaled. Since the data has -been read directly into the source_buffer, it is able to simply increases the -size of the body slice to encompass the newly filled in section. Then any -pending reads are serviced with handleReadRequest (described below). - -When a message is received on the "requests" channel, it means a StreamReader -wants access to a slice of the buffer. This is passed to handleReadRequest(). - -The handleReadRequest() function takes a sliceRequest consisting of a buffer -offset, maximum size, and channel to send the response. If there was an error -reported from the source reader, it is returned. If the offset is less than -the size of the body, the request can proceed, and it sends a body slice -spanning the segment from offset to min(offset+maxsize, end of the body). If -source reader status is EOF (done filling the buffer) and the read request -offset is beyond end of the body, it responds with EOF. Otherwise, the read -request is for a slice beyond the current size of "body" but we expect the body -to expand as more data is added, so the request gets added to a wait list. - -The transfer() runs until the requests channel is closed by AsyncStream.Close() - -To track readers, streamer uses the readersMonitor() goroutine. This goroutine -chooses which channels to receive from based on the number of outstanding -readers. When a new reader is created, it sends a message on the add_reader -channel. If the number of readers is already at MAX_READERS, this blocks the -sender until an existing reader is closed. When a reader is closed, it sends a -message on the subtract_reader channel. Finally, when AsyncStream.Close() is -called, it sends a message on the wait_zero_readers channel, which will block -the sender unless there are zero readers and it is safe to shut down the -AsyncStream. -*/ - -package streamer - -import ( - "io" -) - -const MAX_READERS = 100 - -// A slice passed from readIntoBuffer() to transfer() -type nextSlice struct { - slice []byte - reader_error error -} - -// A read request to the Transfer() function -type sliceRequest struct { - offset int - maxsize int - result chan<- sliceResult -} - -// A read result from the Transfer() function -type sliceResult struct { - slice []byte - err error -} - -// Supports writing into a buffer -type bufferWriter struct { - buf []byte - ptr int -} - -// Copy p into this.buf, increment pointer and return number of bytes read. -func (this *bufferWriter) Write(p []byte) (n int, err error) { - n = copy(this.buf[this.ptr:], p) - this.ptr += n - return n, nil -} - -// Read repeatedly from the reader and write sequentially into the specified -// buffer, and report each read to channel 'c'. Completes when Reader 'r' -// reports on the error channel and closes channel 'c'. -func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) { - defer close(slices) - - if writeto, ok := r.(io.WriterTo); ok { - n, err := writeto.WriteTo(&bufferWriter{buffer, 0}) - if err != nil { - slices <- nextSlice{nil, err} - } else { - slices <- nextSlice{buffer[:n], nil} - slices <- nextSlice{nil, io.EOF} - } - return - } else { - // Initially entire buffer is available - ptr := buffer[:] - for { - var n int - var err error - if len(ptr) > 0 { - const readblock = 64 * 1024 - // Read 64KiB into the next part of the buffer - if len(ptr) > readblock { - n, err = r.Read(ptr[:readblock]) - } else { - n, err = r.Read(ptr) - } - } else { - // Ran out of buffer space, try reading one more byte - var b [1]byte - n, err = r.Read(b[:]) - - if n > 0 { - // Reader has more data but we have nowhere to - // put it, so we're stuffed - slices <- nextSlice{nil, io.ErrShortBuffer} - } else { - // Return some other error (hopefully EOF) - slices <- nextSlice{nil, err} - } - return - } - - // End on error (includes EOF) - if err != nil { - slices <- nextSlice{nil, err} - return - } - - if n > 0 { - // Make a slice with the contents of the read - slices <- nextSlice{ptr[:n], nil} - - // Adjust the scratch space slice - ptr = ptr[n:] - } - } - } -} - -// Handle a read request. Returns true if a response was sent, and false if -// the request should be queued. -func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool { - if (reader_status != nil) && (reader_status != io.EOF) { - req.result <- sliceResult{nil, reader_status} - return true - } else if req.offset < len(body) { - var end int - if req.offset+req.maxsize < len(body) { - end = req.offset + req.maxsize - } else { - end = len(body) - } - req.result <- sliceResult{body[req.offset:end], nil} - return true - } else if (reader_status == io.EOF) && (req.offset >= len(body)) { - req.result <- sliceResult{nil, io.EOF} - return true - } else { - return false - } -} - -// Mediates between reads and appends. -// If 'source_reader' is not nil, reads data from 'source_reader' and stores it -// in the provided buffer. Otherwise, use the contents of 'buffer' as is. -// Accepts read requests on the buffer on the 'requests' channel. Completes -// when 'requests' channel is closed. -func (this *AsyncStream) transfer(source_reader io.Reader) { - source_buffer := this.buffer - requests := this.requests - - // currently buffered data - var body []byte - - // for receiving slices from readIntoBuffer - var slices chan nextSlice = nil - - // indicates the status of the underlying reader - var reader_status error = nil - - if source_reader != nil { - // 'body' is the buffer slice representing the body content read so far - body = source_buffer[:0] - - // used to communicate slices of the buffer as they are - // readIntoBuffer will close 'slices' when it is done with it - slices = make(chan nextSlice) - - // Spin it off - go readIntoBuffer(source_buffer, source_reader, slices) - } else { - // use the whole buffer - body = source_buffer[:] - - // buffer is complete - reader_status = io.EOF - } - - pending_requests := make([]sliceRequest, 0) - - for { - select { - case req, valid := <-requests: - // Handle a buffer read request - if valid { - if !handleReadRequest(req, body, reader_status) { - pending_requests = append(pending_requests, req) - } - } else { - // closed 'requests' channel indicates we're done - return - } - - case bk, valid := <-slices: - // Got a new slice from the reader - if valid { - reader_status = bk.reader_error - - if bk.slice != nil { - // adjust body bounds now that another slice has been read - body = source_buffer[0 : len(body)+len(bk.slice)] - } - - // handle pending reads - n := 0 - for n < len(pending_requests) { - if handleReadRequest(pending_requests[n], body, reader_status) { - // move the element from the back of the slice to - // position 'n', then shorten the slice by one element - pending_requests[n] = pending_requests[len(pending_requests)-1] - pending_requests = pending_requests[0 : len(pending_requests)-1] - } else { - - // Request wasn't handled, so keep it in the request slice - n += 1 - } - } - } else { - if reader_status == nil { - // slices channel closed without signaling EOF - reader_status = io.ErrUnexpectedEOF - } - slices = nil - } - } - } -} - -func (this *AsyncStream) readersMonitor() { - var readers int = 0 - - for { - if readers == 0 { - select { - case _, ok := <-this.wait_zero_readers: - if ok { - // nothing, just implicitly unblock the sender - } else { - return - } - case _, ok := <-this.add_reader: - if ok { - readers += 1 - } else { - return - } - } - } else if readers > 0 && readers < MAX_READERS { - select { - case _, ok := <-this.add_reader: - if ok { - readers += 1 - } else { - return - } - - case _, ok := <-this.subtract_reader: - if ok { - readers -= 1 - } else { - return - } - } - } else if readers == MAX_READERS { - _, ok := <-this.subtract_reader - if ok { - readers -= 1 - } else { - return - } - } - } -} diff --git a/services/api/app/controllers/application_controller.rb b/services/api/app/controllers/application_controller.rb index 9826cf2f90..6bdba7af89 100644 --- a/services/api/app/controllers/application_controller.rb +++ b/services/api/app/controllers/application_controller.rb @@ -365,7 +365,7 @@ class ApplicationController < ActionController::Base end def require_auth_scope - if @read_auths.empty? + unless current_user && @read_auths.any? { |auth| auth.user.andand.uuid == current_user.uuid } if require_login != false send_error("Forbidden", status: 403) end diff --git a/services/api/app/middlewares/arvados_api_token.rb b/services/api/app/middlewares/arvados_api_token.rb index 5eb756b9fa..6a37631827 100644 --- a/services/api/app/middlewares/arvados_api_token.rb +++ b/services/api/app/middlewares/arvados_api_token.rb @@ -32,21 +32,33 @@ class ArvadosApiToken user = nil api_client = nil api_client_auth = nil - supplied_token = - params["api_token"] || - params["oauth_token"] || - env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-zA-Z0-9]+)/).andand[1] - if supplied_token - api_client_auth = ApiClientAuthorization. + if request.get? || params["_method"] == 'GET' + reader_tokens = params["reader_tokens"] + if reader_tokens.is_a? String + reader_tokens = SafeJSON.load(reader_tokens) + end + else + reader_tokens = nil + end + + # Set current_user etc. based on the primary session token if a + # valid one is present. Otherwise, use the first valid token in + # reader_tokens. + [params["api_token"], + params["oauth_token"], + env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-zA-Z0-9]+)/).andand[1], + *reader_tokens, + ].each do |supplied| + next if !supplied + try_auth = ApiClientAuthorization. includes(:api_client, :user). - where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied_token). + where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied). first - if api_client_auth.andand.user + if try_auth.andand.user + api_client_auth = try_auth user = api_client_auth.user api_client = api_client_auth.api_client - else - # Token seems valid, but points to a non-existent (deleted?) user. - api_client_auth = nil + break end end Thread.current[:api_client_ip_address] = remote_ip diff --git a/services/api/test/integration/api_client_authorizations_scopes_test.rb b/services/api/test/integration/api_client_authorizations_scopes_test.rb index dba801920c..dfb57496a7 100644 --- a/services/api/test/integration/api_client_authorizations_scopes_test.rb +++ b/services/api/test/integration/api_client_authorizations_scopes_test.rb @@ -27,6 +27,20 @@ class ApiTokensScopeTest < ActionDispatch::IntegrationTest assert_response 403 end + test "narrow + wide scoped tokens for different users" do + get_args = [{ + reader_tokens: [api_client_authorizations(:anonymous).api_token] + }, auth(:active_userlist)] + get(v1_url('users'), *get_args) + assert_response :success + get(v1_url('users', ''), *get_args) # Add trailing slash. + assert_response :success + get(v1_url('users', 'current'), *get_args) + assert_response 403 + get(v1_url('virtual_machines'), *get_args) + assert_response 403 + end + test "specimens token can see exactly owned specimens" do get_args = [{}, auth(:active_specimens)] get(v1_url('specimens'), *get_args) diff --git a/services/api/test/integration/reader_tokens_test.rb b/services/api/test/integration/reader_tokens_test.rb index dd59f74eb4..a60be093a3 100644 --- a/services/api/test/integration/reader_tokens_test.rb +++ b/services/api/test/integration/reader_tokens_test.rb @@ -50,22 +50,15 @@ class ReaderTokensTest < ActionDispatch::IntegrationTest [nil, :active_noscope].each do |main_auth| [:spectator, :spectator_specimens].each do |read_auth| - test "#{main_auth} auth with reader token #{read_auth} can read" do - assert_includes(get_specimen_uuids(main_auth, read_auth), - spectator_specimen, "did not find spectator specimen") - end - - test "#{main_auth} auth with JSON read token #{read_auth} can read" do - assert_includes(get_specimen_uuids(main_auth, read_auth, :to_json), - spectator_specimen, "did not find spectator specimen") - end - - test "#{main_auth} auth with reader token #{read_auth} can't write" do - assert_post_denied(main_auth, read_auth) - end + [:to_a, :to_json].each do |formatter| + test "#{main_auth.inspect} auth with #{formatter} reader token #{read_auth} can#{"'t" if main_auth} read" do + get_specimens(main_auth, read_auth) + assert_response(if main_auth then 403 else 200 end) + end - test "#{main_auth} auth with JSON read token #{read_auth} can't write" do - assert_post_denied(main_auth, read_auth, :to_json) + test "#{main_auth.inspect} auth with #{formatter} reader token #{read_auth} can't write" do + assert_post_denied(main_auth, read_auth, formatter) + end end end end diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go index 25619bbb91..23bb2bd216 100644 --- a/services/keepproxy/keepproxy_test.go +++ b/services/keepproxy/keepproxy_test.go @@ -11,10 +11,12 @@ import ( "fmt" "io/ioutil" "log" + "math/rand" "net/http" "net/http/httptest" "os" "strings" + "sync" "testing" "time" @@ -216,6 +218,33 @@ func (s *ServerRequiredSuite) TestPutWrongContentLength(c *C) { } } +func (s *ServerRequiredSuite) TestManyFailedPuts(c *C) { + kc := runProxy(c, nil, false) + defer closeListener() + router.(*proxyHandler).timeout = time.Nanosecond + + buf := make([]byte, 1<<20) + rand.Read(buf) + var wg sync.WaitGroup + for i := 0; i < 128; i++ { + wg.Add(1) + go func() { + defer wg.Done() + kc.PutB(buf) + }() + } + done := make(chan bool) + go func() { + wg.Wait() + close(done) + }() + select { + case <-done: + case <-time.After(10 * time.Second): + c.Error("timeout") + } +} + func (s *ServerRequiredSuite) TestPutAskGet(c *C) { kc := runProxy(c, nil, false) defer closeListener() diff --git a/services/nodemanager/arvnodeman/test/fake_driver.py b/services/nodemanager/arvnodeman/test/fake_driver.py index c8558a417a..5d03308121 100644 --- a/services/nodemanager/arvnodeman/test/fake_driver.py +++ b/services/nodemanager/arvnodeman/test/fake_driver.py @@ -130,7 +130,7 @@ class RetryDriver(FakeDriver): create_calls += 1 if create_calls < 2: raise RateLimitReachedError(429, "Rate limit exceeded", - retry_after=12) + headers={'retry-after': '12'}) elif create_calls < 3: raise BaseHTTPError(429, "Rate limit exceeded", {'retry-after': '2'}) diff --git a/services/nodemanager/setup.py b/services/nodemanager/setup.py index f3764fc8e9..6382dcb727 100644 --- a/services/nodemanager/setup.py +++ b/services/nodemanager/setup.py @@ -40,14 +40,14 @@ setup(name='arvados-node-manager', 'setuptools' ], dependency_links=[ - "https://github.com/curoverse/libcloud/archive/apache-libcloud-2.2.2.dev2.zip" + "https://github.com/curoverse/libcloud/archive/apache-libcloud-2.2.2.dev3.zip" ], test_suite='tests', tests_require=[ 'requests', 'pbr<1.7.0', 'mock>=1.0', - 'apache-libcloud==2.2.2.dev2', + 'apache-libcloud==2.2.2.dev3', ], zip_safe=False, cmdclass={'egg_info': tagger}, diff --git a/tools/arvbox/bin/arvbox b/tools/arvbox/bin/arvbox index 6d535eaed4..f4a65da537 100755 --- a/tools/arvbox/bin/arvbox +++ b/tools/arvbox/bin/arvbox @@ -46,6 +46,10 @@ if test -z "$SSO_ROOT" ; then SSO_ROOT="$ARVBOX_DATA/sso-devise-omniauth-provider" fi +if test -z "$COMPOSER_ROOT" ; then + COMPOSER_ROOT="$ARVBOX_DATA/composer" +fi + PG_DATA="$ARVBOX_DATA/postgres" VAR_DATA="$ARVBOX_DATA/var" PASSENGER="$ARVBOX_DATA/passenger" @@ -193,6 +197,9 @@ run() { if ! test -d "$SSO_ROOT" ; then git clone https://github.com/curoverse/sso-devise-omniauth-provider.git "$SSO_ROOT" fi + if ! test -d "$COMPOSER_ROOT" ; then + git clone https://github.com/curoverse/composer.git "$COMPOSER_ROOT" + fi if test "$CONFIG" = test ; then @@ -205,6 +212,7 @@ run() { --privileged \ "--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \ "--volume=$SSO_ROOT:/usr/src/sso:rw" \ + "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \ "--volume=$PG_DATA:/var/lib/postgresql:rw" \ "--volume=$VAR_DATA:/var/lib/arvados:rw" \ "--volume=$PASSENGER:/var/lib/passenger:rw" \ @@ -246,6 +254,7 @@ run() { --privileged \ "--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \ "--volume=$SSO_ROOT:/usr/src/sso:rw" \ + "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \ "--volume=$PG_DATA:/var/lib/postgresql:rw" \ "--volume=$VAR_DATA:/var/lib/arvados:rw" \ "--volume=$PASSENGER:/var/lib/passenger:rw" \ diff --git a/tools/arvbox/lib/arvbox/docker/Dockerfile.base b/tools/arvbox/lib/arvbox/docker/Dockerfile.base index a12c1070f1..cf4608733d 100644 --- a/tools/arvbox/lib/arvbox/docker/Dockerfile.base +++ b/tools/arvbox/lib/arvbox/docker/Dockerfile.base @@ -18,7 +18,8 @@ RUN apt-get update && \ libjson-perl nginx gitolite3 lsof libreadline-dev \ apt-transport-https ca-certificates slurm-wlm \ linkchecker python3-virtualenv python-virtualenv xvfb iceweasel \ - libgnutls28-dev python3-dev vim cadaver cython gnupg dirmngr && \ + libgnutls28-dev python3-dev vim cadaver cython gnupg dirmngr \ + libsecret-1-dev && \ apt-get clean ENV RUBYVERSION_MINOR 2.3 diff --git a/tools/arvbox/lib/arvbox/docker/common.sh b/tools/arvbox/lib/arvbox/docker/common.sh index 62225df6ce..466ef1fcee 100644 --- a/tools/arvbox/lib/arvbox/docker/common.sh +++ b/tools/arvbox/lib/arvbox/docker/common.sh @@ -21,6 +21,7 @@ services=( [workbench]=80 [api]=8000 [sso]=8900 + [composer]=4200 [arv-git-httpd]=9001 [keep-web]=9002 [keepproxy]=25100 diff --git a/tools/arvbox/lib/arvbox/docker/createusers.sh b/tools/arvbox/lib/arvbox/docker/createusers.sh index 3296a3cd17..a4689f004a 100755 --- a/tools/arvbox/lib/arvbox/docker/createusers.sh +++ b/tools/arvbox/lib/arvbox/docker/createusers.sh @@ -19,6 +19,7 @@ if ! grep "^arvbox:" /etc/passwd >/dev/null 2>/dev/null ; then --uid $HOSTUID --gid $HOSTGID \ --non-unique \ --groups docker \ + --shell /bin/bash \ arvbox useradd --home-dir /var/lib/arvados/git --uid $HOSTUID --gid $HOSTGID --non-unique git useradd --groups docker crunch diff --git a/tools/arvbox/lib/arvbox/docker/service/composer/log/main/.gitstub b/tools/arvbox/lib/arvbox/docker/service/composer/log/main/.gitstub new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tools/arvbox/lib/arvbox/docker/service/composer/log/run b/tools/arvbox/lib/arvbox/docker/service/composer/log/run new file mode 120000 index 0000000000..d6aef4a77d --- /dev/null +++ b/tools/arvbox/lib/arvbox/docker/service/composer/log/run @@ -0,0 +1 @@ +/usr/local/lib/arvbox/logger \ No newline at end of file diff --git a/tools/arvbox/lib/arvbox/docker/service/composer/run b/tools/arvbox/lib/arvbox/docker/service/composer/run new file mode 100755 index 0000000000..cd2f86a27e --- /dev/null +++ b/tools/arvbox/lib/arvbox/docker/service/composer/run @@ -0,0 +1,8 @@ +#!/bin/sh +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: AGPL-3.0 + +set -e + +/usr/local/lib/arvbox/runsu.sh $0-service $1 diff --git a/tools/arvbox/lib/arvbox/docker/service/composer/run-service b/tools/arvbox/lib/arvbox/docker/service/composer/run-service new file mode 100755 index 0000000000..6578ea5820 --- /dev/null +++ b/tools/arvbox/lib/arvbox/docker/service/composer/run-service @@ -0,0 +1,22 @@ +#!/bin/bash +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: AGPL-3.0 + +exec 2>&1 +set -ex -o pipefail + +. /usr/local/lib/arvbox/common.sh + +cd /usr/src/composer + +npm install yarn + +PATH=$PATH:/usr/src/composer/node_modules/.bin + +yarn install + +if test "$1" != "--only-deps" ; then + echo "apiEndPoint: https://${localip}:${services[api]}" > /usr/src/composer/src/arvados-configuration.yml + exec ng serve --host 0.0.0.0 --port 4200 --env=webdev +fi diff --git a/tools/arvbox/lib/arvbox/docker/service/sso/run-service b/tools/arvbox/lib/arvbox/docker/service/sso/run-service index ee35bb916f..1dfffaf59e 100755 --- a/tools/arvbox/lib/arvbox/docker/service/sso/run-service +++ b/tools/arvbox/lib/arvbox/docker/service/sso/run-service @@ -16,7 +16,8 @@ else fi run_bundler --without=development -bundle exec passenger start --runtime-check-only --runtime-dir=/var/lib/passenger +bundle exec passenger-config build-native-support +bundle exec passenger-config install-standalone-runtime if test "$1" = "--only-deps" ; then exit @@ -90,6 +91,5 @@ if test "$1" = "--only-setup" ; then fi exec bundle exec passenger start --port=${services[sso]} \ - --runtime-dir=/var/lib/passenger \ --ssl --ssl-certificate=/var/lib/arvados/self-signed.pem \ --ssl-certificate-key=/var/lib/arvados/self-signed.key