//
// SPDX-License-Identifier: AGPL-3.0
-window.CollectionsTable = {
+window.SearchResultsTable = {
maybeLoadMore: function(dom) {
var loader = this.loader
if (loader.state != loader.READY)
},
view: function(vnode) {
var loader = vnode.attrs.loader
+ var iconsMap = {
+ collections: m('i.fa.fa-fw.fa-archive'),
+ projects: m('i.fa.fa-fw.fa-folder'),
+ }
return m('table.table.table-condensed', [
m('thead', m('tr', [
m('th'),
m('td', [
item.workbenchBaseURL() &&
m('a.btn.btn-xs.btn-default', {
- href: item.workbenchBaseURL()+'collections/'+item.uuid,
- }, 'Show'),
+ 'data-original-title': 'show '+item.objectType.description,
+ 'data-placement': 'top',
+ 'data-toggle': 'tooltip',
+ href: item.workbenchBaseURL()+'/'+item.objectType.wb_path+'/'+item.uuid,
+ // Bootstrap's tooltip feature
+ oncreate: function(vnode) { $(vnode.dom).tooltip() },
+ }, iconsMap[item.objectType.wb_path]),
]),
m('td.arvados-uuid', item.uuid),
m('td', item.name || '(unnamed)'),
},
}
-window.CollectionsSearch = {
+window.Search = {
oninit: function(vnode) {
vnode.state.sessionDB = new SessionDB()
vnode.state.searchEntered = m.stream()
var workbenchBaseURL = function() {
return vnode.state.sessionDB.workbenchBaseURL(session)
}
- return new MultipageLoader({
+ var searchable_objects = [
+ {
+ wb_path: 'projects',
+ api_path: 'arvados/v1/groups',
+ filters: [['group_class', '=', 'project']],
+ description: 'project',
+ },
+ {
+ wb_path: 'collections',
+ api_path: 'arvados/v1/collections',
+ filters: [],
+ description: 'collection',
+ },
+ ]
+ return new MergingLoader({
sessionKey: key,
- loadFunc: function(filters) {
- var tsquery = to_tsquery(q)
- if (tsquery) {
- filters = filters.slice(0)
- filters.push(['any', '@@', tsquery])
- }
- return vnode.state.sessionDB.request(session, 'arvados/v1/collections', {
- data: {
- filters: JSON.stringify(filters),
- count: 'none',
+ // For every session, search for every object type
+ children: searchable_objects.map(function(obj_type) {
+ return new MultipageLoader({
+ sessionKey: key,
+ loadFunc: function(filters) {
+ // Apply additional type-dependent filters
+ filters = filters.concat(obj_type.filters)
+ var tsquery = to_tsquery(q)
+ if (tsquery) {
+ filters.push(['any', '@@', tsquery])
+ }
+ return vnode.state.sessionDB.request(session, obj_type.api_path, {
+ data: {
+ filters: JSON.stringify(filters),
+ count: 'none',
+ },
+ }).then(function(resp) {
+ resp.items.map(function(item) {
+ item.workbenchBaseURL = workbenchBaseURL
+ item.objectType = obj_type
+ })
+ return resp
+ })
},
- }).then(function(resp) {
- resp.items.map(function(item) {
- item.workbenchBaseURL = workbenchBaseURL
- })
- return resp
})
- },
+ }),
})
- })
+ }),
})
})
},
m('.row', [
m('.col-md-6', [
m('.input-group', [
- m('input#search.form-control[placeholder=Search]', {
+ m('input#search.form-control[placeholder=Search collections and projects]', {
oninput: m.withAttr('value', vnode.state.searchEntered),
value: vnode.state.searchEntered(),
}),
m('a[href="/sessions"]', 'Add/remove sites'),
]),
]),
- m(CollectionsTable, {
+ m(SearchResultsTable, {
loader: vnode.state.loader,
}),
],
return m('.container', [
m('p', [
'You can log in to multiple Arvados sites here, then use the ',
- m('a[href="/collections/multisite"]', 'multi-site search'),
- ' page to search collections on all sites at once.',
+ m('a[href="/search"]', 'multi-site search'),
+ ' page to search collections and projects on all sites at once.',
]),
m('table.table.table-condensed.table-hover', [
m('thead', m('tr', [
// the host part of apihostport is an IPv4 or [IPv6]
// address.
if (!session.baseURL.match('://(\\[|\\d+\\.\\d+\\.\\d+\\.\\d+[:/])'))
- return session.baseURL.replace('://', '://workbench.')
+ var wbUrl = session.baseURL.replace('://', '://workbench.')
+ // Remove the trailing slash, if it's there.
+ return wbUrl.slice(-1) == '/' ? wbUrl.slice(0, -1) : wbUrl
return null
},
// Return a m.stream that will get fulfilled with the
skip_around_filter(:require_thread_api_token,
only: [:show_file, :show_file_links])
skip_before_filter(:find_object_by_uuid,
- only: [:provenance, :show_file, :show_file_links, :multisite])
+ only: [:provenance, :show_file, :show_file_links])
# We depend on show_file to display the user agreement:
skip_before_filter :check_user_agreements, only: :show_file
skip_before_filter :check_user_profile, only: :show_file
# SPDX-License-Identifier: AGPL-3.0
class SearchController < ApplicationController
+ skip_before_filter :ensure_arvados_api_exists
+
def find_objects_for_index
search_what = Group
if params[:project_uuid]
@client_mtx = Mutex.new
end
- def api(resources_kind, action, data=nil, tokens={})
+ def api(resources_kind, action, data=nil, tokens={}, include_anon_token=true)
profile_checkpoint
'reader_tokens' => ((tokens[:reader_tokens] ||
Thread.current[:reader_tokens] ||
[]) +
- [Rails.configuration.anonymous_user_token]).to_json,
+ (include_anon_token ? [Rails.configuration.anonymous_user_token] : [])).to_json,
}
if !data.nil?
data.each do |k,v|
end
def self.current
- res = arvados_api_client.api self, '/current'
+ res = arvados_api_client.api self, '/current', nil, {}, false
arvados_api_client.unpack_api_response(res)
end
<%=
target = Rails.configuration.multi_site_search
if target == true
- target = {controller: 'collections', action: 'multisite'}
+ target = {controller: 'search', action: 'index'}
end
link_to("Multi-site search", target, {class: 'btn btn-default'}) %>
</form>
SPDX-License-Identifier: AGPL-3.0 -->
-<div data-mount-mithril="CollectionsSearch"></div>
+<div data-mount-mithril="Search"></div>
post 'remove_selected_files', on: :member
get 'tags', on: :member
post 'save_tags', on: :member
- get 'multisite', on: :collection
+ get 'multisite', on: :collection, to: redirect('/search')
end
get('/collections/download/:uuid/:reader_token/*file' => 'collections#show_file',
format: false)
get 'tab_counts', on: :member
get 'public', on: :collection
end
-
+
resources :search do
get 'choose', :on => :collection
end
match '/_health/ping', to: 'healthcheck#ping', via: [:get]
get '/tests/mithril', to: 'tests#mithril'
-
+
get '/status', to: 'status#status'
-
+
# Send unroutable requests to an arbitrary controller
# (ends up at ApplicationController#render_not_found)
match '*a', to: 'links#render_not_found', via: [:get, :post]
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-FROM ubuntu:precise
-MAINTAINER Ward Vandewege <ward@curoverse.com>
-
-ENV DEBIAN_FRONTEND noninteractive
-
-# Install dependencies.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip build-essential unzip
-
-# Install RVM
-RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
- curl -L https://get.rvm.io | bash -s stable && \
- /usr/local/rvm/bin/rvm install 2.3 && \
- /usr/local/rvm/bin/rvm alias create default ruby-2.3 && \
- /usr/local/rvm/bin/rvm-exec default gem install bundler && \
- /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
-
-# Install golang binary
-ADD generated/go1.8.3.linux-amd64.tar.gz /usr/local/
-RUN ln -s /usr/local/go/bin/go /usr/local/bin/
-
-# Install nodejs and npm
-ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
-RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
-
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
-ENV WORKSPACE /arvados
-CMD ["/usr/local/rvm/bin/rvm-exec", "default", "bash", "/jenkins/run-build-packages.sh", "--target", "ubuntu1204"]
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-FROM ubuntu:precise
-MAINTAINER Ward Vandewege <ward@curoverse.com>
-
-ENV DEBIAN_FRONTEND noninteractive
-
-# Install RVM
-RUN apt-get update && \
- apt-get -y install --no-install-recommends curl ca-certificates g++ && \
- gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
- curl -L https://get.rvm.io | bash -s stable && \
- /usr/local/rvm/bin/rvm install 2.3 && \
- /usr/local/rvm/bin/rvm alias create default ruby-2.3
-
-# udev daemon can't start in a container, so don't try.
-RUN mkdir -p /etc/udev/disabled
-
-RUN echo "deb file:///arvados/packages/ubuntu1204/ /" >>/etc/apt/sources.list
true
else
FINAL_EXITCODE=$?
+ echo
+ echo "Build packages failed for $(basename $(dirname "$dockerfile_path"))"
+ echo
fi
done
if test $FINAL_EXITCODE != 0 ; then
+ echo
echo "Build packages failed with code $FINAL_EXITCODE" >&2
+ echo
fi
exit $FINAL_EXITCODE
sdk/go/httpserver
sdk/go/manifest
sdk/go/blockdigest
-sdk/go/streamer
+sdk/go/asyncbuf
sdk/go/stats
sdk/go/crunchrunner
sdk/cwl
sdk/go/health
sdk/go/httpserver
sdk/go/manifest
- sdk/go/streamer
+ sdk/go/asyncbuf
sdk/go/crunchrunner
sdk/go/stats
lib/crunchstat
# apt.arvados.org
deb http://apt.arvados.org/ jessie main
+deb http://apt.arvados.org/ jessie-dev main
arvargs.conformance_test = None
arvargs.use_container = True
arvargs.relax_path_checks = True
- arvargs.validate = None
arvargs.print_supported_versions = False
make_fs_access = partial(CollectionFsAccess,
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package asyncbuf
+
+import (
+ "bytes"
+ "io"
+ "sync"
+)
+
+// A Buffer is an io.Writer that distributes written data
+// asynchronously to multiple concurrent readers.
+//
+// NewReader() can be called at any time. In all cases, every returned
+// io.Reader reads all data written to the Buffer.
+//
+// Behavior is undefined if Write is called after Close or
+// CloseWithError.
+type Buffer interface {
+ io.WriteCloser
+
+ // NewReader() returns an io.Reader that reads all data
+ // written to the Buffer.
+ NewReader() io.Reader
+
+ // Close, but return the given error (instead of io.EOF) to
+ // all readers when they reach the end of the buffer.
+ //
+ // CloseWithError(nil) is equivalent to
+ // CloseWithError(io.EOF).
+ CloseWithError(error) error
+}
+
+// buffer is the concrete type behind the Buffer returned by NewBuffer.
+type buffer struct {
+ data *bytes.Buffer // everything written so far; readers address it by byte offset
+ cond sync.Cond // cond.L is held while touching data or err; Write and CloseWithError broadcast on it
+ err error // nil if there might be more writes
+}
+
+// NewBuffer creates a new Buffer using buf as its initial
+// contents. The new Buffer takes ownership of buf, and the caller
+// should not use buf after this call.
+func NewBuffer(buf []byte) Buffer {
+ return &buffer{
+ data: bytes.NewBuffer(buf),
+ cond: sync.Cond{L: &sync.Mutex{}},
+ }
+}
+
+// Write appends p to the buffer and wakes any readers waiting for
+// more data. If the buffer has already been closed, nothing is
+// written and the error recorded at close time is returned.
+func (b *buffer) Write(p []byte) (int, error) {
+ // Deferred first so that it runs last: the broadcast happens
+ // after the mutex locked below has been released.
+ defer b.cond.Broadcast()
+ b.cond.L.Lock()
+ defer b.cond.L.Unlock()
+ if b.err != nil {
+ return 0, b.err
+ }
+ return b.data.Write(p)
+}
+
+// Close marks the buffer as complete. It is shorthand for
+// CloseWithError(nil), so readers receive io.EOF once they have
+// consumed all buffered data.
+func (b *buffer) Close() error {
+ return b.CloseWithError(nil)
+}
+
+// CloseWithError records the error that readers will receive once
+// they have consumed all buffered data (io.EOF if err is nil) and
+// wakes any readers waiting for more data. It always returns nil.
+// Calling it again replaces the previously recorded error.
+func (b *buffer) CloseWithError(err error) error {
+ // Deferred first so that it runs last, after the unlock below.
+ defer b.cond.Broadcast()
+ b.cond.L.Lock()
+ defer b.cond.L.Unlock()
+ if err == nil {
+ b.err = io.EOF
+ } else {
+ b.err = err
+ }
+ return nil
+}
+
+// NewReader returns a reader positioned at the start of the buffer,
+// so it yields data written before this call as well as data
+// written afterwards.
+func (b *buffer) NewReader() io.Reader {
+ return &reader{b: b}
+}
+
+// reader is an io.Reader over a buffer. It keeps its own offset, so
+// each reader progresses through the data independently.
+type reader struct {
+ b *buffer // buffer this reader draws from
+ read int // # bytes already read
+}
+
+// Read copies unread data into p and advances this reader's
+// position. If the reader has already consumed everything written
+// so far, it blocks until another Write or a close, except that a
+// zero-length p never blocks. Once the buffer is closed and fully
+// consumed, Read returns 0 along with the error recorded at close.
+func (r *reader) Read(p []byte) (int, error) {
+ r.b.cond.L.Lock()
+ for {
+ switch {
+ case r.read < r.b.data.Len():
+ // Unread data is available: take the slice while holding
+ // the lock, then copy out of it after unlocking.
+ // NOTE(review): this relies on a concurrent Write never
+ // changing bytes already present in buf -- confirm.
+ buf := r.b.data.Bytes()
+ r.b.cond.L.Unlock()
+ n := copy(p, buf[r.read:])
+ r.read += n
+ return n, nil
+ case r.b.err != nil || len(p) == 0:
+ // r.b.err != nil means we reached EOF. And
+ // even if we're not at EOF, there's no need
+ // to block if len(p)==0.
+ err := r.b.err
+ r.b.cond.L.Unlock()
+ return 0, err
+ default:
+ // Nothing to read yet. Wait releases the lock while
+ // blocked and holds it again when it returns, so the
+ // loop re-checks the conditions above under the lock.
+ r.b.cond.Wait()
+ }
+ }
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package asyncbuf
+
+import (
+ "crypto/md5"
+ "errors"
+ "io"
+ "io/ioutil"
+ "math/rand"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+func (s *Suite) TestNoWrites(c *check.C) {
+ b := NewBuffer(nil)
+ r1 := b.NewReader()
+ r2 := b.NewReader()
+ b.Close()
+ s.checkReader(c, r1, []byte{}, nil, nil)
+ s.checkReader(c, r2, []byte{}, nil, nil)
+}
+
+func (s *Suite) TestNoReaders(c *check.C) {
+ b := NewBuffer(nil)
+ n, err := b.Write([]byte("foobar"))
+ err2 := b.Close()
+ c.Check(n, check.Equals, 6)
+ c.Check(err, check.IsNil)
+ c.Check(err2, check.IsNil)
+}
+
+func (s *Suite) TestWriteReadClose(c *check.C) {
+ done := make(chan bool, 2)
+ b := NewBuffer(nil)
+ n, err := b.Write([]byte("foobar"))
+ c.Check(n, check.Equals, 6)
+ c.Check(err, check.IsNil)
+ r1 := b.NewReader()
+ r2 := b.NewReader()
+ go s.checkReader(c, r1, []byte("foobar"), nil, done)
+ go s.checkReader(c, r2, []byte("foobar"), nil, done)
+ time.Sleep(time.Millisecond)
+ c.Check(len(done), check.Equals, 0)
+ b.Close()
+ <-done
+ <-done
+}
+
+func (s *Suite) TestPrefillWriteCloseRead(c *check.C) {
+ done := make(chan bool, 2)
+ b := NewBuffer([]byte("baz"))
+ n, err := b.Write([]byte("waz"))
+ c.Check(n, check.Equals, 3)
+ c.Check(err, check.IsNil)
+ b.Close()
+ r1 := b.NewReader()
+ go s.checkReader(c, r1, []byte("bazwaz"), nil, done)
+ r2 := b.NewReader()
+ go s.checkReader(c, r2, []byte("bazwaz"), nil, done)
+ <-done
+ <-done
+}
+
+func (s *Suite) TestWriteReadCloseRead(c *check.C) {
+ done := make(chan bool, 1)
+ b := NewBuffer(nil)
+ r1 := b.NewReader()
+ go s.checkReader(c, r1, []byte("bazwazqux"), nil, done)
+
+ b.Write([]byte("bazwaz"))
+
+ r2 := b.NewReader()
+ r2.Read(make([]byte, 3))
+
+ b.Write([]byte("qux"))
+ b.Close()
+
+ s.checkReader(c, r2, []byte("wazqux"), nil, nil)
+ <-done
+}
+
+func (s *Suite) TestReadAtEOF(c *check.C) {
+ buf := make([]byte, 8)
+
+ b := NewBuffer([]byte{1, 2, 3})
+
+ r := b.NewReader()
+ n, err := r.Read(buf)
+ c.Check(n, check.Equals, 3)
+ c.Check(err, check.IsNil)
+
+ // Reading zero bytes at EOF, but before Close(), doesn't
+ // block or error
+ done := make(chan bool)
+ go func() {
+ defer close(done)
+ n, err = r.Read(buf[:0])
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.IsNil)
+ }()
+ select {
+ case <-done:
+ case <-time.After(time.Second):
+ c.Error("timeout")
+ }
+
+ b.Close()
+
+ // Reading zero bytes after Close() returns EOF
+ n, err = r.Read(buf[:0])
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.Equals, io.EOF)
+
+ // Reading from start after Close() returns 3 bytes, then EOF
+ r = b.NewReader()
+ n, err = r.Read(buf)
+ c.Check(n, check.Equals, 3)
+ if err != nil {
+ c.Check(err, check.Equals, io.EOF)
+ }
+ n, err = r.Read(buf[:0])
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.Equals, io.EOF)
+ n, err = r.Read(buf)
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.Equals, io.EOF)
+}
+
+func (s *Suite) TestCloseWithError(c *check.C) {
+ errFake := errors.New("it's not even a real error")
+
+ done := make(chan bool, 1)
+ b := NewBuffer(nil)
+ r1 := b.NewReader()
+ go s.checkReader(c, r1, []byte("bazwazqux"), errFake, done)
+
+ b.Write([]byte("bazwaz"))
+
+ r2 := b.NewReader()
+ r2.Read(make([]byte, 3))
+
+ b.Write([]byte("qux"))
+ b.CloseWithError(errFake)
+
+ s.checkReader(c, r2, []byte("wazqux"), errFake, nil)
+ <-done
+}
+
+// Write n*n bytes, n at a time; read them into n goroutines using
+// varying buffer sizes; compare checksums.
+func (s *Suite) TestManyReaders(c *check.C) {
+ const n = 256
+
+ b := NewBuffer(nil)
+
+ expectSum := make(chan []byte)
+ go func() {
+ hash := md5.New()
+ buf := make([]byte, n)
+ for i := 0; i < n; i++ {
+ time.Sleep(10 * time.Nanosecond)
+ rand.Read(buf)
+ b.Write(buf)
+ hash.Write(buf)
+ }
+ expectSum <- hash.Sum(nil)
+ b.Close()
+ }()
+
+ gotSum := make(chan []byte)
+ for i := 0; i < n; i++ {
+ go func(bufSize int) {
+ got := md5.New()
+ io.CopyBuffer(got, b.NewReader(), make([]byte, bufSize))
+ gotSum <- got.Sum(nil)
+ }(i + n/2)
+ }
+
+ expect := <-expectSum
+ for i := 0; i < n; i++ {
+ c.Check(expect, check.DeepEquals, <-gotSum)
+ }
+}
+
+func (s *Suite) BenchmarkOneReader(c *check.C) {
+ s.benchmarkReaders(c, 1)
+}
+
+func (s *Suite) BenchmarkManyReaders(c *check.C) {
+ s.benchmarkReaders(c, 100)
+}
+
+// benchmarkReaders runs 10 rounds. In each round one goroutine
+// writes c.N chunks of 10000 random bytes to a fresh Buffer and then
+// closes it, while the given number of concurrent readers drain it.
+// Afterwards it logs the total number of bytes read by all readers
+// and the resulting throughput.
+func (s *Suite) benchmarkReaders(c *check.C, readers int) {
+ var n int64
+ t0 := time.Now()
+
+ buf := make([]byte, 10000)
+ rand.Read(buf)
+ for i := 0; i < 10; i++ {
+ b := NewBuffer(nil)
+ go func() {
+ for i := 0; i < c.N; i++ {
+ b.Write(buf)
+ }
+ b.Close()
+ }()
+
+ var wg sync.WaitGroup
+ for i := 0; i < readers; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ nn, _ := io.Copy(ioutil.Discard, b.NewReader())
+ // Readers finish concurrently, so the shared total
+ // is updated atomically.
+ atomic.AddInt64(&n, int64(nn))
+ }()
+ }
+ wg.Wait()
+ }
+ c.Logf("%d bytes, %.0f MB/s", n, float64(n)/time.Since(t0).Seconds()/1000000)
+}
+
+// checkReader reads r to completion and checks that the data and the
+// final error match expectData and expectError. ioutil.ReadAll
+// reports a clean EOF as a nil error, so expectError is nil for a
+// buffer closed without an error. If done is non-nil, a value is
+// sent on it after the checks, so a caller running checkReader in a
+// goroutine can wait for it to finish.
+func (s *Suite) checkReader(c *check.C, r io.Reader, expectData []byte, expectError error, done chan bool) {
+ buf, err := ioutil.ReadAll(r)
+ c.Check(err, check.Equals, expectError)
+ c.Check(buf, check.DeepEquals, expectData)
+ if done != nil {
+ done <- true
+ }
+}
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
_, err = io.Copy(this.Hash, this.Reader)
if closer, ok := this.Reader.(io.Closer); ok {
- err2 := closer.Close()
- if err2 != nil && err == nil {
- return err2
+ closeErr := closer.Close()
+ if err == nil {
+ err = closeErr
}
}
if err != nil {
return err
}
-
- sum := this.Hash.Sum(nil)
- if fmt.Sprintf("%x", sum) != this.Check {
- err = BadChecksum
+ if fmt.Sprintf("%x", this.Hash.Sum(nil)) != this.Check {
+ return BadChecksum
}
-
- return err
+ return nil
}
"time"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
+ "git.curoverse.com/arvados.git/sdk/go/asyncbuf"
)
// A Keep "block" is 64MB.
bufsize = BLOCKSIZE
}
- t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
- defer t.Close()
-
- return kc.putReplicas(hash, t, dataBytes)
+ buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize))
+ go func() {
+ _, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash})
+ buf.CloseWithError(err)
+ }()
+ return kc.putReplicas(hash, buf.NewReader, dataBytes)
}
// PutHB writes a block to Keep. The hash of the bytes is given in
//
// Return values are the same as for PutHR.
func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
- t := streamer.AsyncStreamFromSlice(buf)
- defer t.Close()
- return kc.putReplicas(hash, t, int64(len(buf)))
+ newReader := func() io.Reader { return bytes.NewBuffer(buf) }
+ return kc.putReplicas(hash, newReader, int64(len(buf)))
}
// PutB writes a block to Keep. It computes the hash itself.
package keepclient
import (
+ "bytes"
"crypto/md5"
"errors"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
. "gopkg.in/check.v1"
)
make(chan string)}
UploadToStubHelper(c, st,
- func(kc *KeepClient, url string, reader io.ReadCloser,
- writer io.WriteCloser, upload_status chan uploadStatus) {
-
- tr := streamer.AsyncStreamFromReader(512, reader)
- defer tr.Close()
-
- br1 := tr.MakeStreamReader()
-
- go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, 0)
-
- writer.Write([]byte("foo"))
- writer.Close()
+ func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, upload_status chan uploadStatus) {
+ go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), upload_status, 3, 0)
<-st.handled
"strings"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
)
// Function used to emit debug messages. The easiest way to enable
response string
}
-func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
+func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader,
upload_status chan<- uploadStatus, expectedLength int64, requestID int32) {
var req *http.Request
if req, err = http.NewRequest("PUT", url, nil); err != nil {
DebugPrintf("DEBUG: [%08x] Error creating request PUT %v error: %v", requestID, url, err.Error())
upload_status <- uploadStatus{err, url, 0, 0, ""}
- body.Close()
return
}
req.ContentLength = expectedLength
if expectedLength > 0 {
- // Do() will close the body ReadCloser when it is done
- // with it.
- req.Body = body
+ req.Body = ioutil.NopCloser(body)
} else {
- // "For client requests, a value of 0 means unknown if Body is
- // not nil." In this case we do want the body to be empty, so
- // don't set req.Body. However, we still need to close the
- // body ReadCloser.
- body.Close()
+ // "For client requests, a value of 0 means unknown if
+ // Body is not nil." In this case we do want the body
+ // to be empty, so don't set req.Body.
}
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
func (this *KeepClient) putReplicas(
hash string,
- tr *streamer.AsyncStream,
+ getReader func() io.Reader,
expectedLength int64) (locator string, replicas int, err error) {
// Generate an arbitrary ID to identify this specific
// Start some upload requests
if next_server < len(sv) {
DebugPrintf("DEBUG: [%08x] Begin upload %s to %s", requestID, hash, sv[next_server])
- go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestID)
+ go this.uploadToKeepServer(sv[next_server], hash, getReader(), upload_status, expectedLength, requestID)
next_server += 1
active += 1
} else {
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-/* AsyncStream pulls data in from a io.Reader source (such as a file or network
-socket) and fans out to any number of StreamReader sinks.
-
-Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
-any point in the lifetime of the AsyncStream, and each StreamReader will read
-the contents of the buffer up to the "frontier" of the buffer, at which point
-the StreamReader blocks until new data is read from the source.
-
-This is useful for minimizing readthrough latency as sinks can read and act on
-data from the source without waiting for the source to be completely buffered.
-It is also useful as a cache in situations where re-reading the original source
-potentially is costly, since the buffer retains a copy of the source data.
-
-Usage:
-
-Begin reading into a buffer with maximum size 'buffersize' from 'source':
- stream := AsyncStreamFromReader(buffersize, source)
-
-To create a new reader (this can be called multiple times, each reader starts
-at the beginning of the buffer):
- reader := tr.MakeStreamReader()
-
-Make sure to close the reader when you're done with it.
- reader.Close()
-
-When you're done with the stream:
- stream.Close()
-
-Alternately, if you already have a filled buffer and just want to read out from it:
- stream := AsyncStreamFromSlice(buf)
-
- r := tr.MakeStreamReader()
-
-*/
-
-package streamer
-
-import (
- "errors"
- "io"
-)
-
-var ErrAlreadyClosed = errors.New("cannot close a stream twice")
-
-type AsyncStream struct {
- buffer []byte
- requests chan sliceRequest
- add_reader chan bool
- subtract_reader chan bool
- wait_zero_readers chan bool
- closed bool
-}
-
-// Reads from the buffer managed by the Transfer()
-type StreamReader struct {
- offset int
- stream *AsyncStream
- responses chan sliceResult
-}
-
-func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
- t := &AsyncStream{
- buffer: make([]byte, buffersize),
- requests: make(chan sliceRequest),
- add_reader: make(chan bool),
- subtract_reader: make(chan bool),
- wait_zero_readers: make(chan bool),
- }
-
- go t.transfer(source)
- go t.readersMonitor()
-
- return t
-}
-
-func AsyncStreamFromSlice(buf []byte) *AsyncStream {
- t := &AsyncStream{
- buffer: buf,
- requests: make(chan sliceRequest),
- add_reader: make(chan bool),
- subtract_reader: make(chan bool),
- wait_zero_readers: make(chan bool),
- }
-
- go t.transfer(nil)
- go t.readersMonitor()
-
- return t
-}
-
-func (this *AsyncStream) MakeStreamReader() *StreamReader {
- this.add_reader <- true
- return &StreamReader{0, this, make(chan sliceResult)}
-}
-
-// Reads from the buffer managed by the Transfer()
-func (this *StreamReader) Read(p []byte) (n int, err error) {
- this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
- rr, valid := <-this.responses
- if valid {
- this.offset += len(rr.slice)
- return copy(p, rr.slice), rr.err
- } else {
- return 0, io.ErrUnexpectedEOF
- }
-}
-
-func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
- // Record starting offset in order to correctly report the number of bytes sent
- starting_offset := this.offset
- for {
- this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
- rr, valid := <-this.responses
- if valid {
- this.offset += len(rr.slice)
- if rr.err != nil {
- if rr.err == io.EOF {
- // EOF is not an error.
- return int64(this.offset - starting_offset), nil
- } else {
- return int64(this.offset - starting_offset), rr.err
- }
- } else {
- dest.Write(rr.slice)
- }
- } else {
- return int64(this.offset), io.ErrUnexpectedEOF
- }
- }
-}
-
-// Close the responses channel
-func (this *StreamReader) Close() error {
- if this.stream == nil {
- return ErrAlreadyClosed
- }
- this.stream.subtract_reader <- true
- close(this.responses)
- this.stream = nil
- return nil
-}
-
-func (this *AsyncStream) Close() error {
- if this.closed {
- return ErrAlreadyClosed
- }
- this.closed = true
- this.wait_zero_readers <- true
- close(this.requests)
- close(this.add_reader)
- close(this.subtract_reader)
- close(this.wait_zero_readers)
- return nil
-}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-package streamer
-
-import (
- . "gopkg.in/check.v1"
- "io"
- "testing"
- "time"
-)
-
-// Gocheck boilerplate
-func Test(t *testing.T) { TestingT(t) }
-
-var _ = Suite(&StandaloneSuite{})
-
-// Standalone tests
-type StandaloneSuite struct{}
-
-func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
- ReadIntoBufferHelper(c, 225)
- ReadIntoBufferHelper(c, 224)
-}
-
-func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
- out := make([]byte, 128)
- for i := 0; i < 128; i += 1 {
- out[i] = byte(i)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 128)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 128; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
-}
-
-func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
- out := make([]byte, 96)
- for i := 0; i < 96; i += 1 {
- out[i] = byte(i / 2)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 96)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 96; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i/2))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else if i < (128 + 96) {
- c.Check(buffer[i], Equals, byte((i-128)/2))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
-}
-
-func ReadIntoBufferHelper(c *C, bufsize int) {
- buffer := make([]byte, bufsize)
-
- reader, writer := io.Pipe()
- slices := make(chan nextSlice)
-
- go readIntoBuffer(buffer, reader, slices)
-
- HelperWrite128andCheck(c, buffer, writer, slices)
- HelperWrite96andCheck(c, buffer, writer, slices)
-
- writer.Close()
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 0)
- c.Check(s1.reader_error, Equals, io.EOF)
-}
-
-func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
- buffer := make([]byte, 223)
- reader, writer := io.Pipe()
- slices := make(chan nextSlice)
-
- go readIntoBuffer(buffer, reader, slices)
-
- HelperWrite128andCheck(c, buffer, writer, slices)
-
- out := make([]byte, 96)
- for i := 0; i < 96; i += 1 {
- out[i] = byte(i / 2)
- }
-
- // Write will deadlock because it can't write all the data, so
- // spin it off to a goroutine
- go writer.Write(out)
- s1 := <-slices
-
- c.Check(len(s1.slice), Equals, 95)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 95; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i/2))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else if i < (128 + 95) {
- c.Check(buffer[i], Equals, byte((i-128)/2))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
-
- writer.Close()
- s1 = <-slices
- c.Check(len(s1.slice), Equals, 0)
- c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransfer(c *C) {
- reader, writer := io.Pipe()
-
- tr := AsyncStreamFromReader(512, reader)
-
- br1 := tr.MakeStreamReader()
- out := make([]byte, 128)
-
- {
- // Write some data, and read into a buffer shorter than
- // available data
- for i := 0; i < 128; i += 1 {
- out[i] = byte(i)
- }
-
- writer.Write(out[:100])
-
- in := make([]byte, 64)
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 64)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 64; i += 1 {
- c.Check(in[i], Equals, out[i])
- }
- }
-
- {
- // Write some more data, and read into buffer longer than
- // available data
- in := make([]byte, 64)
- n, err := br1.Read(in)
- c.Check(n, Equals, 36)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 36; i += 1 {
- c.Check(in[i], Equals, out[64+i])
- }
-
- }
-
- {
- // Test read before write
- type Rd struct {
- n int
- err error
- }
- rd := make(chan Rd)
- in := make([]byte, 64)
-
- go func() {
- n, err := br1.Read(in)
- rd <- Rd{n, err}
- }()
-
- time.Sleep(100 * time.Millisecond)
- writer.Write(out[100:])
-
- got := <-rd
-
- c.Check(got.n, Equals, 28)
- c.Check(got.err, Equals, nil)
-
- for i := 0; i < 28; i += 1 {
- c.Check(in[i], Equals, out[100+i])
- }
- }
-
- br2 := tr.MakeStreamReader()
- {
- // Test 'catch up' reader
- in := make([]byte, 256)
- n, err := br2.Read(in)
-
- c.Check(n, Equals, 128)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 128; i += 1 {
- c.Check(in[i], Equals, out[i])
- }
- }
-
- {
- // Test closing the reader
- writer.Close()
-
- in := make([]byte, 256)
- n1, err1 := br1.Read(in)
- n2, err2 := br2.Read(in)
- c.Check(n1, Equals, 0)
- c.Check(err1, Equals, io.EOF)
- c.Check(n2, Equals, 0)
- c.Check(err2, Equals, io.EOF)
- }
-
- {
- // Test 'catch up' reader after closing
- br3 := tr.MakeStreamReader()
- in := make([]byte, 256)
- n, err := br3.Read(in)
-
- c.Check(n, Equals, 128)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 128; i += 1 {
- c.Check(in[i], Equals, out[i])
- }
-
- n, err = br3.Read(in)
-
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.EOF)
- }
-}
-
-func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
- reader, writer := io.Pipe()
-
- tr := AsyncStreamFromReader(100, reader)
- defer tr.Close()
-
- sr := tr.MakeStreamReader()
- defer sr.Close()
-
- out := make([]byte, 101)
- go writer.Write(out)
-
- n, err := sr.Read(out)
- c.Check(n, Equals, 100)
- c.Check(err, IsNil)
-
- n, err = sr.Read(out)
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
- // Buffer for reads from 'r'
- buffer := make([]byte, 100)
- for i := 0; i < 100; i += 1 {
- buffer[i] = byte(i)
- }
-
- tr := AsyncStreamFromSlice(buffer)
-
- br1 := tr.MakeStreamReader()
-
- in := make([]byte, 64)
- {
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 64)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 64; i += 1 {
- c.Check(in[i], Equals, buffer[i])
- }
- }
- {
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 36)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 36; i += 1 {
- c.Check(in[i], Equals, buffer[64+i])
- }
- }
- {
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.EOF)
- }
-}
-
-func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
- // Buffer for reads from 'r'
- buffer := make([]byte, 100)
- for i := 0; i < 100; i += 1 {
- buffer[i] = byte(i)
- }
-
- tr := AsyncStreamFromSlice(buffer)
- defer tr.Close()
-
- br1 := tr.MakeStreamReader()
- defer br1.Close()
-
- reader, writer := io.Pipe()
-
- go func() {
- p := make([]byte, 100)
- n, err := reader.Read(p)
- c.Check(n, Equals, 100)
- c.Check(err, Equals, nil)
- c.Check(p, DeepEquals, buffer)
- }()
-
- io.Copy(writer, br1)
-}
-
-func (s *StandaloneSuite) TestManyReaders(c *C) {
- reader, writer := io.Pipe()
-
- tr := AsyncStreamFromReader(512, reader)
- defer tr.Close()
-
- sr := tr.MakeStreamReader()
- go func() {
- time.Sleep(100 * time.Millisecond)
- sr.Close()
- }()
-
- for i := 0; i < 200; i += 1 {
- go func() {
- br1 := tr.MakeStreamReader()
- defer br1.Close()
-
- p := make([]byte, 3)
- n, err := br1.Read(p)
- c.Check(n, Equals, 3)
- c.Check(p[0:3], DeepEquals, []byte("foo"))
-
- n, err = br1.Read(p)
- c.Check(n, Equals, 3)
- c.Check(p[0:3], DeepEquals, []byte("bar"))
-
- n, err = br1.Read(p)
- c.Check(n, Equals, 3)
- c.Check(p[0:3], DeepEquals, []byte("baz"))
-
- n, err = br1.Read(p)
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.EOF)
- }()
- }
-
- writer.Write([]byte("foo"))
- writer.Write([]byte("bar"))
- writer.Write([]byte("baz"))
- writer.Close()
-}
-
-func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) {
- buffer := make([]byte, 100)
- tr := AsyncStreamFromSlice(buffer)
- sr := tr.MakeStreamReader()
- c.Check(sr.Close(), IsNil)
- c.Check(sr.Close(), Equals, ErrAlreadyClosed)
- c.Check(tr.Close(), IsNil)
- c.Check(tr.Close(), Equals, ErrAlreadyClosed)
-}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-/* Internal implementation of AsyncStream.
-Outline of operation:
-
-The kernel is the transfer() goroutine. It manages concurrent reads and
-appends to the "body" slice. "body" is a slice of "source_buffer" that
-represents the segment of the buffer that is already filled in and available
-for reading.
-
-To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
-from the io.Reader source directly into source_buffer. Each read goes into a
-slice of buffer which spans the section immediately following the end of the
-current "body". Each time a Read completes, a slice representing the the
-section just filled in (or any read errors/EOF) is sent over the "slices"
-channel back to the transfer() function.
-
-Meanwhile, the transfer() function selects() on two channels, the "requests"
-channel and the "slices" channel.
-
-When a message is received on the "slices" channel, this means the a new
-section of the buffer has data, or an error is signaled. Since the data has
-been read directly into the source_buffer, it is able to simply increases the
-size of the body slice to encompass the newly filled in section. Then any
-pending reads are serviced with handleReadRequest (described below).
-
-When a message is received on the "requests" channel, it means a StreamReader
-wants access to a slice of the buffer. This is passed to handleReadRequest().
-
-The handleReadRequest() function takes a sliceRequest consisting of a buffer
-offset, maximum size, and channel to send the response. If there was an error
-reported from the source reader, it is returned. If the offset is less than
-the size of the body, the request can proceed, and it sends a body slice
-spanning the segment from offset to min(offset+maxsize, end of the body). If
-source reader status is EOF (done filling the buffer) and the read request
-offset is beyond end of the body, it responds with EOF. Otherwise, the read
-request is for a slice beyond the current size of "body" but we expect the body
-to expand as more data is added, so the request gets added to a wait list.
-
-The transfer() runs until the requests channel is closed by AsyncStream.Close()
-
-To track readers, streamer uses the readersMonitor() goroutine. This goroutine
-chooses which channels to receive from based on the number of outstanding
-readers. When a new reader is created, it sends a message on the add_reader
-channel. If the number of readers is already at MAX_READERS, this blocks the
-sender until an existing reader is closed. When a reader is closed, it sends a
-message on the subtract_reader channel. Finally, when AsyncStream.Close() is
-called, it sends a message on the wait_zero_readers channel, which will block
-the sender unless there are zero readers and it is safe to shut down the
-AsyncStream.
-*/
-
-package streamer
-
-import (
- "io"
-)
-
-const MAX_READERS = 100
-
-// A slice passed from readIntoBuffer() to transfer()
-type nextSlice struct {
- slice []byte
- reader_error error
-}
-
-// A read request to the Transfer() function
-type sliceRequest struct {
- offset int
- maxsize int
- result chan<- sliceResult
-}
-
-// A read result from the Transfer() function
-type sliceResult struct {
- slice []byte
- err error
-}
-
-// Supports writing into a buffer
-type bufferWriter struct {
- buf []byte
- ptr int
-}
-
-// Copy p into this.buf, increment pointer and return number of bytes read.
-func (this *bufferWriter) Write(p []byte) (n int, err error) {
- n = copy(this.buf[this.ptr:], p)
- this.ptr += n
- return n, nil
-}
-
-// Read repeatedly from the reader and write sequentially into the specified
-// buffer, and report each read to channel 'c'. Completes when Reader 'r'
-// reports on the error channel and closes channel 'c'.
-func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
- defer close(slices)
-
- if writeto, ok := r.(io.WriterTo); ok {
- n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
- if err != nil {
- slices <- nextSlice{nil, err}
- } else {
- slices <- nextSlice{buffer[:n], nil}
- slices <- nextSlice{nil, io.EOF}
- }
- return
- } else {
- // Initially entire buffer is available
- ptr := buffer[:]
- for {
- var n int
- var err error
- if len(ptr) > 0 {
- const readblock = 64 * 1024
- // Read 64KiB into the next part of the buffer
- if len(ptr) > readblock {
- n, err = r.Read(ptr[:readblock])
- } else {
- n, err = r.Read(ptr)
- }
- } else {
- // Ran out of buffer space, try reading one more byte
- var b [1]byte
- n, err = r.Read(b[:])
-
- if n > 0 {
- // Reader has more data but we have nowhere to
- // put it, so we're stuffed
- slices <- nextSlice{nil, io.ErrShortBuffer}
- } else {
- // Return some other error (hopefully EOF)
- slices <- nextSlice{nil, err}
- }
- return
- }
-
- // End on error (includes EOF)
- if err != nil {
- slices <- nextSlice{nil, err}
- return
- }
-
- if n > 0 {
- // Make a slice with the contents of the read
- slices <- nextSlice{ptr[:n], nil}
-
- // Adjust the scratch space slice
- ptr = ptr[n:]
- }
- }
- }
-}
-
-// Handle a read request. Returns true if a response was sent, and false if
-// the request should be queued.
-func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
- if (reader_status != nil) && (reader_status != io.EOF) {
- req.result <- sliceResult{nil, reader_status}
- return true
- } else if req.offset < len(body) {
- var end int
- if req.offset+req.maxsize < len(body) {
- end = req.offset + req.maxsize
- } else {
- end = len(body)
- }
- req.result <- sliceResult{body[req.offset:end], nil}
- return true
- } else if (reader_status == io.EOF) && (req.offset >= len(body)) {
- req.result <- sliceResult{nil, io.EOF}
- return true
- } else {
- return false
- }
-}
-
-// Mediates between reads and appends.
-// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
-// in the provided buffer. Otherwise, use the contents of 'buffer' as is.
-// Accepts read requests on the buffer on the 'requests' channel. Completes
-// when 'requests' channel is closed.
-func (this *AsyncStream) transfer(source_reader io.Reader) {
- source_buffer := this.buffer
- requests := this.requests
-
- // currently buffered data
- var body []byte
-
- // for receiving slices from readIntoBuffer
- var slices chan nextSlice = nil
-
- // indicates the status of the underlying reader
- var reader_status error = nil
-
- if source_reader != nil {
- // 'body' is the buffer slice representing the body content read so far
- body = source_buffer[:0]
-
- // used to communicate slices of the buffer as they are
- // readIntoBuffer will close 'slices' when it is done with it
- slices = make(chan nextSlice)
-
- // Spin it off
- go readIntoBuffer(source_buffer, source_reader, slices)
- } else {
- // use the whole buffer
- body = source_buffer[:]
-
- // buffer is complete
- reader_status = io.EOF
- }
-
- pending_requests := make([]sliceRequest, 0)
-
- for {
- select {
- case req, valid := <-requests:
- // Handle a buffer read request
- if valid {
- if !handleReadRequest(req, body, reader_status) {
- pending_requests = append(pending_requests, req)
- }
- } else {
- // closed 'requests' channel indicates we're done
- return
- }
-
- case bk, valid := <-slices:
- // Got a new slice from the reader
- if valid {
- reader_status = bk.reader_error
-
- if bk.slice != nil {
- // adjust body bounds now that another slice has been read
- body = source_buffer[0 : len(body)+len(bk.slice)]
- }
-
- // handle pending reads
- n := 0
- for n < len(pending_requests) {
- if handleReadRequest(pending_requests[n], body, reader_status) {
- // move the element from the back of the slice to
- // position 'n', then shorten the slice by one element
- pending_requests[n] = pending_requests[len(pending_requests)-1]
- pending_requests = pending_requests[0 : len(pending_requests)-1]
- } else {
-
- // Request wasn't handled, so keep it in the request slice
- n += 1
- }
- }
- } else {
- if reader_status == nil {
- // slices channel closed without signaling EOF
- reader_status = io.ErrUnexpectedEOF
- }
- slices = nil
- }
- }
- }
-}
-
-func (this *AsyncStream) readersMonitor() {
- var readers int = 0
-
- for {
- if readers == 0 {
- select {
- case _, ok := <-this.wait_zero_readers:
- if ok {
- // nothing, just implicitly unblock the sender
- } else {
- return
- }
- case _, ok := <-this.add_reader:
- if ok {
- readers += 1
- } else {
- return
- }
- }
- } else if readers > 0 && readers < MAX_READERS {
- select {
- case _, ok := <-this.add_reader:
- if ok {
- readers += 1
- } else {
- return
- }
-
- case _, ok := <-this.subtract_reader:
- if ok {
- readers -= 1
- } else {
- return
- }
- }
- } else if readers == MAX_READERS {
- _, ok := <-this.subtract_reader
- if ok {
- readers -= 1
- } else {
- return
- }
- }
- }
-}
end
def require_auth_scope
- if @read_auths.empty?
+ unless current_user && @read_auths.any? { |auth| auth.user.andand.uuid == current_user.uuid }
if require_login != false
send_error("Forbidden", status: 403)
end
user = nil
api_client = nil
api_client_auth = nil
- supplied_token =
- params["api_token"] ||
- params["oauth_token"] ||
- env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-zA-Z0-9]+)/).andand[1]
- if supplied_token
- api_client_auth = ApiClientAuthorization.
+ if request.get? || params["_method"] == 'GET'
+ reader_tokens = params["reader_tokens"]
+ if reader_tokens.is_a? String
+ reader_tokens = SafeJSON.load(reader_tokens)
+ end
+ else
+ reader_tokens = nil
+ end
+
+ # Set current_user etc. based on the primary session token if a
+ # valid one is present. Otherwise, use the first valid token in
+ # reader_tokens.
+ [params["api_token"],
+ params["oauth_token"],
+ env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-zA-Z0-9]+)/).andand[1],
+ *reader_tokens,
+ ].each do |supplied|
+ next if !supplied
+ try_auth = ApiClientAuthorization.
includes(:api_client, :user).
- where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied_token).
+ where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied).
first
- if api_client_auth.andand.user
+ if try_auth.andand.user
+ api_client_auth = try_auth
user = api_client_auth.user
api_client = api_client_auth.api_client
- else
- # Token seems valid, but points to a non-existent (deleted?) user.
- api_client_auth = nil
+ break
end
end
Thread.current[:api_client_ip_address] = remote_ip
assert_response 403
end
+  # Sends a GET with the active_userlist token as the primary credential and
+  # the anonymous user's token as a reader token, then checks which
+  # endpoints answer: the users index succeeds (with and without a trailing
+  # slash), while users/current and virtual_machines are refused with 403.
+  test "narrow + wide scoped tokens for different users" do
+    request_args = [
+      {reader_tokens: [api_client_authorizations(:anonymous).api_token]},
+      auth(:active_userlist),
+    ]
+    # Path components handed to v1_url => expected response status.
+    {
+      ['users'] => :success,
+      ['users', ''] => :success, # Add trailing slash.
+      ['users', 'current'] => 403,
+      ['virtual_machines'] => 403,
+    }.each do |path_parts, expected_status|
+      get(v1_url(*path_parts), *request_args)
+      assert_response expected_status
+    end
+  end
+
test "specimens token can see exactly owned specimens" do
get_args = [{}, auth(:active_specimens)]
get(v1_url('specimens'), *get_args)
[nil, :active_noscope].each do |main_auth|
[:spectator, :spectator_specimens].each do |read_auth|
- test "#{main_auth} auth with reader token #{read_auth} can read" do
- assert_includes(get_specimen_uuids(main_auth, read_auth),
- spectator_specimen, "did not find spectator specimen")
- end
-
- test "#{main_auth} auth with JSON read token #{read_auth} can read" do
- assert_includes(get_specimen_uuids(main_auth, read_auth, :to_json),
- spectator_specimen, "did not find spectator specimen")
- end
-
- test "#{main_auth} auth with reader token #{read_auth} can't write" do
- assert_post_denied(main_auth, read_auth)
- end
+ [:to_a, :to_json].each do |formatter|
+        # Generated once per reader-token encoding (formatter). The encoding
+        # must be forwarded to the request helper; otherwise the :to_a and
+        # :to_json variants issue the same request and the JSON-encoded
+        # reader_tokens path is never exercised.
+        # NOTE(review): assumes get_specimens accepts the formatter as an
+        # optional third argument, as assert_post_denied does -- confirm.
+        # A read is expected to be refused (403) when a main auth is
+        # supplied and to succeed (200) when only the reader token is sent.
+        test "#{main_auth.inspect} auth with #{formatter} reader token #{read_auth} can#{"'t" if main_auth} read" do
+          get_specimens(main_auth, read_auth, formatter)
+          assert_response(if main_auth then 403 else 200 end)
+        end
- test "#{main_auth} auth with JSON read token #{read_auth} can't write" do
- assert_post_denied(main_auth, read_auth, :to_json)
+ test "#{main_auth.inspect} auth with #{formatter} reader token #{read_auth} can't write" do
+ assert_post_denied(main_auth, read_auth, formatter)
+ end
end
end
end
}
}
+var errorBlacklist = []string{"Cannot connect to the Docker daemon"}
+var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
+
+// checkBrokenNode reports whether goterr contains one of the substrings
+// listed in errorBlacklist, i.e. whether the error suggests the node itself
+// cannot run containers.
+//
+// On a match the error is written to runner.CrunchLog. If the
+// -broken-node-hook flag is non-empty, that program is then executed with
+// its stdout and stderr directed to runner.CrunchLog; a failure of the hook
+// is logged but does not change the return value. If the flag is empty, a
+// message saying so is logged instead.
+//
+// A nil goterr never matches and returns false.
+func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
+	if goterr == nil {
+		return false
+	}
+	// The message does not change between iterations, so render it once.
+	msg := goterr.Error()
+	for _, d := range errorBlacklist {
+		if !strings.Contains(msg, d) {
+			continue
+		}
+		runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr)
+		if *brokenNodeHook == "" {
+			runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+			return true
+		}
+		runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+		// Run the hook with no arguments, capturing its output in the
+		// container's crunch-run log.
+		c := exec.Command(*brokenNodeHook)
+		c.Stdout = runner.CrunchLog
+		c.Stderr = runner.CrunchLog
+		if err := c.Run(); err != nil {
+			runner.CrunchLog.Printf("Error running broken node hook: %v", err)
+		}
+		return true
+	}
+	return false
+}
+
// LoadImage determines the docker image id from the container record and
// checks if it is available in the local Docker image store. If not, it loads
// the image from Keep.
// check for and/or load image
err = runner.LoadImage()
if err != nil {
- runner.finalState = "Cancelled"
+ if !runner.checkBrokenNode(err) {
+ // Failed to load image but not due to a "broken node"
+ // condition, probably user error.
+ runner.finalState = "Cancelled"
+ }
err = fmt.Errorf("While loading container image: %v", err)
return
}
return
}
- runner.StartCrunchstat()
-
if runner.IsCancelled() {
return
}
}
runner.finalState = "Cancelled"
+ runner.StartCrunchstat()
+
err = runner.StartContainer()
if err != nil {
+ runner.checkBrokenNode(err)
return
}
}
api.Retries = 8
- var kc *keepclient.KeepClient
- kc, err = keepclient.MakeKeepClient(api)
- if err != nil {
- log.Fatalf("%s: %v", containerId, err)
+ kc, kcerr := keepclient.MakeKeepClient(api)
+ if kcerr != nil {
+ log.Fatalf("%s: %v", containerId, kcerr)
}
kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
kc.Retries = 4
- var docker *dockerclient.Client
// API version 1.21 corresponds to Docker 1.9, which is currently the
// minimum version we want to support.
- docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
- if err != nil {
- log.Fatalf("%s: %v", containerId, err)
- }
-
+ docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
dockerClientProxy := ThinDockerClientProxy{Docker: docker}
cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
+
+ if dockererr != nil {
+ cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
+ cr.checkBrokenNode(dockererr)
+ cr.CrunchLog.Close()
+ os.Exit(1)
+ }
+
cr.statInterval = *statInterval
cr.cgroupRoot = *cgroupRoot
cr.expectCgroupParent = *cgroupParent
}
func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
+ if t.finish == 2 {
+ return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
+ }
+
if t.imageLoaded == image {
return dockertypes.ImageInspect{}, nil, nil
} else {
}
func (t *TestDockerClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
+ if t.finish == 2 {
+ return dockertypes.ImageLoadResponse{}, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
+ }
_, err := io.Copy(ioutil.Discard, input)
if err != nil {
return dockertypes.ImageLoadResponse{}, err
if api.CalledWith("container.state", "Complete") != nil {
c.Check(err, IsNil)
}
- c.Check(api.WasSetRunning, Equals, true)
-
- c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil)
+ if exitCode != 2 {
+ c.Check(api.WasSetRunning, Equals, true)
+ c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil)
+ }
if err != nil {
for k, v := range api.Logs {
_, err = cr.UploadOutputFile(realTemp+"/"+v, info, err, []string{}, nil, "", "", 0)
c.Assert(err, NotNil)
}
+
+// TestFullBrokenDocker1 runs a container against a Docker stub that reports
+// "Cannot connect to the Docker daemon" (FullRunHelper exit code 2) with a
+// broken node hook configured. It checks that the container is put back in
+// the Queued state and that the crunch-run log records the broken-node
+// detection, the hook invocation, and the hook's own output ("killme").
+func (s *TestSuite) TestFullBrokenDocker1(c *C) {
+	tf, err := ioutil.TempFile("", "brokenNodeHook-")
+	c.Assert(err, IsNil)
+	defer os.Remove(tf.Name())
+
+	// Fail fast if the hook script cannot be written or made executable;
+	// otherwise the log assertions below would fail for an unrelated reason.
+	_, err = tf.Write([]byte(`#!/bin/sh
+exec echo killme
+`))
+	c.Assert(err, IsNil)
+	c.Assert(tf.Close(), IsNil)
+	c.Assert(os.Chmod(tf.Name(), 0700), IsNil)
+
+	// brokenNodeHook is package-level state: put the previous value back so
+	// later tests do not see a hook path that has already been removed.
+	origHook := brokenNodeHook
+	defer func() { brokenNodeHook = origHook }()
+	ech := tf.Name()
+	brokenNodeHook = &ech
+
+	api, _, _ := FullRunHelper(c, `{
+ "command": ["echo", "hello world"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`, nil, 2, func(t *TestDockerClient) {
+		t.logWriter.Write(dockerLog(1, "hello world\n"))
+		t.logWriter.Close()
+	})
+
+	c.Check(api.CalledWith("container.state", "Queued"), NotNil)
+	c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+	c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
+	c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
+}
+
+// TestFullBrokenDocker2 runs a container against a Docker stub that reports
+// "Cannot connect to the Docker daemon" (FullRunHelper exit code 2) with no
+// broken node hook configured. It checks that the container is put back in
+// the Queued state and that the crunch-run log records both the broken-node
+// detection and the absence of a hook.
+func (s *TestSuite) TestFullBrokenDocker2(c *C) {
+	// brokenNodeHook is package-level state: put the previous value back
+	// when the test ends so it does not leak into later tests.
+	origHook := brokenNodeHook
+	defer func() { brokenNodeHook = origHook }()
+	ech := ""
+	brokenNodeHook = &ech
+
+	api, _, _ := FullRunHelper(c, `{
+ "command": ["echo", "hello world"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`, nil, 2, func(t *TestDockerClient) {
+		t.logWriter.Write(dockerLog(1, "hello world\n"))
+		t.logWriter.Close()
+	})
+
+	c.Check(api.CalledWith("container.state", "Queued"), NotNil)
+	c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+	c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*No broken node hook.*")
+}
"fmt"
"io/ioutil"
"log"
+ "math/rand"
"net/http"
"net/http/httptest"
"os"
"strings"
+ "sync"
"testing"
"time"
}
}
+// TestManyFailedPuts sets the proxy handler's timeout to one nanosecond,
+// starts 128 concurrent 1 MiB PutB calls through the proxy, and reports a
+// test error if they have not all returned within ten seconds. The results
+// of the individual puts are not inspected.
+func (s *ServerRequiredSuite) TestManyFailedPuts(c *C) {
+	kc := runProxy(c, nil, false)
+	defer closeListener()
+	router.(*proxyHandler).timeout = time.Nanosecond
+
+	const nPuts = 128
+	payload := make([]byte, 1<<20)
+	rand.Read(payload)
+
+	var pending sync.WaitGroup
+	pending.Add(nPuts)
+	for i := 0; i < nPuts; i++ {
+		go func() {
+			defer pending.Done()
+			kc.PutB(payload)
+		}()
+	}
+
+	// Closed once every put has returned.
+	allDone := make(chan struct{})
+	go func() {
+		defer close(allDone)
+		pending.Wait()
+	}()
+
+	select {
+	case <-allDone:
+	case <-time.After(10 * time.Second):
+		c.Error("timeout")
+	}
+}
+
func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
kc := runProxy(c, nil, false)
defer closeListener()
RUN pip install -U setuptools
-ENV NODEVERSION v6.11.2
+ENV NODEVERSION v6.11.4
# Install nodejs binary
RUN curl -L -f https://nodejs.org/dist/${NODEVERSION}/node-${NODEVERSION}-linux-x64.tar.xz | tar -C /usr/local -xJf - && \
FROM arvados/arvbox-base
ARG arvados_version
ARG sso_version=master
+ARG composer_version=master
RUN cd /usr/src && \
git clone --no-checkout https://github.com/curoverse/arvados.git && \
git -C arvados checkout ${arvados_version} && \
git clone --no-checkout https://github.com/curoverse/sso-devise-omniauth-provider.git sso && \
- git -C sso checkout ${sso_version}
+ git -C sso checkout ${sso_version} && \
+ git clone --no-checkout https://github.com/curoverse/composer.git && \
+ git -C composer checkout ${composer_version}
ADD service/ /var/lib/arvbox/service
RUN ln -sf /var/lib/arvbox/service /etc
rm -rf tmp
mkdir -p tmp/cache
+bundle exec rake assets:precompile
bundle exec rake db:migrate
set +u
mkdir tmp
chown arvbox:arvbox tmp
+if test -s /var/lib/arvados/workbench_rails_env ; then
+ export RAILS_ENV=$(cat /var/lib/arvados/workbench_rails_env)
+else
+ export RAILS_ENV=development
+fi
+
if test "$1" != "--only-deps" ; then
exec bundle exec passenger start --port 80 \
--user arvbox --runtime-dir=/var/lib/passenger
keep_web_download_url: http://$localip:${services[keep-web]}/c=%{uuid_or_pdh}
keep_web_url: http://$localip:${services[keep-web]}/c=%{uuid_or_pdh}
arvados_docsite: http://$localip:${services[doc]}/
+ force_ssl: false
EOF
+bundle exec rake assets:precompile
+
(cd config && /usr/local/lib/arvbox/application_yml_override.py)