//
// SPDX-License-Identifier: AGPL-3.0
-window.CollectionsTable = {
+window.SearchResultsTable = {
maybeLoadMore: function(dom) {
var loader = this.loader
if (loader.state != loader.READY)
},
view: function(vnode) {
var loader = vnode.attrs.loader
+ var iconsMap = {
+ collections: m('i.fa.fa-fw.fa-archive'),
+ projects: m('i.fa.fa-fw.fa-folder'),
+ }
return m('table.table.table-condensed', [
m('thead', m('tr', [
m('th'),
m('td', [
item.workbenchBaseURL() &&
m('a.btn.btn-xs.btn-default', {
- href: item.workbenchBaseURL()+'collections/'+item.uuid,
- }, 'Show'),
+ 'data-original-title': 'show '+item.objectType.description,
+ 'data-placement': 'top',
+ 'data-toggle': 'tooltip',
+ href: item.workbenchBaseURL()+'/'+item.objectType.wb_path+'/'+item.uuid,
+ // Initialize Bootstrap's tooltip feature on the newly created element
+ oncreate: function(vnode) { $(vnode.dom).tooltip() },
+ }, iconsMap[item.objectType.wb_path]),
]),
m('td.arvados-uuid', item.uuid),
m('td', item.name || '(unnamed)'),
},
}
-window.CollectionsSearch = {
+window.Search = {
oninit: function(vnode) {
vnode.state.sessionDB = new SessionDB()
vnode.state.searchEntered = m.stream()
var workbenchBaseURL = function() {
return vnode.state.sessionDB.workbenchBaseURL(session)
}
- return new MultipageLoader({
+ var searchable_objects = [
+ {
+ wb_path: 'projects',
+ api_path: 'arvados/v1/groups',
+ filters: [['group_class', '=', 'project']],
+ description: 'project',
+ },
+ {
+ wb_path: 'collections',
+ api_path: 'arvados/v1/collections',
+ filters: [],
+ description: 'collection',
+ },
+ ]
+ return new MergingLoader({
sessionKey: key,
- loadFunc: function(filters) {
- var tsquery = to_tsquery(q)
- if (tsquery) {
- filters = filters.slice(0)
- filters.push(['any', '@@', tsquery])
- }
- return vnode.state.sessionDB.request(session, 'arvados/v1/collections', {
- data: {
- filters: JSON.stringify(filters),
- count: 'none',
+ // For every session, search for every object type
+ children: searchable_objects.map(function(obj_type) {
+ return new MultipageLoader({
+ sessionKey: key,
+ loadFunc: function(filters) {
+ // Apply additional type-dependent filters
+ filters = filters.concat(obj_type.filters)
+ var tsquery = to_tsquery(q)
+ if (tsquery) {
+ filters.push(['any', '@@', tsquery])
+ }
+ return vnode.state.sessionDB.request(session, obj_type.api_path, {
+ data: {
+ filters: JSON.stringify(filters),
+ count: 'none',
+ },
+ }).then(function(resp) {
+ resp.items.map(function(item) {
+ item.workbenchBaseURL = workbenchBaseURL
+ item.objectType = obj_type
+ })
+ return resp
+ })
},
- }).then(function(resp) {
- resp.items.map(function(item) {
- item.workbenchBaseURL = workbenchBaseURL
- })
- return resp
})
- },
+ }),
})
- })
+ }),
})
})
},
m('.row', [
m('.col-md-6', [
m('.input-group', [
- m('input#search.form-control[placeholder=Search]', {
+ m('input#search.form-control[placeholder=Search collections and projects]', {
oninput: m.withAttr('value', vnode.state.searchEntered),
value: vnode.state.searchEntered(),
}),
m('a[href="/sessions"]', 'Add/remove sites'),
]),
]),
- m(CollectionsTable, {
+ m(SearchResultsTable, {
loader: vnode.state.loader,
}),
],
return m('.container', [
m('p', [
'You can log in to multiple Arvados sites here, then use the ',
- m('a[href="/collections/multisite"]', 'multi-site search'),
- ' page to search collections on all sites at once.',
+ m('a[href="/search"]', 'multi-site search'),
+ ' page to search collections and projects on all sites at once.',
]),
m('table.table.table-condensed.table-hover', [
m('thead', m('tr', [
// the host part of apihostport is an IPv4 or [IPv6]
// address.
if (!session.baseURL.match('://(\\[|\\d+\\.\\d+\\.\\d+\\.\\d+[:/])'))
- return session.baseURL.replace('://', '://workbench.')
+ var wbUrl = session.baseURL.replace('://', '://workbench.')
+ // Remove the trailing slash, if it's there.
+ return wbUrl.slice(-1) == '/' ? wbUrl.slice(0, -1) : wbUrl
return null
},
// Return a m.stream that will get fulfilled with the
skip_around_filter(:require_thread_api_token,
only: [:show_file, :show_file_links])
skip_before_filter(:find_object_by_uuid,
- only: [:provenance, :show_file, :show_file_links, :multisite])
+ only: [:provenance, :show_file, :show_file_links])
# We depend on show_file to display the user agreement:
skip_before_filter :check_user_agreements, only: :show_file
skip_before_filter :check_user_profile, only: :show_file
# SPDX-License-Identifier: AGPL-3.0
class SearchController < ApplicationController
+ skip_before_filter :ensure_arvados_api_exists
+
def find_objects_for_index
search_what = Group
if params[:project_uuid]
<%=
target = Rails.configuration.multi_site_search
if target == true
- target = {controller: 'collections', action: 'multisite'}
+ target = {controller: 'search', action: 'index'}
end
link_to("Multi-site search", target, {class: 'btn btn-default'}) %>
</form>
SPDX-License-Identifier: AGPL-3.0 -->
-<div data-mount-mithril="CollectionsSearch"></div>
+<div data-mount-mithril="Search"></div>
post 'remove_selected_files', on: :member
get 'tags', on: :member
post 'save_tags', on: :member
- get 'multisite', on: :collection
+ get 'multisite', on: :collection, to: redirect('/search')
end
get('/collections/download/:uuid/:reader_token/*file' => 'collections#show_file',
format: false)
get 'tab_counts', on: :member
get 'public', on: :collection
end
-
+
resources :search do
get 'choose', :on => :collection
end
match '/_health/ping', to: 'healthcheck#ping', via: [:get]
get '/tests/mithril', to: 'tests#mithril'
-
+
get '/status', to: 'status#status'
-
+
# Send unroutable requests to an arbitrary controller
# (ends up at ApplicationController#render_not_found)
match '*a', to: 'links#render_not_found', via: [:get, :post]
#
# SPDX-License-Identifier: AGPL-3.0
-LIBCLOUD_PIN=2.2.2.dev2
+LIBCLOUD_PIN=2.2.2.dev3
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-FROM ubuntu:precise
-MAINTAINER Ward Vandewege <ward@curoverse.com>
-
-ENV DEBIAN_FRONTEND noninteractive
-
-# Install dependencies.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip build-essential unzip
-
-# Install RVM
-RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
- curl -L https://get.rvm.io | bash -s stable && \
- /usr/local/rvm/bin/rvm install 2.3 && \
- /usr/local/rvm/bin/rvm alias create default ruby-2.3 && \
- /usr/local/rvm/bin/rvm-exec default gem install bundler && \
- /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
-
-# Install golang binary
-ADD generated/go1.8.3.linux-amd64.tar.gz /usr/local/
-RUN ln -s /usr/local/go/bin/go /usr/local/bin/
-
-# Install nodejs and npm
-ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
-RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
-
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
-ENV WORKSPACE /arvados
-CMD ["/usr/local/rvm/bin/rvm-exec", "default", "bash", "/jenkins/run-build-packages.sh", "--target", "ubuntu1204"]
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-FROM ubuntu:precise
-MAINTAINER Ward Vandewege <ward@curoverse.com>
-
-ENV DEBIAN_FRONTEND noninteractive
-
-# Install RVM
-RUN apt-get update && \
- apt-get -y install --no-install-recommends curl ca-certificates g++ && \
- gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
- curl -L https://get.rvm.io | bash -s stable && \
- /usr/local/rvm/bin/rvm install 2.3 && \
- /usr/local/rvm/bin/rvm alias create default ruby-2.3
-
-# udev daemon can't start in a container, so don't try.
-RUN mkdir -p /etc/udev/disabled
-
-RUN echo "deb file:///arvados/packages/ubuntu1204/ /" >>/etc/apt/sources.list
true
else
FINAL_EXITCODE=$?
+ echo
+ echo "Build packages failed for $(basename $(dirname "$dockerfile_path"))"
+ echo
fi
done
if test $FINAL_EXITCODE != 0 ; then
+ echo
echo "Build packages failed with code $FINAL_EXITCODE" >&2
+ echo
fi
exit $FINAL_EXITCODE
sdk/go/httpserver
sdk/go/manifest
sdk/go/blockdigest
-sdk/go/streamer
+sdk/go/asyncbuf
sdk/go/stats
sdk/go/crunchrunner
sdk/cwl
sdk/go/health
sdk/go/httpserver
sdk/go/manifest
- sdk/go/streamer
+ sdk/go/asyncbuf
sdk/go/crunchrunner
sdk/go/stats
lib/crunchstat
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package asyncbuf
+
+import (
+ "bytes"
+ "io"
+ "sync"
+)
+
+// A Buffer is an io.Writer that distributes written data
+// asynchronously to multiple concurrent readers.
+//
+// NewReader() can be called at any time. In all cases, every returned
+// io.Reader reads all data written to the Buffer.
+//
+// Behavior is undefined if Write is called after Close or
+// CloseWithError.
+type Buffer interface {
+	io.WriteCloser
+
+	// NewReader() returns an io.Reader that reads all data
+	// written to the Buffer. A read at the end of the buffered
+	// data blocks until more data is written, or the Buffer is
+	// closed.
+	NewReader() io.Reader
+
+	// Close, but return the given error (instead of io.EOF) to
+	// all readers when they reach the end of the buffer.
+	//
+	// CloseWithError(nil) is equivalent to
+	// CloseWithError(io.EOF).
+	CloseWithError(error) error
+}
+
+// buffer is the concrete Buffer implementation. cond.L guards both
+// data and err; cond is broadcast after every Write and after
+// Close/CloseWithError so blocked readers re-check for new data or
+// end-of-stream.
+type buffer struct {
+	data *bytes.Buffer
+	cond sync.Cond
+	err  error // nil if there might be more writes
+}
+
+// NewBuffer creates a new Buffer using buf as its initial
+// contents. The new Buffer takes ownership of buf, and the caller
+// should not use buf after this call.
+func NewBuffer(buf []byte) Buffer {
+	return &buffer{
+		data: bytes.NewBuffer(buf),
+		cond: sync.Cond{L: &sync.Mutex{}},
+	}
+}
+
+// Write appends p to the buffer and wakes any waiting readers. If
+// the buffer has already been closed, nothing is written and the
+// stored close error (io.EOF, or the CloseWithError argument) is
+// returned.
+func (b *buffer) Write(p []byte) (int, error) {
+	// Deferred before Unlock, so (defers running LIFO) the
+	// Broadcast happens after the lock is released.
+	defer b.cond.Broadcast()
+	b.cond.L.Lock()
+	defer b.cond.L.Unlock()
+	if b.err != nil {
+		return 0, b.err
+	}
+	return b.data.Write(p)
+}
+
+// Close marks the buffer complete: readers drain any remaining data,
+// then receive io.EOF. Always returns nil.
+func (b *buffer) Close() error {
+	return b.CloseWithError(nil)
+}
+
+// CloseWithError marks the buffer complete, delivering err (or
+// io.EOF if err is nil) to each reader once it has consumed all
+// buffered data. Always returns nil.
+func (b *buffer) CloseWithError(err error) error {
+	defer b.cond.Broadcast()
+	b.cond.L.Lock()
+	defer b.cond.L.Unlock()
+	if err == nil {
+		b.err = io.EOF
+	} else {
+		b.err = err
+	}
+	return nil
+}
+
+// NewReader returns an io.Reader positioned at the start of the
+// buffered data.
+func (b *buffer) NewReader() io.Reader {
+	return &reader{b: b}
+}
+
+// reader tracks one consumer's position within a shared buffer. A
+// reader must not be used by multiple goroutines concurrently:
+// r.read is updated without holding the buffer's lock.
+type reader struct {
+	b    *buffer
+	read int // # bytes already read
+}
+
+// Read returns data written to the buffer but not yet consumed by
+// this reader, blocking on the buffer's condition variable until
+// data is available or the buffer is closed. At end-of-data it
+// returns the buffer's close error (io.EOF, or the argument passed
+// to CloseWithError).
+func (r *reader) Read(p []byte) (int, error) {
+	r.b.cond.L.Lock()
+	for {
+		switch {
+		case r.read < r.b.data.Len():
+			buf := r.b.data.Bytes()
+			// Copying outside the lock is safe: writes only
+			// append, so the bytes captured in buf are never
+			// rewritten (a concurrent grow reallocates,
+			// leaving this snapshot's array intact).
+			r.b.cond.L.Unlock()
+			n := copy(p, buf[r.read:])
+			r.read += n
+			return n, nil
+		case r.b.err != nil || len(p) == 0:
+			// r.b.err != nil means we reached EOF. And
+			// even if we're not at EOF, there's no need
+			// to block if len(p)==0.
+			err := r.b.err
+			r.b.cond.L.Unlock()
+			return 0, err
+		default:
+			r.b.cond.Wait()
+		}
+	}
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package asyncbuf
+
+import (
+ "crypto/md5"
+ "errors"
+ "io"
+ "io/ioutil"
+ "math/rand"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ check "gopkg.in/check.v1"
+)
+
+// Suite's tests exercise Buffer's write/read ordering, close and
+// error semantics, and behavior under many concurrent readers.
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+// A buffer closed without any writes reads as empty.
+func (s *Suite) TestNoWrites(c *check.C) {
+	b := NewBuffer(nil)
+	r1 := b.NewReader()
+	r2 := b.NewReader()
+	b.Close()
+	s.checkReader(c, r1, []byte{}, nil, nil)
+	s.checkReader(c, r2, []byte{}, nil, nil)
+}
+
+// Writing and closing succeed even if nobody ever reads.
+func (s *Suite) TestNoReaders(c *check.C) {
+	b := NewBuffer(nil)
+	n, err := b.Write([]byte("foobar"))
+	err2 := b.Close()
+	c.Check(n, check.Equals, 6)
+	c.Check(err, check.IsNil)
+	c.Check(err2, check.IsNil)
+}
+
+// Readers that consume all written data keep blocking until Close.
+func (s *Suite) TestWriteReadClose(c *check.C) {
+	done := make(chan bool, 2)
+	b := NewBuffer(nil)
+	n, err := b.Write([]byte("foobar"))
+	c.Check(n, check.Equals, 6)
+	c.Check(err, check.IsNil)
+	r1 := b.NewReader()
+	r2 := b.NewReader()
+	go s.checkReader(c, r1, []byte("foobar"), nil, done)
+	go s.checkReader(c, r2, []byte("foobar"), nil, done)
+	time.Sleep(time.Millisecond)
+	// Neither reader should have finished yet: not closed.
+	c.Check(len(done), check.Equals, 0)
+	b.Close()
+	<-done
+	<-done
+}
+
+// Readers created after Close still see the initial contents plus
+// all subsequent writes.
+func (s *Suite) TestPrefillWriteCloseRead(c *check.C) {
+	done := make(chan bool, 2)
+	b := NewBuffer([]byte("baz"))
+	n, err := b.Write([]byte("waz"))
+	c.Check(n, check.Equals, 3)
+	c.Check(err, check.IsNil)
+	b.Close()
+	r1 := b.NewReader()
+	go s.checkReader(c, r1, []byte("bazwaz"), nil, done)
+	r2 := b.NewReader()
+	go s.checkReader(c, r2, []byte("bazwaz"), nil, done)
+	<-done
+	<-done
+}
+
+// New readers always start at the beginning of the buffer, no matter
+// when they are created relative to the writes.
+func (s *Suite) TestWriteReadCloseRead(c *check.C) {
+	done := make(chan bool, 1)
+	b := NewBuffer(nil)
+	r1 := b.NewReader()
+	go s.checkReader(c, r1, []byte("bazwazqux"), nil, done)
+
+	b.Write([]byte("bazwaz"))
+
+	r2 := b.NewReader()
+	r2.Read(make([]byte, 3))
+
+	b.Write([]byte("qux"))
+	b.Close()
+
+	s.checkReader(c, r2, []byte("wazqux"), nil, nil)
+	<-done
+}
+
+// Zero-byte reads at the data boundary: non-blocking (and nil error)
+// before Close, io.EOF after.
+func (s *Suite) TestReadAtEOF(c *check.C) {
+	buf := make([]byte, 8)
+
+	b := NewBuffer([]byte{1, 2, 3})
+
+	r := b.NewReader()
+	n, err := r.Read(buf)
+	c.Check(n, check.Equals, 3)
+	c.Check(err, check.IsNil)
+
+	// Reading zero bytes at EOF, but before Close(), doesn't
+	// block or error
+	done := make(chan bool)
+	go func() {
+		defer close(done)
+		n, err = r.Read(buf[:0])
+		c.Check(n, check.Equals, 0)
+		c.Check(err, check.IsNil)
+	}()
+	select {
+	case <-done:
+	case <-time.After(time.Second):
+		c.Error("timeout")
+	}
+
+	b.Close()
+
+	// Reading zero bytes after Close() returns EOF
+	n, err = r.Read(buf[:0])
+	c.Check(n, check.Equals, 0)
+	c.Check(err, check.Equals, io.EOF)
+
+	// Reading from start after Close() returns 3 bytes, then EOF
+	r = b.NewReader()
+	n, err = r.Read(buf)
+	c.Check(n, check.Equals, 3)
+	// The io.Reader contract permits (but does not require)
+	// returning io.EOF along with the final data.
+	if err != nil {
+		c.Check(err, check.Equals, io.EOF)
+	}
+	n, err = r.Read(buf[:0])
+	c.Check(n, check.Equals, 0)
+	c.Check(err, check.Equals, io.EOF)
+	n, err = r.Read(buf)
+	c.Check(n, check.Equals, 0)
+	c.Check(err, check.Equals, io.EOF)
+}
+
+// The error passed to CloseWithError (not io.EOF) is what readers
+// receive at the end of the data.
+func (s *Suite) TestCloseWithError(c *check.C) {
+	errFake := errors.New("it's not even a real error")
+
+	done := make(chan bool, 1)
+	b := NewBuffer(nil)
+	r1 := b.NewReader()
+	go s.checkReader(c, r1, []byte("bazwazqux"), errFake, done)
+
+	b.Write([]byte("bazwaz"))
+
+	r2 := b.NewReader()
+	r2.Read(make([]byte, 3))
+
+	b.Write([]byte("qux"))
+	b.CloseWithError(errFake)
+
+	s.checkReader(c, r2, []byte("wazqux"), errFake, nil)
+	<-done
+}
+
+// Write n*n bytes, n at a time; read them into n goroutines using
+// varying buffer sizes; compare checksums.
+func (s *Suite) TestManyReaders(c *check.C) {
+	const n = 256
+
+	b := NewBuffer(nil)
+
+	expectSum := make(chan []byte)
+	go func() {
+		hash := md5.New()
+		buf := make([]byte, n)
+		for i := 0; i < n; i++ {
+			time.Sleep(10 * time.Nanosecond)
+			rand.Read(buf)
+			b.Write(buf)
+			hash.Write(buf)
+		}
+		expectSum <- hash.Sum(nil)
+		b.Close()
+	}()
+
+	gotSum := make(chan []byte)
+	for i := 0; i < n; i++ {
+		go func(bufSize int) {
+			got := md5.New()
+			io.CopyBuffer(got, b.NewReader(), make([]byte, bufSize))
+			gotSum <- got.Sum(nil)
+		}(i + n/2)
+	}
+
+	// Every reader must see exactly the written byte stream.
+	expect := <-expectSum
+	for i := 0; i < n; i++ {
+		c.Check(expect, check.DeepEquals, <-gotSum)
+	}
+}
+
+// Throughput with a single reader per buffer.
+func (s *Suite) BenchmarkOneReader(c *check.C) {
+	s.benchmarkReaders(c, 1)
+}
+
+// Throughput with 100 concurrent readers per buffer.
+func (s *Suite) BenchmarkManyReaders(c *check.C) {
+	s.benchmarkReaders(c, 100)
+}
+
+// benchmarkReaders measures aggregate read throughput: 10 buffers,
+// each written c.N times with a 10 KB chunk and drained by the given
+// number of concurrent readers.
+func (s *Suite) benchmarkReaders(c *check.C, readers int) {
+	var n int64
+	t0 := time.Now()
+
+	buf := make([]byte, 10000)
+	rand.Read(buf)
+	for i := 0; i < 10; i++ {
+		b := NewBuffer(nil)
+		go func() {
+			for i := 0; i < c.N; i++ {
+				b.Write(buf)
+			}
+			b.Close()
+		}()
+
+		var wg sync.WaitGroup
+		for i := 0; i < readers; i++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				nn, _ := io.Copy(ioutil.Discard, b.NewReader())
+				atomic.AddInt64(&n, int64(nn))
+			}()
+		}
+		wg.Wait()
+	}
+	c.Logf("%d bytes, %.0f MB/s", n, float64(n)/time.Since(t0).Seconds()/1000000)
+}
+
+// checkReader reads r to EOF, then checks both the data and the
+// terminal error. If done is non-nil, sends true when finished so
+// callers can detect when a blocked reader completes.
+func (s *Suite) checkReader(c *check.C, r io.Reader, expectData []byte, expectError error, done chan bool) {
+	buf, err := ioutil.ReadAll(r)
+	c.Check(err, check.Equals, expectError)
+	c.Check(buf, check.DeepEquals, expectData)
+	if done != nil {
+		done <- true
+	}
+}
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+	check.TestingT(t)
+}
_, err = io.Copy(this.Hash, this.Reader)
if closer, ok := this.Reader.(io.Closer); ok {
- err2 := closer.Close()
- if err2 != nil && err == nil {
- return err2
+ closeErr := closer.Close()
+ if err == nil {
+ err = closeErr
}
}
if err != nil {
return err
}
-
- sum := this.Hash.Sum(nil)
- if fmt.Sprintf("%x", sum) != this.Check {
- err = BadChecksum
+ if fmt.Sprintf("%x", this.Hash.Sum(nil)) != this.Check {
+ return BadChecksum
}
-
- return err
+ return nil
}
"time"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
+ "git.curoverse.com/arvados.git/sdk/go/asyncbuf"
)
// A Keep "block" is 64MB.
bufsize = BLOCKSIZE
}
- t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
- defer t.Close()
-
- return kc.putReplicas(hash, t, dataBytes)
+ buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize))
+ go func() {
+ _, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash})
+ buf.CloseWithError(err)
+ }()
+ return kc.putReplicas(hash, buf.NewReader, dataBytes)
}
// PutHB writes a block to Keep. The hash of the bytes is given in
//
// Return values are the same as for PutHR.
func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
- t := streamer.AsyncStreamFromSlice(buf)
- defer t.Close()
- return kc.putReplicas(hash, t, int64(len(buf)))
+ newReader := func() io.Reader { return bytes.NewBuffer(buf) }
+ return kc.putReplicas(hash, newReader, int64(len(buf)))
}
// PutB writes a block to Keep. It computes the hash itself.
package keepclient
import (
+ "bytes"
"crypto/md5"
"errors"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
. "gopkg.in/check.v1"
)
make(chan string)}
UploadToStubHelper(c, st,
- func(kc *KeepClient, url string, reader io.ReadCloser,
- writer io.WriteCloser, upload_status chan uploadStatus) {
-
- tr := streamer.AsyncStreamFromReader(512, reader)
- defer tr.Close()
-
- br1 := tr.MakeStreamReader()
-
- go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, 0)
-
- writer.Write([]byte("foo"))
- writer.Close()
+ func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, upload_status chan uploadStatus) {
+ go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), upload_status, 3, 0)
<-st.handled
"strings"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
)
// Function used to emit debug messages. The easiest way to enable
response string
}
-func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
+func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader,
upload_status chan<- uploadStatus, expectedLength int64, requestID int32) {
var req *http.Request
if req, err = http.NewRequest("PUT", url, nil); err != nil {
DebugPrintf("DEBUG: [%08x] Error creating request PUT %v error: %v", requestID, url, err.Error())
upload_status <- uploadStatus{err, url, 0, 0, ""}
- body.Close()
return
}
req.ContentLength = expectedLength
if expectedLength > 0 {
- // Do() will close the body ReadCloser when it is done
- // with it.
- req.Body = body
+ req.Body = ioutil.NopCloser(body)
} else {
- // "For client requests, a value of 0 means unknown if Body is
- // not nil." In this case we do want the body to be empty, so
- // don't set req.Body. However, we still need to close the
- // body ReadCloser.
- body.Close()
+ // "For client requests, a value of 0 means unknown if
+ // Body is not nil." In this case we do want the body
+ // to be empty, so don't set req.Body.
}
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
func (this *KeepClient) putReplicas(
hash string,
- tr *streamer.AsyncStream,
+ getReader func() io.Reader,
expectedLength int64) (locator string, replicas int, err error) {
// Generate an arbitrary ID to identify this specific
// Start some upload requests
if next_server < len(sv) {
DebugPrintf("DEBUG: [%08x] Begin upload %s to %s", requestID, hash, sv[next_server])
- go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestID)
+ go this.uploadToKeepServer(sv[next_server], hash, getReader(), upload_status, expectedLength, requestID)
next_server += 1
active += 1
} else {
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-/* AsyncStream pulls data in from a io.Reader source (such as a file or network
-socket) and fans out to any number of StreamReader sinks.
-
-Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
-any point in the lifetime of the AsyncStream, and each StreamReader will read
-the contents of the buffer up to the "frontier" of the buffer, at which point
-the StreamReader blocks until new data is read from the source.
-
-This is useful for minimizing readthrough latency as sinks can read and act on
-data from the source without waiting for the source to be completely buffered.
-It is also useful as a cache in situations where re-reading the original source
-potentially is costly, since the buffer retains a copy of the source data.
-
-Usage:
-
-Begin reading into a buffer with maximum size 'buffersize' from 'source':
- stream := AsyncStreamFromReader(buffersize, source)
-
-To create a new reader (this can be called multiple times, each reader starts
-at the beginning of the buffer):
- reader := tr.MakeStreamReader()
-
-Make sure to close the reader when you're done with it.
- reader.Close()
-
-When you're done with the stream:
- stream.Close()
-
-Alternately, if you already have a filled buffer and just want to read out from it:
- stream := AsyncStreamFromSlice(buf)
-
- r := tr.MakeStreamReader()
-
-*/
-
-package streamer
-
-import (
- "errors"
- "io"
-)
-
-var ErrAlreadyClosed = errors.New("cannot close a stream twice")
-
-type AsyncStream struct {
- buffer []byte
- requests chan sliceRequest
- add_reader chan bool
- subtract_reader chan bool
- wait_zero_readers chan bool
- closed bool
-}
-
-// Reads from the buffer managed by the Transfer()
-type StreamReader struct {
- offset int
- stream *AsyncStream
- responses chan sliceResult
-}
-
-func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
- t := &AsyncStream{
- buffer: make([]byte, buffersize),
- requests: make(chan sliceRequest),
- add_reader: make(chan bool),
- subtract_reader: make(chan bool),
- wait_zero_readers: make(chan bool),
- }
-
- go t.transfer(source)
- go t.readersMonitor()
-
- return t
-}
-
-func AsyncStreamFromSlice(buf []byte) *AsyncStream {
- t := &AsyncStream{
- buffer: buf,
- requests: make(chan sliceRequest),
- add_reader: make(chan bool),
- subtract_reader: make(chan bool),
- wait_zero_readers: make(chan bool),
- }
-
- go t.transfer(nil)
- go t.readersMonitor()
-
- return t
-}
-
-func (this *AsyncStream) MakeStreamReader() *StreamReader {
- this.add_reader <- true
- return &StreamReader{0, this, make(chan sliceResult)}
-}
-
-// Reads from the buffer managed by the Transfer()
-func (this *StreamReader) Read(p []byte) (n int, err error) {
- this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
- rr, valid := <-this.responses
- if valid {
- this.offset += len(rr.slice)
- return copy(p, rr.slice), rr.err
- } else {
- return 0, io.ErrUnexpectedEOF
- }
-}
-
-func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
- // Record starting offset in order to correctly report the number of bytes sent
- starting_offset := this.offset
- for {
- this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
- rr, valid := <-this.responses
- if valid {
- this.offset += len(rr.slice)
- if rr.err != nil {
- if rr.err == io.EOF {
- // EOF is not an error.
- return int64(this.offset - starting_offset), nil
- } else {
- return int64(this.offset - starting_offset), rr.err
- }
- } else {
- dest.Write(rr.slice)
- }
- } else {
- return int64(this.offset), io.ErrUnexpectedEOF
- }
- }
-}
-
-// Close the responses channel
-func (this *StreamReader) Close() error {
- if this.stream == nil {
- return ErrAlreadyClosed
- }
- this.stream.subtract_reader <- true
- close(this.responses)
- this.stream = nil
- return nil
-}
-
-func (this *AsyncStream) Close() error {
- if this.closed {
- return ErrAlreadyClosed
- }
- this.closed = true
- this.wait_zero_readers <- true
- close(this.requests)
- close(this.add_reader)
- close(this.subtract_reader)
- close(this.wait_zero_readers)
- return nil
-}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-package streamer
-
-import (
- . "gopkg.in/check.v1"
- "io"
- "testing"
- "time"
-)
-
-// Gocheck boilerplate
-func Test(t *testing.T) { TestingT(t) }
-
-var _ = Suite(&StandaloneSuite{})
-
-// Standalone tests
-type StandaloneSuite struct{}
-
-func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
- ReadIntoBufferHelper(c, 225)
- ReadIntoBufferHelper(c, 224)
-}
-
-func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
- out := make([]byte, 128)
- for i := 0; i < 128; i += 1 {
- out[i] = byte(i)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 128)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 128; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
-}
-
-func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
- out := make([]byte, 96)
- for i := 0; i < 96; i += 1 {
- out[i] = byte(i / 2)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 96)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 96; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i/2))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else if i < (128 + 96) {
- c.Check(buffer[i], Equals, byte((i-128)/2))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
-}
-
-func ReadIntoBufferHelper(c *C, bufsize int) {
- buffer := make([]byte, bufsize)
-
- reader, writer := io.Pipe()
- slices := make(chan nextSlice)
-
- go readIntoBuffer(buffer, reader, slices)
-
- HelperWrite128andCheck(c, buffer, writer, slices)
- HelperWrite96andCheck(c, buffer, writer, slices)
-
- writer.Close()
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 0)
- c.Check(s1.reader_error, Equals, io.EOF)
-}
-
-func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
- buffer := make([]byte, 223)
- reader, writer := io.Pipe()
- slices := make(chan nextSlice)
-
- go readIntoBuffer(buffer, reader, slices)
-
- HelperWrite128andCheck(c, buffer, writer, slices)
-
- out := make([]byte, 96)
- for i := 0; i < 96; i += 1 {
- out[i] = byte(i / 2)
- }
-
- // Write will deadlock because it can't write all the data, so
- // spin it off to a goroutine
- go writer.Write(out)
- s1 := <-slices
-
- c.Check(len(s1.slice), Equals, 95)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 95; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i/2))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else if i < (128 + 95) {
- c.Check(buffer[i], Equals, byte((i-128)/2))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
-
- writer.Close()
- s1 = <-slices
- c.Check(len(s1.slice), Equals, 0)
- c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransfer(c *C) {
- reader, writer := io.Pipe()
-
- tr := AsyncStreamFromReader(512, reader)
-
- br1 := tr.MakeStreamReader()
- out := make([]byte, 128)
-
- {
- // Write some data, and read into a buffer shorter than
- // available data
- for i := 0; i < 128; i += 1 {
- out[i] = byte(i)
- }
-
- writer.Write(out[:100])
-
- in := make([]byte, 64)
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 64)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 64; i += 1 {
- c.Check(in[i], Equals, out[i])
- }
- }
-
- {
- // Write some more data, and read into buffer longer than
- // available data
- in := make([]byte, 64)
- n, err := br1.Read(in)
- c.Check(n, Equals, 36)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 36; i += 1 {
- c.Check(in[i], Equals, out[64+i])
- }
-
- }
-
- {
- // Test read before write
- type Rd struct {
- n int
- err error
- }
- rd := make(chan Rd)
- in := make([]byte, 64)
-
- go func() {
- n, err := br1.Read(in)
- rd <- Rd{n, err}
- }()
-
- time.Sleep(100 * time.Millisecond)
- writer.Write(out[100:])
-
- got := <-rd
-
- c.Check(got.n, Equals, 28)
- c.Check(got.err, Equals, nil)
-
- for i := 0; i < 28; i += 1 {
- c.Check(in[i], Equals, out[100+i])
- }
- }
-
- br2 := tr.MakeStreamReader()
- {
- // Test 'catch up' reader
- in := make([]byte, 256)
- n, err := br2.Read(in)
-
- c.Check(n, Equals, 128)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 128; i += 1 {
- c.Check(in[i], Equals, out[i])
- }
- }
-
- {
- // Test closing the reader
- writer.Close()
-
- in := make([]byte, 256)
- n1, err1 := br1.Read(in)
- n2, err2 := br2.Read(in)
- c.Check(n1, Equals, 0)
- c.Check(err1, Equals, io.EOF)
- c.Check(n2, Equals, 0)
- c.Check(err2, Equals, io.EOF)
- }
-
- {
- // Test 'catch up' reader after closing
- br3 := tr.MakeStreamReader()
- in := make([]byte, 256)
- n, err := br3.Read(in)
-
- c.Check(n, Equals, 128)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 128; i += 1 {
- c.Check(in[i], Equals, out[i])
- }
-
- n, err = br3.Read(in)
-
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.EOF)
- }
-}
-
-func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
- reader, writer := io.Pipe()
-
- tr := AsyncStreamFromReader(100, reader)
- defer tr.Close()
-
- sr := tr.MakeStreamReader()
- defer sr.Close()
-
- out := make([]byte, 101)
- go writer.Write(out)
-
- n, err := sr.Read(out)
- c.Check(n, Equals, 100)
- c.Check(err, IsNil)
-
- n, err = sr.Read(out)
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
- // Buffer for reads from 'r'
- buffer := make([]byte, 100)
- for i := 0; i < 100; i += 1 {
- buffer[i] = byte(i)
- }
-
- tr := AsyncStreamFromSlice(buffer)
-
- br1 := tr.MakeStreamReader()
-
- in := make([]byte, 64)
- {
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 64)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 64; i += 1 {
- c.Check(in[i], Equals, buffer[i])
- }
- }
- {
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 36)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 36; i += 1 {
- c.Check(in[i], Equals, buffer[64+i])
- }
- }
- {
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.EOF)
- }
-}
-
-func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
- // Buffer for reads from 'r'
- buffer := make([]byte, 100)
- for i := 0; i < 100; i += 1 {
- buffer[i] = byte(i)
- }
-
- tr := AsyncStreamFromSlice(buffer)
- defer tr.Close()
-
- br1 := tr.MakeStreamReader()
- defer br1.Close()
-
- reader, writer := io.Pipe()
-
- go func() {
- p := make([]byte, 100)
- n, err := reader.Read(p)
- c.Check(n, Equals, 100)
- c.Check(err, Equals, nil)
- c.Check(p, DeepEquals, buffer)
- }()
-
- io.Copy(writer, br1)
-}
-
-func (s *StandaloneSuite) TestManyReaders(c *C) {
- reader, writer := io.Pipe()
-
- tr := AsyncStreamFromReader(512, reader)
- defer tr.Close()
-
- sr := tr.MakeStreamReader()
- go func() {
- time.Sleep(100 * time.Millisecond)
- sr.Close()
- }()
-
- for i := 0; i < 200; i += 1 {
- go func() {
- br1 := tr.MakeStreamReader()
- defer br1.Close()
-
- p := make([]byte, 3)
- n, err := br1.Read(p)
- c.Check(n, Equals, 3)
- c.Check(p[0:3], DeepEquals, []byte("foo"))
-
- n, err = br1.Read(p)
- c.Check(n, Equals, 3)
- c.Check(p[0:3], DeepEquals, []byte("bar"))
-
- n, err = br1.Read(p)
- c.Check(n, Equals, 3)
- c.Check(p[0:3], DeepEquals, []byte("baz"))
-
- n, err = br1.Read(p)
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.EOF)
- }()
- }
-
- writer.Write([]byte("foo"))
- writer.Write([]byte("bar"))
- writer.Write([]byte("baz"))
- writer.Close()
-}
-
-func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) {
- buffer := make([]byte, 100)
- tr := AsyncStreamFromSlice(buffer)
- sr := tr.MakeStreamReader()
- c.Check(sr.Close(), IsNil)
- c.Check(sr.Close(), Equals, ErrAlreadyClosed)
- c.Check(tr.Close(), IsNil)
- c.Check(tr.Close(), Equals, ErrAlreadyClosed)
-}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-/* Internal implementation of AsyncStream.
-Outline of operation:
-
-The kernel is the transfer() goroutine. It manages concurrent reads and
-appends to the "body" slice. "body" is a slice of "source_buffer" that
-represents the segment of the buffer that is already filled in and available
-for reading.
-
-To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
-from the io.Reader source directly into source_buffer. Each read goes into a
-slice of buffer which spans the section immediately following the end of the
-current "body". Each time a Read completes, a slice representing the the
-section just filled in (or any read errors/EOF) is sent over the "slices"
-channel back to the transfer() function.
-
-Meanwhile, the transfer() function selects() on two channels, the "requests"
-channel and the "slices" channel.
-
-When a message is received on the "slices" channel, this means the a new
-section of the buffer has data, or an error is signaled. Since the data has
-been read directly into the source_buffer, it is able to simply increases the
-size of the body slice to encompass the newly filled in section. Then any
-pending reads are serviced with handleReadRequest (described below).
-
-When a message is received on the "requests" channel, it means a StreamReader
-wants access to a slice of the buffer. This is passed to handleReadRequest().
-
-The handleReadRequest() function takes a sliceRequest consisting of a buffer
-offset, maximum size, and channel to send the response. If there was an error
-reported from the source reader, it is returned. If the offset is less than
-the size of the body, the request can proceed, and it sends a body slice
-spanning the segment from offset to min(offset+maxsize, end of the body). If
-source reader status is EOF (done filling the buffer) and the read request
-offset is beyond end of the body, it responds with EOF. Otherwise, the read
-request is for a slice beyond the current size of "body" but we expect the body
-to expand as more data is added, so the request gets added to a wait list.
-
-The transfer() runs until the requests channel is closed by AsyncStream.Close()
-
-To track readers, streamer uses the readersMonitor() goroutine. This goroutine
-chooses which channels to receive from based on the number of outstanding
-readers. When a new reader is created, it sends a message on the add_reader
-channel. If the number of readers is already at MAX_READERS, this blocks the
-sender until an existing reader is closed. When a reader is closed, it sends a
-message on the subtract_reader channel. Finally, when AsyncStream.Close() is
-called, it sends a message on the wait_zero_readers channel, which will block
-the sender unless there are zero readers and it is safe to shut down the
-AsyncStream.
-*/
-
-package streamer
-
-import (
- "io"
-)
-
-const MAX_READERS = 100
-
-// A slice passed from readIntoBuffer() to transfer()
-type nextSlice struct {
- slice []byte
- reader_error error
-}
-
-// A read request to the Transfer() function
-type sliceRequest struct {
- offset int
- maxsize int
- result chan<- sliceResult
-}
-
-// A read result from the Transfer() function
-type sliceResult struct {
- slice []byte
- err error
-}
-
-// Supports writing into a buffer
-type bufferWriter struct {
- buf []byte
- ptr int
-}
-
-// Copy p into this.buf, increment pointer and return number of bytes read.
-func (this *bufferWriter) Write(p []byte) (n int, err error) {
- n = copy(this.buf[this.ptr:], p)
- this.ptr += n
- return n, nil
-}
-
-// Read repeatedly from the reader and write sequentially into the specified
-// buffer, and report each read to channel 'c'. Completes when Reader 'r'
-// reports on the error channel and closes channel 'c'.
-func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
- defer close(slices)
-
- if writeto, ok := r.(io.WriterTo); ok {
- n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
- if err != nil {
- slices <- nextSlice{nil, err}
- } else {
- slices <- nextSlice{buffer[:n], nil}
- slices <- nextSlice{nil, io.EOF}
- }
- return
- } else {
- // Initially entire buffer is available
- ptr := buffer[:]
- for {
- var n int
- var err error
- if len(ptr) > 0 {
- const readblock = 64 * 1024
- // Read 64KiB into the next part of the buffer
- if len(ptr) > readblock {
- n, err = r.Read(ptr[:readblock])
- } else {
- n, err = r.Read(ptr)
- }
- } else {
- // Ran out of buffer space, try reading one more byte
- var b [1]byte
- n, err = r.Read(b[:])
-
- if n > 0 {
- // Reader has more data but we have nowhere to
- // put it, so we're stuffed
- slices <- nextSlice{nil, io.ErrShortBuffer}
- } else {
- // Return some other error (hopefully EOF)
- slices <- nextSlice{nil, err}
- }
- return
- }
-
- // End on error (includes EOF)
- if err != nil {
- slices <- nextSlice{nil, err}
- return
- }
-
- if n > 0 {
- // Make a slice with the contents of the read
- slices <- nextSlice{ptr[:n], nil}
-
- // Adjust the scratch space slice
- ptr = ptr[n:]
- }
- }
- }
-}
-
-// Handle a read request. Returns true if a response was sent, and false if
-// the request should be queued.
-func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
- if (reader_status != nil) && (reader_status != io.EOF) {
- req.result <- sliceResult{nil, reader_status}
- return true
- } else if req.offset < len(body) {
- var end int
- if req.offset+req.maxsize < len(body) {
- end = req.offset + req.maxsize
- } else {
- end = len(body)
- }
- req.result <- sliceResult{body[req.offset:end], nil}
- return true
- } else if (reader_status == io.EOF) && (req.offset >= len(body)) {
- req.result <- sliceResult{nil, io.EOF}
- return true
- } else {
- return false
- }
-}
-
-// Mediates between reads and appends.
-// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
-// in the provided buffer. Otherwise, use the contents of 'buffer' as is.
-// Accepts read requests on the buffer on the 'requests' channel. Completes
-// when 'requests' channel is closed.
-func (this *AsyncStream) transfer(source_reader io.Reader) {
- source_buffer := this.buffer
- requests := this.requests
-
- // currently buffered data
- var body []byte
-
- // for receiving slices from readIntoBuffer
- var slices chan nextSlice = nil
-
- // indicates the status of the underlying reader
- var reader_status error = nil
-
- if source_reader != nil {
- // 'body' is the buffer slice representing the body content read so far
- body = source_buffer[:0]
-
- // used to communicate slices of the buffer as they are
- // readIntoBuffer will close 'slices' when it is done with it
- slices = make(chan nextSlice)
-
- // Spin it off
- go readIntoBuffer(source_buffer, source_reader, slices)
- } else {
- // use the whole buffer
- body = source_buffer[:]
-
- // buffer is complete
- reader_status = io.EOF
- }
-
- pending_requests := make([]sliceRequest, 0)
-
- for {
- select {
- case req, valid := <-requests:
- // Handle a buffer read request
- if valid {
- if !handleReadRequest(req, body, reader_status) {
- pending_requests = append(pending_requests, req)
- }
- } else {
- // closed 'requests' channel indicates we're done
- return
- }
-
- case bk, valid := <-slices:
- // Got a new slice from the reader
- if valid {
- reader_status = bk.reader_error
-
- if bk.slice != nil {
- // adjust body bounds now that another slice has been read
- body = source_buffer[0 : len(body)+len(bk.slice)]
- }
-
- // handle pending reads
- n := 0
- for n < len(pending_requests) {
- if handleReadRequest(pending_requests[n], body, reader_status) {
- // move the element from the back of the slice to
- // position 'n', then shorten the slice by one element
- pending_requests[n] = pending_requests[len(pending_requests)-1]
- pending_requests = pending_requests[0 : len(pending_requests)-1]
- } else {
-
- // Request wasn't handled, so keep it in the request slice
- n += 1
- }
- }
- } else {
- if reader_status == nil {
- // slices channel closed without signaling EOF
- reader_status = io.ErrUnexpectedEOF
- }
- slices = nil
- }
- }
- }
-}
-
-func (this *AsyncStream) readersMonitor() {
- var readers int = 0
-
- for {
- if readers == 0 {
- select {
- case _, ok := <-this.wait_zero_readers:
- if ok {
- // nothing, just implicitly unblock the sender
- } else {
- return
- }
- case _, ok := <-this.add_reader:
- if ok {
- readers += 1
- } else {
- return
- }
- }
- } else if readers > 0 && readers < MAX_READERS {
- select {
- case _, ok := <-this.add_reader:
- if ok {
- readers += 1
- } else {
- return
- }
-
- case _, ok := <-this.subtract_reader:
- if ok {
- readers -= 1
- } else {
- return
- }
- }
- } else if readers == MAX_READERS {
- _, ok := <-this.subtract_reader
- if ok {
- readers -= 1
- } else {
- return
- }
- }
- }
-}
end
def require_auth_scope
- if @read_auths.empty?
+ unless current_user && @read_auths.any? { |auth| auth.user.andand.uuid == current_user.uuid }
if require_login != false
send_error("Forbidden", status: 403)
end
user = nil
api_client = nil
api_client_auth = nil
- supplied_token =
- params["api_token"] ||
- params["oauth_token"] ||
- env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-zA-Z0-9]+)/).andand[1]
- if supplied_token
- api_client_auth = ApiClientAuthorization.
+ if request.get? || params["_method"] == 'GET'
+ reader_tokens = params["reader_tokens"]
+ if reader_tokens.is_a? String
+ reader_tokens = SafeJSON.load(reader_tokens)
+ end
+ else
+ reader_tokens = nil
+ end
+
+ # Set current_user etc. based on the primary session token if a
+ # valid one is present. Otherwise, use the first valid token in
+ # reader_tokens.
+ [params["api_token"],
+ params["oauth_token"],
+ env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-zA-Z0-9]+)/).andand[1],
+ *reader_tokens,
+ ].each do |supplied|
+ next if !supplied
+ try_auth = ApiClientAuthorization.
includes(:api_client, :user).
- where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied_token).
+ where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied).
first
- if api_client_auth.andand.user
+ if try_auth.andand.user
+ api_client_auth = try_auth
user = api_client_auth.user
api_client = api_client_auth.api_client
- else
- # Token seems valid, but points to a non-existent (deleted?) user.
- api_client_auth = nil
+ break
end
end
Thread.current[:api_client_ip_address] = remote_ip
assert_response 403
end
+ test "narrow + wide scoped tokens for different users" do
+ get_args = [{
+ reader_tokens: [api_client_authorizations(:anonymous).api_token]
+ }, auth(:active_userlist)]
+ get(v1_url('users'), *get_args)
+ assert_response :success
+ get(v1_url('users', ''), *get_args) # Add trailing slash.
+ assert_response :success
+ get(v1_url('users', 'current'), *get_args)
+ assert_response 403
+ get(v1_url('virtual_machines'), *get_args)
+ assert_response 403
+ end
+
test "specimens token can see exactly owned specimens" do
get_args = [{}, auth(:active_specimens)]
get(v1_url('specimens'), *get_args)
[nil, :active_noscope].each do |main_auth|
[:spectator, :spectator_specimens].each do |read_auth|
- test "#{main_auth} auth with reader token #{read_auth} can read" do
- assert_includes(get_specimen_uuids(main_auth, read_auth),
- spectator_specimen, "did not find spectator specimen")
- end
-
- test "#{main_auth} auth with JSON read token #{read_auth} can read" do
- assert_includes(get_specimen_uuids(main_auth, read_auth, :to_json),
- spectator_specimen, "did not find spectator specimen")
- end
-
- test "#{main_auth} auth with reader token #{read_auth} can't write" do
- assert_post_denied(main_auth, read_auth)
- end
+ [:to_a, :to_json].each do |formatter|
+ test "#{main_auth.inspect} auth with #{formatter} reader token #{read_auth} can#{"'t" if main_auth} read" do
+ get_specimens(main_auth, read_auth)
+ assert_response(if main_auth then 403 else 200 end)
+ end
- test "#{main_auth} auth with JSON read token #{read_auth} can't write" do
- assert_post_denied(main_auth, read_auth, :to_json)
+ test "#{main_auth.inspect} auth with #{formatter} reader token #{read_auth} can't write" do
+ assert_post_denied(main_auth, read_auth, formatter)
+ end
end
end
end
"fmt"
"io/ioutil"
"log"
+ "math/rand"
"net/http"
"net/http/httptest"
"os"
"strings"
+ "sync"
"testing"
"time"
}
}
+func (s *ServerRequiredSuite) TestManyFailedPuts(c *C) {
+ kc := runProxy(c, nil, false)
+ defer closeListener()
+ router.(*proxyHandler).timeout = time.Nanosecond
+
+ buf := make([]byte, 1<<20)
+ rand.Read(buf)
+ var wg sync.WaitGroup
+ for i := 0; i < 128; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ kc.PutB(buf)
+ }()
+ }
+ done := make(chan bool)
+ go func() {
+ wg.Wait()
+ close(done)
+ }()
+ select {
+ case <-done:
+ case <-time.After(10 * time.Second):
+ c.Error("timeout")
+ }
+}
+
func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
kc := runProxy(c, nil, false)
defer closeListener()
create_calls += 1
if create_calls < 2:
raise RateLimitReachedError(429, "Rate limit exceeded",
- retry_after=12)
+ headers={'retry-after': '12'})
elif create_calls < 3:
raise BaseHTTPError(429, "Rate limit exceeded",
{'retry-after': '2'})
'setuptools'
],
dependency_links=[
- "https://github.com/curoverse/libcloud/archive/apache-libcloud-2.2.2.dev2.zip"
+ "https://github.com/curoverse/libcloud/archive/apache-libcloud-2.2.2.dev3.zip"
],
test_suite='tests',
tests_require=[
'requests',
'pbr<1.7.0',
'mock>=1.0',
- 'apache-libcloud==2.2.2.dev2',
+ 'apache-libcloud==2.2.2.dev3',
],
zip_safe=False,
cmdclass={'egg_info': tagger},
SSO_ROOT="$ARVBOX_DATA/sso-devise-omniauth-provider"
fi
+if test -z "$COMPOSER_ROOT" ; then
+ COMPOSER_ROOT="$ARVBOX_DATA/composer"
+fi
+
PG_DATA="$ARVBOX_DATA/postgres"
VAR_DATA="$ARVBOX_DATA/var"
PASSENGER="$ARVBOX_DATA/passenger"
if ! test -d "$SSO_ROOT" ; then
git clone https://github.com/curoverse/sso-devise-omniauth-provider.git "$SSO_ROOT"
fi
+ if ! test -d "$COMPOSER_ROOT" ; then
+ git clone https://github.com/curoverse/composer.git "$COMPOSER_ROOT"
+ fi
if test "$CONFIG" = test ; then
--privileged \
"--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \
"--volume=$SSO_ROOT:/usr/src/sso:rw" \
+ "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \
"--volume=$PG_DATA:/var/lib/postgresql:rw" \
"--volume=$VAR_DATA:/var/lib/arvados:rw" \
"--volume=$PASSENGER:/var/lib/passenger:rw" \
--privileged \
"--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \
"--volume=$SSO_ROOT:/usr/src/sso:rw" \
+ "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \
"--volume=$PG_DATA:/var/lib/postgresql:rw" \
"--volume=$VAR_DATA:/var/lib/arvados:rw" \
"--volume=$PASSENGER:/var/lib/passenger:rw" \
libjson-perl nginx gitolite3 lsof libreadline-dev \
apt-transport-https ca-certificates slurm-wlm \
linkchecker python3-virtualenv python-virtualenv xvfb iceweasel \
- libgnutls28-dev python3-dev vim cadaver cython gnupg dirmngr && \
+ libgnutls28-dev python3-dev vim cadaver cython gnupg dirmngr \
+ libsecret-1-dev && \
apt-get clean
ENV RUBYVERSION_MINOR 2.3
[workbench]=80
[api]=8000
[sso]=8900
+ [composer]=4200
[arv-git-httpd]=9001
[keep-web]=9002
[keepproxy]=25100
--uid $HOSTUID --gid $HOSTGID \
--non-unique \
--groups docker \
+ --shell /bin/bash \
arvbox
useradd --home-dir /var/lib/arvados/git --uid $HOSTUID --gid $HOSTGID --non-unique git
useradd --groups docker crunch
--- /dev/null
+/usr/local/lib/arvbox/logger
\ No newline at end of file
--- /dev/null
+#!/bin/sh
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+set -e
+
+/usr/local/lib/arvbox/runsu.sh $0-service $1
--- /dev/null
+#!/bin/bash
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+exec 2>&1
+set -ex -o pipefail
+
+. /usr/local/lib/arvbox/common.sh
+
+cd /usr/src/composer
+
+npm install yarn
+
+PATH=$PATH:/usr/src/composer/node_modules/.bin
+
+yarn install
+
+if test "$1" != "--only-deps" ; then
+ echo "apiEndPoint: https://${localip}:${services[api]}" > /usr/src/composer/src/arvados-configuration.yml
+ exec ng serve --host 0.0.0.0 --port 4200 --env=webdev
+fi
fi
run_bundler --without=development
-bundle exec passenger start --runtime-check-only --runtime-dir=/var/lib/passenger
+bundle exec passenger-config build-native-support
+bundle exec passenger-config install-standalone-runtime
if test "$1" = "--only-deps" ; then
exit
fi
exec bundle exec passenger start --port=${services[sso]} \
- --runtime-dir=/var/lib/passenger \
--ssl --ssl-certificate=/var/lib/arvados/self-signed.pem \
--ssl-certificate-key=/var/lib/arvados/self-signed.key