//
// SPDX-License-Identifier: AGPL-3.0
-window.CollectionsTable = {
+window.SearchResultsTable = {
maybeLoadMore: function(dom) {
var loader = this.loader
if (loader.state != loader.READY)
},
view: function(vnode) {
var loader = vnode.attrs.loader
+ var iconsMap = {
+ collections: m('i.fa.fa-fw.fa-archive'),
+ projects: m('i.fa.fa-fw.fa-folder'),
+ }
return m('table.table.table-condensed', [
m('thead', m('tr', [
m('th'),
loader.items().map(function(item) {
return m('tr', [
m('td', [
- // Guess workbench.{apihostport} is a
- // Workbench... unless the host part of
- // apihostport is an IPv4 or [IPv6]
- // address.
- item.session.baseURL.match('://(\\[|\\d+\\.\\d+\\.\\d+\\.\\d+[:/])') ? null :
+ item.workbenchBaseURL() &&
m('a.btn.btn-xs.btn-default', {
- href: item.session.baseURL.replace('://', '://workbench.')+'collections/'+item.uuid,
- }, 'Show'),
+ 'data-original-title': 'show '+item.objectType.description,
+ 'data-placement': 'top',
+ 'data-toggle': 'tooltip',
+ href: item.workbenchBaseURL()+'/'+item.objectType.wb_path+'/'+item.uuid,
+ // Bootstrap's tooltip feature
+ oncreate: function(vnode) { $(vnode.dom).tooltip() },
+ }, iconsMap[item.objectType.wb_path]),
]),
m('td.arvados-uuid', item.uuid),
m('td', item.name || '(unnamed)'),
},
}
-window.CollectionsSearch = {
+window.Search = {
oninit: function(vnode) {
vnode.state.sessionDB = new SessionDB()
vnode.state.searchEntered = m.stream()
vnode.state.loader = new MergingLoader({
children: Object.keys(sessions).map(function(key) {
var session = sessions[key]
- return new MultipageLoader({
+ var workbenchBaseURL = function() {
+ return vnode.state.sessionDB.workbenchBaseURL(session)
+ }
+ var searchable_objects = [
+ {
+ wb_path: 'projects',
+ api_path: 'arvados/v1/groups',
+ filters: [['group_class', '=', 'project']],
+ description: 'project',
+ },
+ {
+ wb_path: 'collections',
+ api_path: 'arvados/v1/collections',
+ filters: [],
+ description: 'collection',
+ },
+ ]
+ return new MergingLoader({
sessionKey: key,
- loadFunc: function(filters) {
- var tsquery = to_tsquery(q)
- if (tsquery) {
- filters = filters.slice(0)
- filters.push(['any', '@@', tsquery])
- }
- return vnode.state.sessionDB.request(session, 'arvados/v1/collections', {
- data: {
- filters: JSON.stringify(filters),
- count: 'none',
+ // For every session, search for every object type
+ children: searchable_objects.map(function(obj_type) {
+ return new MultipageLoader({
+ sessionKey: key,
+ loadFunc: function(filters) {
+ // Apply additional type dependant filters
+ filters = filters.concat(obj_type.filters)
+ var tsquery = to_tsquery(q)
+ if (tsquery) {
+ filters.push(['any', '@@', tsquery])
+ }
+ return vnode.state.sessionDB.request(session, obj_type.api_path, {
+ data: {
+ filters: JSON.stringify(filters),
+ count: 'none',
+ },
+ }).then(function(resp) {
+ resp.items.map(function(item) {
+ item.workbenchBaseURL = workbenchBaseURL
+ item.objectType = obj_type
+ })
+ return resp
+ })
},
- }).then(function(resp) {
- resp.items.map(function(item) {
- item.session = session
- })
- return resp
})
- },
+ }),
})
- })
+ }),
})
})
},
m('.row', [
m('.col-md-6', [
m('.input-group', [
- m('input#search.form-control[placeholder=Search]', {
+ m('input#search.form-control[placeholder=Search collections and projects]', {
oninput: m.withAttr('value', vnode.state.searchEntered),
value: vnode.state.searchEntered(),
}),
m('a[href="/sessions"]', 'Add/remove sites'),
]),
]),
- m(CollectionsTable, {
+ m(SearchResultsTable, {
loader: vnode.state.loader,
}),
],
return m('.container', [
m('p', [
'You can log in to multiple Arvados sites here, then use the ',
- m('a[href="/collections/multisite"]', 'multi-site search'),
- ' page to search collections on all sites at once.',
+ m('a[href="/search"]', 'multi-site search'),
+ ' page to search collections and projects on all sites at once.',
]),
m('table.table.table-condensed.table-hover', [
m('thead', m('tr', [
window.SessionDB = function() {
var db = this
Object.assign(db, {
+ discoveryCache: {},
loadFromLocalStorage: function() {
try {
return JSON.parse(window.localStorage.getItem('sessions')) || {}
})
})
},
+ // Return the Workbench base URL advertised by the session's
+ // API server, or a reasonable guess, or (if neither strategy
+ // works out) null.
+ workbenchBaseURL: function(session) {
+ var dd = db.discoveryDoc(session)()
+ if (!dd)
+ // Don't fall back to guessing until we receive the discovery doc
+ return null
+ if (dd.workbenchUrl)
+ return dd.workbenchUrl
+ // Guess workbench.{apihostport} is a Workbench... unless
+ // the host part of apihostport is an IPv4 or [IPv6]
+ // address.
+ if (!session.baseURL.match('://(\\[|\\d+\\.\\d+\\.\\d+\\.\\d+[:/])'))
+ var wbUrl = session.baseURL.replace('://', '://workbench.')
+ // Remove the trailing slash, if it's there.
+ return wbUrl.slice(-1) == '/' ? wbUrl.slice(0, -1) : wbUrl
+ return null
+ },
+ // Return a m.stream that will get fulfilled with the
+ // discovery doc from a session's API server.
+ discoveryDoc: function(session) {
+ var cache = db.discoveryCache[session.baseURL]
+ if (!cache) {
+ db.discoveryCache[session.baseURL] = cache = m.stream()
+ m.request(session.baseURL+'discovery/v1/apis/arvados/v1/rest').then(cache)
+ }
+ return cache
+ },
request: function(session, path, opts) {
opts = opts || {}
opts.headers = opts.headers || {}
skip_around_filter(:require_thread_api_token,
only: [:show_file, :show_file_links])
skip_before_filter(:find_object_by_uuid,
- only: [:provenance, :show_file, :show_file_links, :multisite])
+ only: [:provenance, :show_file, :show_file_links])
# We depend on show_file to display the user agreement:
skip_before_filter :check_user_agreements, only: :show_file
skip_before_filter :check_user_profile, only: :show_file
# SPDX-License-Identifier: AGPL-3.0
class SearchController < ApplicationController
+ skip_before_filter :ensure_arvados_api_exists
+
def find_objects_for_index
search_what = Group
if params[:project_uuid]
@client_mtx = Mutex.new
end
- def api(resources_kind, action, data=nil, tokens={})
+ def api(resources_kind, action, data=nil, tokens={}, include_anon_token=true)
profile_checkpoint
'reader_tokens' => ((tokens[:reader_tokens] ||
Thread.current[:reader_tokens] ||
[]) +
- [Rails.configuration.anonymous_user_token]).to_json,
+ (include_anon_token ? [Rails.configuration.anonymous_user_token] : [])).to_json,
}
if !data.nil?
data.each do |k,v|
end
def self.current
- res = arvados_api_client.api self, '/current'
+ res = arvados_api_client.api self, '/current', nil, {}, false
arvados_api_client.unpack_api_response(res)
end
<%=
target = Rails.configuration.multi_site_search
if target == true
- target = {controller: 'collections', action: 'multisite'}
+ target = {controller: 'search', action: 'index'}
end
link_to("Multi-site search", target, {class: 'btn btn-default'}) %>
</form>
# fetch children of all the active crs in one call, if there are any
active_crs = recent_crs.each {|cr| cr if (cr.priority.andand > 0 and cr.state != 'Final' and cr.container_uuid)}
- active_cr_uuids = active_crs.map(&:uuid)
- active_cr_containers = active_crs.map {|cr| cr.container_uuid}.compact.uniq
- cr_children = {}
- if active_cr_containers.any?
- active_cr_containers.each { |c| cr_children[c] = []}
- cols = ContainerRequest.columns.map(&:name) - %w(id updated_at mounts)
- reqs = ContainerRequest.select(cols).where(requesting_container_uuid: active_cr_containers).results
- reqs.each {|cr| cr_children[cr.requesting_container_uuid] << cr} if reqs
- end
wus = {}
outputs = []
recent_procs.each do |p|
- if p.uuid.in?(active_cr_uuids)
- wu = p.work_unit(nil, child_objects=cr_children[p.container_uuid])
- else
- wu = p.work_unit
- end
+ wu = p.work_unit
wus[p] = wu
outputs << wu.outputs
<% else %>
<div class="dashboard-panel-info-row row-<%=wu.uuid%>">
<div class="row">
- <div class="col-md-6">
+ <div class="col-md-6 text-overflow-ellipsis">
<%= link_to_if_arvados_object p, {friendly_name: true} %>
</div>
- <% if wu.is_running? %>
- <div class="col-md-6">
- <div class="progress" style="margin-bottom: 0px">
- <% wu.progress %>
- </div>
- </div>
- <% else %>
<div class="col-md-2">
<span class="label label-<%=wu.state_bootstrap_class%>"><%=wu.state_label%></span>
</div>
- <% end %>
</div>
- <%
- children = wu.children
- running = children.select { |c| c.state_label == "Running" }
- queued = children.select { |c| c.state_label == "Queued" }
- %>
-
<div class="clearfix">
Started at <%= render_localized_date(wu.started_at || wu.created_at, "noseconds") %>.
<% wu_time = Time.now - (wu.started_at || wu.created_at) %>
Active for <%= render_runtime(wu_time, false) %>.
<div class="pull-right">
- <% running.each do |r| %>
- <span class="label label-<%= r.state_bootstrap_class %>"> <%= r.label || r.state_label || 'Not ready' %> </span>
- <% end %>
- <% queued.each do |q| %>
- <span class="label label-<%= q.state_bootstrap_class %>"> <%= q.label || r.state_label || 'Not ready' %> </span>
- <% end %>
</div>
</div>
</div>
SPDX-License-Identifier: AGPL-3.0 -->
-<div data-mount-mithril="CollectionsSearch"></div>
+<div data-mount-mithril="Search"></div>
post 'remove_selected_files', on: :member
get 'tags', on: :member
post 'save_tags', on: :member
- get 'multisite', on: :collection
+ get 'multisite', on: :collection, to: redirect('/search')
end
get('/collections/download/:uuid/:reader_token/*file' => 'collections#show_file',
format: false)
get 'tab_counts', on: :member
get 'public', on: :collection
end
-
+
resources :search do
get 'choose', :on => :collection
end
match '/_health/ping', to: 'healthcheck#ping', via: [:get]
get '/tests/mithril', to: 'tests#mithril'
-
+
get '/status', to: 'status#status'
-
+
# Send unroutable requests to an arbitrary controller
# (ends up at ApplicationController#render_not_found)
match '*a', to: 'links#render_not_found', via: [:get, :post]
{
fixture: 'container_requests',
state: 'running',
- selectors: [['div.progress', true]]
+ selectors: [['.label-info', true, 'Running']]
},
{
fixture: 'pipeline_instances',
{
fixture: 'pipeline_instances',
state: 'pipeline_in_running_state',
- selectors: [['div.progress', true]]
+ selectors: [['.label-info', true, 'Running']]
},
].each do |c|
uuid = api_fixture(c[:fixture])[c[:state]]['uuid']
end
within('.recent-processes') do
- assert_text 'running'
within('.row-zzzzz-xvhdp-cr4runningcntnr') do
- assert_text 'requester_for_running_cr'
+ assert_text 'running'
end
assert_text 'zzzzz-d1hrv-twodonepipeline'
#
# SPDX-License-Identifier: AGPL-3.0
-LIBCLOUD_PIN=2.2.2.dev2
+LIBCLOUD_PIN=2.2.2.dev3
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-FROM ubuntu:precise
-MAINTAINER Ward Vandewege <ward@curoverse.com>
-
-ENV DEBIAN_FRONTEND noninteractive
-
-# Install dependencies.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip build-essential unzip
-
-# Install RVM
-RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
- curl -L https://get.rvm.io | bash -s stable && \
- /usr/local/rvm/bin/rvm install 2.3 && \
- /usr/local/rvm/bin/rvm alias create default ruby-2.3 && \
- /usr/local/rvm/bin/rvm-exec default gem install bundler && \
- /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
-
-# Install golang binary
-ADD generated/go1.8.3.linux-amd64.tar.gz /usr/local/
-RUN ln -s /usr/local/go/bin/go /usr/local/bin/
-
-# Install nodejs and npm
-ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
-RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
-
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
-ENV WORKSPACE /arvados
-CMD ["/usr/local/rvm/bin/rvm-exec", "default", "bash", "/jenkins/run-build-packages.sh", "--target", "ubuntu1204"]
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-FROM ubuntu:precise
-MAINTAINER Ward Vandewege <ward@curoverse.com>
-
-ENV DEBIAN_FRONTEND noninteractive
-
-# Install RVM
-RUN apt-get update && \
- apt-get -y install --no-install-recommends curl ca-certificates g++ && \
- gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
- curl -L https://get.rvm.io | bash -s stable && \
- /usr/local/rvm/bin/rvm install 2.3 && \
- /usr/local/rvm/bin/rvm alias create default ruby-2.3
-
-# udev daemon can't start in a container, so don't try.
-RUN mkdir -p /etc/udev/disabled
-
-RUN echo "deb file:///arvados/packages/ubuntu1204/ /" >>/etc/apt/sources.list
true
else
FINAL_EXITCODE=$?
+ echo
+ echo "Build packages failed for $(basename $(dirname "$dockerfile_path"))"
+ echo
fi
done
if test $FINAL_EXITCODE != 0 ; then
+ echo
echo "Build packages failed with code $FINAL_EXITCODE" >&2
+ echo
fi
exit $FINAL_EXITCODE
sdk/go/httpserver
sdk/go/manifest
sdk/go/blockdigest
-sdk/go/streamer
+sdk/go/asyncbuf
sdk/go/stats
sdk/go/crunchrunner
sdk/cwl
echo -n 'cadaver: '
cadaver --version | grep -w cadaver \
|| fatal "No cadaver. Try: apt-get install cadaver"
+ echo -n 'libattr1 xattr.h: '
+ find /usr/include -path '*/attr/xattr.h' | egrep --max-count=1 . \
+ || fatal "No libattr1 xattr.h. Try: apt-get install libattr1-dev"
+ echo -n 'libcurl curl.h: '
+ find /usr/include -path '*/curl/curl.h' | egrep --max-count=1 . \
+ || fatal "No libcurl curl.h. Try: apt-get install libcurl4-gnutls-dev"
+ echo -n 'libpq libpq-fe.h: '
+ find /usr/include -path '*/postgresql/libpq-fe.h' | egrep --max-count=1 . \
+ || fatal "No libpq libpq-fe.h. Try: apt-get install libpq-dev"
+ echo -n 'services/api/config/database.yml: '
+ if [[ ! -f "$WORKSPACE/services/api/config/database.yml" ]]; then
+ fatal "Please provide a database.yml file for the test suite"
+ else
+ echo "OK"
+ fi
+ echo -n 'postgresql: '
+ psql --version || fatal "No postgresql. Try: apt-get install postgresql postgresql-client-common"
+ echo -n 'phantomjs: '
+ phantomjs --version || fatal "No phantomjs. Try: apt-get install phantomjs"
+ echo -n 'xvfb: '
+ which Xvfb || fatal "No xvfb. Try: apt-get install xvfb"
+ echo -n 'graphviz: '
+ dot -V || fatal "No graphviz. Try: apt-get install graphviz"
}
rotate_logfile() {
start_api() {
echo 'Starting API server...'
+ if [[ ! -d "$WORKSPACE/services/api/log" ]]; then
+ mkdir -p "$WORKSPACE/services/api/log"
+ fi
+ # Remove empty api.pid file if it exists
+ if [[ -f "$WORKSPACE/tmp/api.pid" && ! -s "$WORKSPACE/tmp/api.pid" ]]; then
+ rm -f "$WORKSPACE/tmp/api.pid"
+ fi
cd "$WORKSPACE" \
&& eval $(python sdk/python/tests/run_test_server.py start --auth admin) \
&& export ARVADOS_TEST_API_HOST="$ARVADOS_API_HOST" \
sdk/go/health
sdk/go/httpserver
sdk/go/manifest
- sdk/go/streamer
+ sdk/go/asyncbuf
sdk/go/crunchrunner
sdk/go/stats
lib/crunchstat
- user/topics/run-command.html.textile.liquid
- user/reference/job-pipeline-ref.html.textile.liquid
- user/examples/crunch-examples.html.textile.liquid
+ - user/topics/arv-sync-groups.html.textile.liquid
- Query the metadata database:
- user/topics/tutorial-trait-search.html.textile.liquid
- Arvados License:
|==--submit-runner-ram== SUBMIT_RUNNER_RAM|RAM (in MiB) required for the workflow runner job (default 1024)|
|==--submit-runner-image== SUBMIT_RUNNER_IMAGE|Docker image for workflow runner job, default arvados/jobs|
|==--name== NAME| Name to use for workflow execution instance.|
-|==--on-error {stop,continue}|Desired workflow behavior when a step fails. One of 'stop' or 'continue'. Default is 'continue'.|
+|==--on-error {stop,continue}==|Desired workflow behavior when a step fails. One of 'stop' or 'continue'. Default is 'continue'.|
|==--enable-dev==| Enable loading and running development versions of CWL spec.|
|==--intermediate-output-ttl== N|If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).|
|==--trash-intermediate==| Immediately trash intermediate outputs on workflow success.|
--- /dev/null
+---
+layout: default
+navsection: userguide
+title: "Using arv-sync-groups"
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+The @arv-sync-groups@ tool lets you synchronize remote groups into Arvados from an external source.
+
+h1. Using arv-sync-groups
+
+This tool reads a CSV (comma-separated values) file having information about external groups and their members. When running it for the first time, it'll create a special group named 'Externally synchronized groups' meant to be the parent of all the remote groups.
+
+Every line in the file should have 2 values: a group name and a local user identifier, meaning that the named user is a member of the group. The tool will create the group if it doesn't exist, and add the user to it. If a group member is not listed in the input file, that account will be removed from the group.
+
+Users can be identified by their email address or username: the tool will check that every user exists on the system, and report any that are not found. Groups, on the other hand, are identified by their name.
+
+This tool is designed to be run periodically, reading a file created by a dump script from a remote auth system (e.g., LDAP), and applying the file's contents as the source of truth.
+
+
+bq. NOTE: @arv-sync-groups@ needs to perform several administrative tasks on Arvados, so it must be run using a superuser token.
+
+h2. Options
+
+The following command line options are supported:
+
+table(table table-bordered table-condensed).
+|_. Option |_. Description |
+|==--help==| This list of options|
+|==--parent-group-uuid==| UUID of group to own all the externally synchronized groups|
+|==--user-id== | Identifier to use in looking up user. One of 'email' or 'username' (Default: 'email')|
+|==--verbose==| Log informational messages (Default: False)|
+|==--version==| Print version and exit|
+
+h2. Examples
+
+To sync groups using the username to identify every account, reading from some @external_groups.csv@ file, the command should be called as follows:
+
+<notextile>
+<pre><code>~$ <span class="userinput">arv-sync-groups --user-id username /path/to/external_groups.csv </span>
+</code></pre>
+</notextile>
+
+If you want to use a specific preexisting group as the parent of all the remote groups, you can do it this way:
+
+<notextile>
+<pre><code>~$ <span class="userinput">arv-sync-groups --parent-group-uuid <preexisting group UUID> --user-id username /path/to/external_groups.csv </span>
+</code></pre>
+</notextile>
# apt.arvados.org
deb http://apt.arvados.org/ jessie main
+deb http://apt.arvados.org/ jessie-dev main
arvargs.conformance_test = None
arvargs.use_container = True
arvargs.relax_path_checks = True
- arvargs.validate = None
arvargs.print_supported_versions = False
make_fs_access = partial(CollectionFsAccess,
import ruamel.yaml as yaml
from .runner import upload_dependencies, packed_workflow, upload_workflow_collection, trim_anonymous_location, remove_redundant_fields
-from .pathmapper import trim_listing
+from .pathmapper import ArvPathMapper, trim_listing
from .arvtool import ArvadosCommandTool
from .perf import Perf
False)
with Perf(metrics, "subworkflow adjust"):
+ joborder_resolved = copy.deepcopy(joborder)
joborder_keepmount = copy.deepcopy(joborder)
+ reffiles = []
+ visit_class(joborder_keepmount, ("File", "Directory"), lambda x: reffiles.append(x))
+
+ mapper = ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+ "/keep/%s",
+ "/keep/%s/%s",
+ **kwargs)
+
def keepmount(obj):
remove_redundant_fields(obj)
with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
if obj["location"].startswith("keep:"):
- obj["location"] = "/keep/" + obj["location"][5:]
+ obj["location"] = mapper.mapper(obj["location"]).target
if "listing" in obj:
del obj["listing"]
elif obj["location"].startswith("_:"):
visit_class(joborder_keepmount, ("File", "Directory"), keepmount)
+ def resolved(obj):
+ if obj["location"].startswith("keep:"):
+ obj["location"] = mapper.mapper(obj["location"]).resolved
+
+ visit_class(joborder_resolved, ("File", "Directory"), resolved)
+
if self.wf_pdh is None:
adjustFileObjs(packed, keepmount)
adjustDirObjs(packed, keepmount)
})
kwargs["loader"] = self.doc_loader
kwargs["avsc_names"] = self.doc_schema
- return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder, output_callback, **kwargs)
+ return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder_resolved, output_callback, **kwargs)
else:
return super(ArvadosWorkflow, self).job(joborder, output_callback, **kwargs)
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
'cwltool==1.0.20170928192020',
- 'schema-salad==2.6.20170927145003',
+ 'schema-salad==2.6.20171116190026',
'typing==3.5.3.0',
'ruamel.yaml==0.13.7',
'arvados-python-client>=0.1.20170526013812',
# SPDX-License-Identifier: Apache-2.0
if ! arv-get d7514270f356df848477718d58308cc4+94 > /dev/null ; then
- arv-put --portable-data-hash testdir
+ arv-put --portable-data-hash testdir/*
fi
exec cwltest --test arvados-tests.yml --tool arvados-cwl-runner $@ -- --disable-reuse --compute-checksum
out: null
tool: 12418-glob-empty-collection.cwl
doc: "Test glob output on empty collection"
+
+- job: null
+ output:
+ out: out
+ tool: wf/runin-wf.cwl
+ doc: "RunInSingleContainer cwl.input.json needs to be consistent with pathmapper manipulations"
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+class: Workflow
+cwlVersion: v1.0
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+inputs:
+ sleeptime:
+ type: int
+ default: 5
+ fileblub:
+ type: File
+ default:
+ class: File
+ location: keep:d7514270f356df848477718d58308cc4+94/a
+ secondaryFiles:
+ - class: File
+ location: keep:d7514270f356df848477718d58308cc4+94/b
+outputs:
+ out:
+ type: string
+ outputSource: substep/out
+requirements:
+ SubworkflowFeatureRequirement: {}
+ ScatterFeatureRequirement: {}
+ InlineJavascriptRequirement: {}
+ StepInputExpressionRequirement: {}
+steps:
+ substep:
+ in:
+ sleeptime: sleeptime
+ fileblub: fileblub
+ out: [out]
+ hints:
+ - class: arv:RunInSingleContainer
+ run:
+ class: Workflow
+ id: mysub
+ inputs:
+ fileblub: File
+ outputs:
+ out:
+ type: string
+ outputSource: sleep1/out
+ steps:
+ sleep1:
+ in:
+ fileblub: fileblub
+ out: [out]
+ run:
+ class: CommandLineTool
+ id: subtool
+ inputs:
+ fileblub:
+ type: File
+ inputBinding: {position: 1}
+ outputs:
+ out:
+ type: string
+ outputBinding:
+ outputEval: "out"
+ baseCommand: cat
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package asyncbuf
+
+import (
+ "bytes"
+ "io"
+ "sync"
+)
+
+// A Buffer is an io.Writer that distributes written data
+// asynchronously to multiple concurrent readers.
+//
+// NewReader() can be called at any time. In all cases, every returned
+// io.Reader reads all data written to the Buffer.
+//
+// Behavior is undefined if Write is called after Close or
+// CloseWithError.
+type Buffer interface {
+ io.WriteCloser
+
+ // NewReader() returns an io.Reader that reads all data
+ // written to the Buffer.
+ NewReader() io.Reader
+
+ // Close, but return the given error (instead of io.EOF) to
+ // all readers when they reach the end of the buffer.
+ //
+ // CloseWithError(nil) is equivalent to
+ // CloseWithError(io.EOF).
+ CloseWithError(error) error
+}
+
+type buffer struct {
+ data *bytes.Buffer
+ cond sync.Cond
+ err error // nil if there might be more writes
+}
+
+// NewBuffer creates a new Buffer using buf as its initial
+// contents. The new Buffer takes ownership of buf, and the caller
+// should not use buf after this call.
+func NewBuffer(buf []byte) Buffer {
+ return &buffer{
+ data: bytes.NewBuffer(buf),
+ cond: sync.Cond{L: &sync.Mutex{}},
+ }
+}
+
+// Write appends p to the shared buffer and wakes all readers
+// currently blocked in Read (Broadcast is deferred so it fires
+// after the lock is released). If the buffer has already been
+// closed, the stored error (io.EOF after a plain Close) is
+// returned and nothing is written.
+func (b *buffer) Write(p []byte) (int, error) {
+	defer b.cond.Broadcast()
+	b.cond.L.Lock()
+	defer b.cond.L.Unlock()
+	if b.err != nil {
+		return 0, b.err
+	}
+	return b.data.Write(p)
+}
+
+func (b *buffer) Close() error {
+ return b.CloseWithError(nil)
+}
+
+func (b *buffer) CloseWithError(err error) error {
+ defer b.cond.Broadcast()
+ b.cond.L.Lock()
+ defer b.cond.L.Unlock()
+ if err == nil {
+ b.err = io.EOF
+ } else {
+ b.err = err
+ }
+ return nil
+}
+
+func (b *buffer) NewReader() io.Reader {
+ return &reader{b: b}
+}
+
+type reader struct {
+ b *buffer
+ read int // # bytes already read
+}
+
+// Read copies previously written bytes into p, blocking on the
+// buffer's condition variable until data is available or the buffer
+// is closed. Each reader keeps its own offset (r.read), so every
+// reader independently observes the full byte stream.
+func (r *reader) Read(p []byte) (int, error) {
+	r.b.cond.L.Lock()
+	for {
+		switch {
+		case r.read < r.b.data.Len():
+			// Snapshot the underlying slice, then drop the lock
+			// before copying: concurrent Writes append (possibly
+			// reallocating the buffer's array), which leaves this
+			// snapshot's contents intact.
+			buf := r.b.data.Bytes()
+			r.b.cond.L.Unlock()
+			n := copy(p, buf[r.read:])
+			r.read += n
+			return n, nil
+		case r.b.err != nil || len(p) == 0:
+			// r.b.err != nil means we reached EOF. And
+			// even if we're not at EOF, there's no need
+			// to block if len(p)==0.
+			err := r.b.err
+			r.b.cond.L.Unlock()
+			return 0, err
+		default:
+			r.b.cond.Wait()
+		}
+	}
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package asyncbuf
+
+import (
+ "crypto/md5"
+ "errors"
+ "io"
+ "io/ioutil"
+ "math/rand"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+func (s *Suite) TestNoWrites(c *check.C) {
+ b := NewBuffer(nil)
+ r1 := b.NewReader()
+ r2 := b.NewReader()
+ b.Close()
+ s.checkReader(c, r1, []byte{}, nil, nil)
+ s.checkReader(c, r2, []byte{}, nil, nil)
+}
+
+func (s *Suite) TestNoReaders(c *check.C) {
+ b := NewBuffer(nil)
+ n, err := b.Write([]byte("foobar"))
+ err2 := b.Close()
+ c.Check(n, check.Equals, 6)
+ c.Check(err, check.IsNil)
+ c.Check(err2, check.IsNil)
+}
+
+func (s *Suite) TestWriteReadClose(c *check.C) {
+ done := make(chan bool, 2)
+ b := NewBuffer(nil)
+ n, err := b.Write([]byte("foobar"))
+ c.Check(n, check.Equals, 6)
+ c.Check(err, check.IsNil)
+ r1 := b.NewReader()
+ r2 := b.NewReader()
+ go s.checkReader(c, r1, []byte("foobar"), nil, done)
+ go s.checkReader(c, r2, []byte("foobar"), nil, done)
+ time.Sleep(time.Millisecond)
+ c.Check(len(done), check.Equals, 0)
+ b.Close()
+ <-done
+ <-done
+}
+
+func (s *Suite) TestPrefillWriteCloseRead(c *check.C) {
+ done := make(chan bool, 2)
+ b := NewBuffer([]byte("baz"))
+ n, err := b.Write([]byte("waz"))
+ c.Check(n, check.Equals, 3)
+ c.Check(err, check.IsNil)
+ b.Close()
+ r1 := b.NewReader()
+ go s.checkReader(c, r1, []byte("bazwaz"), nil, done)
+ r2 := b.NewReader()
+ go s.checkReader(c, r2, []byte("bazwaz"), nil, done)
+ <-done
+ <-done
+}
+
+func (s *Suite) TestWriteReadCloseRead(c *check.C) {
+ done := make(chan bool, 1)
+ b := NewBuffer(nil)
+ r1 := b.NewReader()
+ go s.checkReader(c, r1, []byte("bazwazqux"), nil, done)
+
+ b.Write([]byte("bazwaz"))
+
+ r2 := b.NewReader()
+ r2.Read(make([]byte, 3))
+
+ b.Write([]byte("qux"))
+ b.Close()
+
+ s.checkReader(c, r2, []byte("wazqux"), nil, nil)
+ <-done
+}
+
+func (s *Suite) TestReadAtEOF(c *check.C) {
+ buf := make([]byte, 8)
+
+ b := NewBuffer([]byte{1, 2, 3})
+
+ r := b.NewReader()
+ n, err := r.Read(buf)
+ c.Check(n, check.Equals, 3)
+ c.Check(err, check.IsNil)
+
+ // Reading zero bytes at EOF, but before Close(), doesn't
+ // block or error
+ done := make(chan bool)
+ go func() {
+ defer close(done)
+ n, err = r.Read(buf[:0])
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.IsNil)
+ }()
+ select {
+ case <-done:
+ case <-time.After(time.Second):
+ c.Error("timeout")
+ }
+
+ b.Close()
+
+ // Reading zero bytes after Close() returns EOF
+ n, err = r.Read(buf[:0])
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.Equals, io.EOF)
+
+ // Reading from start after Close() returns 3 bytes, then EOF
+ r = b.NewReader()
+ n, err = r.Read(buf)
+ c.Check(n, check.Equals, 3)
+ if err != nil {
+ c.Check(err, check.Equals, io.EOF)
+ }
+ n, err = r.Read(buf[:0])
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.Equals, io.EOF)
+ n, err = r.Read(buf)
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.Equals, io.EOF)
+}
+
+func (s *Suite) TestCloseWithError(c *check.C) {
+ errFake := errors.New("it's not even a real error")
+
+ done := make(chan bool, 1)
+ b := NewBuffer(nil)
+ r1 := b.NewReader()
+ go s.checkReader(c, r1, []byte("bazwazqux"), errFake, done)
+
+ b.Write([]byte("bazwaz"))
+
+ r2 := b.NewReader()
+ r2.Read(make([]byte, 3))
+
+ b.Write([]byte("qux"))
+ b.CloseWithError(errFake)
+
+ s.checkReader(c, r2, []byte("wazqux"), errFake, nil)
+ <-done
+}
+
+// Write n*n bytes, n at a time; read them into n goroutines using
+// varying buffer sizes; compare checksums.
+func (s *Suite) TestManyReaders(c *check.C) {
+ const n = 256
+
+ b := NewBuffer(nil)
+
+ expectSum := make(chan []byte)
+ go func() {
+ hash := md5.New()
+ buf := make([]byte, n)
+ for i := 0; i < n; i++ {
+ time.Sleep(10 * time.Nanosecond)
+ rand.Read(buf)
+ b.Write(buf)
+ hash.Write(buf)
+ }
+ expectSum <- hash.Sum(nil)
+ b.Close()
+ }()
+
+ gotSum := make(chan []byte)
+ for i := 0; i < n; i++ {
+ go func(bufSize int) {
+ got := md5.New()
+ io.CopyBuffer(got, b.NewReader(), make([]byte, bufSize))
+ gotSum <- got.Sum(nil)
+ }(i + n/2)
+ }
+
+ expect := <-expectSum
+ for i := 0; i < n; i++ {
+ c.Check(expect, check.DeepEquals, <-gotSum)
+ }
+}
+
+func (s *Suite) BenchmarkOneReader(c *check.C) {
+ s.benchmarkReaders(c, 1)
+}
+
+func (s *Suite) BenchmarkManyReaders(c *check.C) {
+ s.benchmarkReaders(c, 100)
+}
+
+func (s *Suite) benchmarkReaders(c *check.C, readers int) {
+ var n int64
+ t0 := time.Now()
+
+ buf := make([]byte, 10000)
+ rand.Read(buf)
+ for i := 0; i < 10; i++ {
+ b := NewBuffer(nil)
+ go func() {
+ for i := 0; i < c.N; i++ {
+ b.Write(buf)
+ }
+ b.Close()
+ }()
+
+ var wg sync.WaitGroup
+ for i := 0; i < readers; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ nn, _ := io.Copy(ioutil.Discard, b.NewReader())
+ atomic.AddInt64(&n, int64(nn))
+ }()
+ }
+ wg.Wait()
+ }
+ c.Logf("%d bytes, %.0f MB/s", n, float64(n)/time.Since(t0).Seconds()/1000000)
+}
+
+func (s *Suite) checkReader(c *check.C, r io.Reader, expectData []byte, expectError error, done chan bool) {
+ buf, err := ioutil.ReadAll(r)
+ c.Check(err, check.Equals, expectError)
+ c.Check(buf, check.DeepEquals, expectData)
+ if done != nil {
+ done <- true
+ }
+}
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
package httpserver
import (
+ "math/rand"
+ "net/http"
"strconv"
"sync"
"time"
// Prefix is prepended to each returned ID.
Prefix string
- lastID int64
- mtx sync.Mutex
+ mtx sync.Mutex
+ src rand.Source
}
// Next returns a new ID string. It is safe to call Next from multiple
// goroutines.
func (g *IDGenerator) Next() string {
-	id := time.Now().UnixNano()
	g.mtx.Lock()
-	if id <= g.lastID {
-		id = g.lastID + 1
+	defer g.mtx.Unlock()
+	// Lazily seed the PRNG on first use.
+	if g.src == nil {
+		g.src = rand.NewSource(time.Now().UnixNano())
	}
-	g.lastID = id
-	g.mtx.Unlock()
-	return g.Prefix + strconv.FormatInt(id, 36)
+	// Concatenate two random int63s in base 36 (up to 26 chars) and
+	// truncate to at most 20 characters. The truncation needs only a
+	// single slice, so use "if" rather than a one-iteration "for".
+	a, b := g.src.Int63(), g.src.Int63()
+	id := strconv.FormatInt(a, 36) + strconv.FormatInt(b, 36)
+	if len(id) > 20 {
+		id = id[:20]
+	}
+	return g.Prefix + id
+}
+
+// AddRequestIDs wraps an http.Handler, adding an X-Request-Id header
+// (a random ID generated with prefix "req-") to each request that
+// doesn't already have one.
+func AddRequestIDs(h http.Handler) http.Handler {
+	gen := &IDGenerator{Prefix: "req-"}
+	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+		// Preserve an ID already supplied by the client or a proxy.
+		if req.Header.Get("X-Request-Id") == "" {
+			req.Header.Set("X-Request-Id", gen.Next())
+		}
+		h.ServeHTTP(w, req)
+	})
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package httpserver
+
+import (
+ "context"
+ "net/http"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/stats"
+ log "github.com/Sirupsen/logrus"
+)
+
+// contextKey is a private type for context values set by this package,
+// so keys cannot collide with context keys defined elsewhere.
+type contextKey struct {
+	name string
+}
+
+// requestTimeContextKey is the context key under which LogRequests
+// stores the request arrival time for later use by logResponse.
+var requestTimeContextKey = contextKey{"requestTime"}
+
+// LogRequests wraps an http.Handler, logging each request and
+// response via logrus.
+func LogRequests(h http.Handler) http.Handler {
+	return http.HandlerFunc(func(wrapped http.ResponseWriter, req *http.Request) {
+		// Wrap the ResponseWriter so status, body size, and
+		// time-of-first-write can be reported in the response log.
+		w := &responseTimer{ResponseWriter: WrapResponseWriter(wrapped)}
+		// Stash the arrival time in the request context so
+		// logResponse can compute timing stats.
+		req = req.WithContext(context.WithValue(req.Context(), &requestTimeContextKey, time.Now()))
+		lgr := log.WithFields(log.Fields{
+			"RequestID": req.Header.Get("X-Request-Id"),
+			"remoteAddr": req.RemoteAddr,
+			"reqForwardedFor": req.Header.Get("X-Forwarded-For"),
+			"reqMethod": req.Method,
+			"reqPath": req.URL.Path[1:],
+			"reqBytes": req.ContentLength,
+		})
+		logRequest(w, req, lgr)
+		defer logResponse(w, req, lgr)
+		h.ServeHTTP(w, req)
+	})
+}
+
+// logRequest emits the "request" log entry when a request arrives.
+func logRequest(w *responseTimer, req *http.Request, lgr *log.Entry) {
+	lgr.Info("request")
+}
+
+// logResponse emits the "response" log entry after the handler
+// returns, including timing fields when the arrival time is present
+// in the request context.
+func logResponse(w *responseTimer, req *http.Request, lgr *log.Entry) {
+	if tStart, ok := req.Context().Value(&requestTimeContextKey).(time.Time); ok {
+		tDone := time.Now()
+		lgr = lgr.WithFields(log.Fields{
+			"timeTotal": stats.Duration(tDone.Sub(tStart)),
+			"timeToStatus": stats.Duration(w.writeTime.Sub(tStart)),
+			"timeWriteBody": stats.Duration(tDone.Sub(w.writeTime)),
+		})
+	}
+	lgr.WithFields(log.Fields{
+		"respStatusCode": w.WroteStatus(),
+		"respStatus": http.StatusText(w.WroteStatus()),
+		"respBytes": w.WroteBodyBytes(),
+	}).Info("response")
+}
+
+// responseTimer wraps a ResponseWriter and records the time of the
+// first WriteHeader/Write call, so logResponse can report how long the
+// handler took to start responding.
+type responseTimer struct {
+	ResponseWriter
+	wrote bool
+	writeTime time.Time
+}
+
+// WriteHeader records the time of the first write, then delegates.
+func (rt *responseTimer) WriteHeader(code int) {
+	if !rt.wrote {
+		rt.wrote = true
+		rt.writeTime = time.Now()
+	}
+	rt.ResponseWriter.WriteHeader(code)
+}
+
+// Write records the time of the first write, then delegates.
+func (rt *responseTimer) Write(p []byte) (int, error) {
+	if !rt.wrote {
+		rt.wrote = true
+		rt.writeTime = time.Now()
+	}
+	return rt.ResponseWriter.Write(p)
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package httpserver
+
+import (
+ "bytes"
+ "encoding/json"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "testing"
+ "time"
+
+ log "github.com/Sirupsen/logrus"
+ check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate: hook the suite into "go test".
+func Test(t *testing.T) {
+	check.TestingT(t)
+}
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+// TestLogRequests serves one request through AddRequestIDs(LogRequests(h))
+// while logrus writes JSON to a captured buffer, then decodes the two
+// emitted log entries ("request" and "response") and checks their fields.
+func (s *Suite) TestLogRequests(c *check.C) {
+	defer log.SetOutput(os.Stdout)
+	captured := &bytes.Buffer{}
+	log.SetOutput(captured)
+	log.SetFormatter(&log.JSONFormatter{
+		TimestampFormat: time.RFC3339Nano,
+	})
+	h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+		w.Write([]byte("hello world"))
+	})
+	req, err := http.NewRequest("GET", "https://foo.example/bar", nil)
+	req.Header.Set("X-Forwarded-For", "1.2.3.4:12345")
+	c.Assert(err, check.IsNil)
+	resp := httptest.NewRecorder()
+	AddRequestIDs(LogRequests(h)).ServeHTTP(resp, req)
+
+	dec := json.NewDecoder(captured)
+
+	// First entry: the "request" line logged on arrival. Fail fast on
+	// a decode error instead of checking fields of an empty map.
+	gotReq := make(map[string]interface{})
+	err = dec.Decode(&gotReq)
+	c.Assert(err, check.IsNil)
+	c.Logf("%#v", gotReq)
+	c.Check(gotReq["RequestID"], check.Matches, "req-[a-z0-9]{20}")
+	c.Check(gotReq["reqForwardedFor"], check.Equals, "1.2.3.4:12345")
+	c.Check(gotReq["msg"], check.Equals, "request")
+
+	// Second entry: the "response" line logged after the handler ran;
+	// it must carry the same request ID.
+	gotResp := make(map[string]interface{})
+	err = dec.Decode(&gotResp)
+	c.Assert(err, check.IsNil)
+	c.Logf("%#v", gotResp)
+	c.Check(gotResp["RequestID"], check.Equals, gotReq["RequestID"])
+	c.Check(gotResp["reqForwardedFor"], check.Equals, "1.2.3.4:12345")
+	c.Check(gotResp["msg"], check.Equals, "response")
+
+	// Timestamp must be RFC3339Nano as configured above.
+	c.Assert(gotResp["time"], check.FitsTypeOf, "")
+	_, err = time.Parse(time.RFC3339Nano, gotResp["time"].(string))
+	c.Check(err, check.IsNil)
+
+	// Timing fields must be present, numeric, and nonzero.
+	for _, key := range []string{"timeToStatus", "timeWriteBody", "timeTotal"} {
+		c.Assert(gotResp[key], check.FitsTypeOf, float64(0))
+		c.Check(gotResp[key].(float64), check.Not(check.Equals), float64(0))
+	}
+}
_, err = io.Copy(this.Hash, this.Reader)
if closer, ok := this.Reader.(io.Closer); ok {
- err2 := closer.Close()
- if err2 != nil && err == nil {
- return err2
+ closeErr := closer.Close()
+ if err == nil {
+ err = closeErr
}
}
if err != nil {
return err
}
-
- sum := this.Hash.Sum(nil)
- if fmt.Sprintf("%x", sum) != this.Check {
- err = BadChecksum
+ if fmt.Sprintf("%x", this.Hash.Sum(nil)) != this.Check {
+ return BadChecksum
}
-
- return err
+ return nil
}
"time"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
+ "git.curoverse.com/arvados.git/sdk/go/asyncbuf"
)
// A Keep "block" is 64MB.
bufsize = BLOCKSIZE
}
- t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
- defer t.Close()
-
- return kc.putReplicas(hash, t, dataBytes)
+ buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize))
+ go func() {
+ _, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash})
+ buf.CloseWithError(err)
+ }()
+ return kc.putReplicas(hash, buf.NewReader, dataBytes)
}
// PutHB writes a block to Keep. The hash of the bytes is given in
//
// Return values are the same as for PutHR.
func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
- t := streamer.AsyncStreamFromSlice(buf)
- defer t.Close()
- return kc.putReplicas(hash, t, int64(len(buf)))
+ newReader := func() io.Reader { return bytes.NewBuffer(buf) }
+ return kc.putReplicas(hash, newReader, int64(len(buf)))
}
// PutB writes a block to Keep. It computes the hash itself.
package keepclient
import (
+ "bytes"
"crypto/md5"
"errors"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
. "gopkg.in/check.v1"
)
make(chan string)}
UploadToStubHelper(c, st,
- func(kc *KeepClient, url string, reader io.ReadCloser,
- writer io.WriteCloser, upload_status chan uploadStatus) {
-
- tr := streamer.AsyncStreamFromReader(512, reader)
- defer tr.Close()
-
- br1 := tr.MakeStreamReader()
-
- go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, 0)
-
- writer.Write([]byte("foo"))
- writer.Close()
+ func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, upload_status chan uploadStatus) {
+ go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), upload_status, 3, 0)
<-st.handled
"strings"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
)
// Function used to emit debug messages. The easiest way to enable
response string
}
-func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
+func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader,
upload_status chan<- uploadStatus, expectedLength int64, requestID int32) {
var req *http.Request
if req, err = http.NewRequest("PUT", url, nil); err != nil {
DebugPrintf("DEBUG: [%08x] Error creating request PUT %v error: %v", requestID, url, err.Error())
upload_status <- uploadStatus{err, url, 0, 0, ""}
- body.Close()
return
}
req.ContentLength = expectedLength
if expectedLength > 0 {
- // Do() will close the body ReadCloser when it is done
- // with it.
- req.Body = body
+ req.Body = ioutil.NopCloser(body)
} else {
- // "For client requests, a value of 0 means unknown if Body is
- // not nil." In this case we do want the body to be empty, so
- // don't set req.Body. However, we still need to close the
- // body ReadCloser.
- body.Close()
+ // "For client requests, a value of 0 means unknown if
+ // Body is not nil." In this case we do want the body
+ // to be empty, so don't set req.Body.
}
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
func (this *KeepClient) putReplicas(
hash string,
- tr *streamer.AsyncStream,
+ getReader func() io.Reader,
expectedLength int64) (locator string, replicas int, err error) {
// Generate an arbitrary ID to identify this specific
// Start some upload requests
if next_server < len(sv) {
DebugPrintf("DEBUG: [%08x] Begin upload %s to %s", requestID, hash, sv[next_server])
- go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestID)
+ go this.uploadToKeepServer(sv[next_server], hash, getReader(), upload_status, expectedLength, requestID)
next_server += 1
active += 1
} else {
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-/* AsyncStream pulls data in from a io.Reader source (such as a file or network
-socket) and fans out to any number of StreamReader sinks.
-
-Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
-any point in the lifetime of the AsyncStream, and each StreamReader will read
-the contents of the buffer up to the "frontier" of the buffer, at which point
-the StreamReader blocks until new data is read from the source.
-
-This is useful for minimizing readthrough latency as sinks can read and act on
-data from the source without waiting for the source to be completely buffered.
-It is also useful as a cache in situations where re-reading the original source
-potentially is costly, since the buffer retains a copy of the source data.
-
-Usage:
-
-Begin reading into a buffer with maximum size 'buffersize' from 'source':
- stream := AsyncStreamFromReader(buffersize, source)
-
-To create a new reader (this can be called multiple times, each reader starts
-at the beginning of the buffer):
- reader := tr.MakeStreamReader()
-
-Make sure to close the reader when you're done with it.
- reader.Close()
-
-When you're done with the stream:
- stream.Close()
-
-Alternately, if you already have a filled buffer and just want to read out from it:
- stream := AsyncStreamFromSlice(buf)
-
- r := tr.MakeStreamReader()
-
-*/
-
-package streamer
-
-import (
- "errors"
- "io"
-)
-
-var ErrAlreadyClosed = errors.New("cannot close a stream twice")
-
-type AsyncStream struct {
- buffer []byte
- requests chan sliceRequest
- add_reader chan bool
- subtract_reader chan bool
- wait_zero_readers chan bool
- closed bool
-}
-
-// Reads from the buffer managed by the Transfer()
-type StreamReader struct {
- offset int
- stream *AsyncStream
- responses chan sliceResult
-}
-
-func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
- t := &AsyncStream{
- buffer: make([]byte, buffersize),
- requests: make(chan sliceRequest),
- add_reader: make(chan bool),
- subtract_reader: make(chan bool),
- wait_zero_readers: make(chan bool),
- }
-
- go t.transfer(source)
- go t.readersMonitor()
-
- return t
-}
-
-func AsyncStreamFromSlice(buf []byte) *AsyncStream {
- t := &AsyncStream{
- buffer: buf,
- requests: make(chan sliceRequest),
- add_reader: make(chan bool),
- subtract_reader: make(chan bool),
- wait_zero_readers: make(chan bool),
- }
-
- go t.transfer(nil)
- go t.readersMonitor()
-
- return t
-}
-
-func (this *AsyncStream) MakeStreamReader() *StreamReader {
- this.add_reader <- true
- return &StreamReader{0, this, make(chan sliceResult)}
-}
-
-// Reads from the buffer managed by the Transfer()
-func (this *StreamReader) Read(p []byte) (n int, err error) {
- this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
- rr, valid := <-this.responses
- if valid {
- this.offset += len(rr.slice)
- return copy(p, rr.slice), rr.err
- } else {
- return 0, io.ErrUnexpectedEOF
- }
-}
-
-func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
- // Record starting offset in order to correctly report the number of bytes sent
- starting_offset := this.offset
- for {
- this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
- rr, valid := <-this.responses
- if valid {
- this.offset += len(rr.slice)
- if rr.err != nil {
- if rr.err == io.EOF {
- // EOF is not an error.
- return int64(this.offset - starting_offset), nil
- } else {
- return int64(this.offset - starting_offset), rr.err
- }
- } else {
- dest.Write(rr.slice)
- }
- } else {
- return int64(this.offset), io.ErrUnexpectedEOF
- }
- }
-}
-
-// Close the responses channel
-func (this *StreamReader) Close() error {
- if this.stream == nil {
- return ErrAlreadyClosed
- }
- this.stream.subtract_reader <- true
- close(this.responses)
- this.stream = nil
- return nil
-}
-
-func (this *AsyncStream) Close() error {
- if this.closed {
- return ErrAlreadyClosed
- }
- this.closed = true
- this.wait_zero_readers <- true
- close(this.requests)
- close(this.add_reader)
- close(this.subtract_reader)
- close(this.wait_zero_readers)
- return nil
-}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-package streamer
-
-import (
- . "gopkg.in/check.v1"
- "io"
- "testing"
- "time"
-)
-
-// Gocheck boilerplate
-func Test(t *testing.T) { TestingT(t) }
-
-var _ = Suite(&StandaloneSuite{})
-
-// Standalone tests
-type StandaloneSuite struct{}
-
-func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
- ReadIntoBufferHelper(c, 225)
- ReadIntoBufferHelper(c, 224)
-}
-
-func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
- out := make([]byte, 128)
- for i := 0; i < 128; i += 1 {
- out[i] = byte(i)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 128)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 128; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
-}
-
-func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
- out := make([]byte, 96)
- for i := 0; i < 96; i += 1 {
- out[i] = byte(i / 2)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 96)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 96; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i/2))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else if i < (128 + 96) {
- c.Check(buffer[i], Equals, byte((i-128)/2))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
-}
-
-func ReadIntoBufferHelper(c *C, bufsize int) {
- buffer := make([]byte, bufsize)
-
- reader, writer := io.Pipe()
- slices := make(chan nextSlice)
-
- go readIntoBuffer(buffer, reader, slices)
-
- HelperWrite128andCheck(c, buffer, writer, slices)
- HelperWrite96andCheck(c, buffer, writer, slices)
-
- writer.Close()
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 0)
- c.Check(s1.reader_error, Equals, io.EOF)
-}
-
-func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
- buffer := make([]byte, 223)
- reader, writer := io.Pipe()
- slices := make(chan nextSlice)
-
- go readIntoBuffer(buffer, reader, slices)
-
- HelperWrite128andCheck(c, buffer, writer, slices)
-
- out := make([]byte, 96)
- for i := 0; i < 96; i += 1 {
- out[i] = byte(i / 2)
- }
-
- // Write will deadlock because it can't write all the data, so
- // spin it off to a goroutine
- go writer.Write(out)
- s1 := <-slices
-
- c.Check(len(s1.slice), Equals, 95)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 95; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i/2))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else if i < (128 + 95) {
- c.Check(buffer[i], Equals, byte((i-128)/2))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
-
- writer.Close()
- s1 = <-slices
- c.Check(len(s1.slice), Equals, 0)
- c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransfer(c *C) {
- reader, writer := io.Pipe()
-
- tr := AsyncStreamFromReader(512, reader)
-
- br1 := tr.MakeStreamReader()
- out := make([]byte, 128)
-
- {
- // Write some data, and read into a buffer shorter than
- // available data
- for i := 0; i < 128; i += 1 {
- out[i] = byte(i)
- }
-
- writer.Write(out[:100])
-
- in := make([]byte, 64)
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 64)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 64; i += 1 {
- c.Check(in[i], Equals, out[i])
- }
- }
-
- {
- // Write some more data, and read into buffer longer than
- // available data
- in := make([]byte, 64)
- n, err := br1.Read(in)
- c.Check(n, Equals, 36)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 36; i += 1 {
- c.Check(in[i], Equals, out[64+i])
- }
-
- }
-
- {
- // Test read before write
- type Rd struct {
- n int
- err error
- }
- rd := make(chan Rd)
- in := make([]byte, 64)
-
- go func() {
- n, err := br1.Read(in)
- rd <- Rd{n, err}
- }()
-
- time.Sleep(100 * time.Millisecond)
- writer.Write(out[100:])
-
- got := <-rd
-
- c.Check(got.n, Equals, 28)
- c.Check(got.err, Equals, nil)
-
- for i := 0; i < 28; i += 1 {
- c.Check(in[i], Equals, out[100+i])
- }
- }
-
- br2 := tr.MakeStreamReader()
- {
- // Test 'catch up' reader
- in := make([]byte, 256)
- n, err := br2.Read(in)
-
- c.Check(n, Equals, 128)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 128; i += 1 {
- c.Check(in[i], Equals, out[i])
- }
- }
-
- {
- // Test closing the reader
- writer.Close()
-
- in := make([]byte, 256)
- n1, err1 := br1.Read(in)
- n2, err2 := br2.Read(in)
- c.Check(n1, Equals, 0)
- c.Check(err1, Equals, io.EOF)
- c.Check(n2, Equals, 0)
- c.Check(err2, Equals, io.EOF)
- }
-
- {
- // Test 'catch up' reader after closing
- br3 := tr.MakeStreamReader()
- in := make([]byte, 256)
- n, err := br3.Read(in)
-
- c.Check(n, Equals, 128)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 128; i += 1 {
- c.Check(in[i], Equals, out[i])
- }
-
- n, err = br3.Read(in)
-
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.EOF)
- }
-}
-
-func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
- reader, writer := io.Pipe()
-
- tr := AsyncStreamFromReader(100, reader)
- defer tr.Close()
-
- sr := tr.MakeStreamReader()
- defer sr.Close()
-
- out := make([]byte, 101)
- go writer.Write(out)
-
- n, err := sr.Read(out)
- c.Check(n, Equals, 100)
- c.Check(err, IsNil)
-
- n, err = sr.Read(out)
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
- // Buffer for reads from 'r'
- buffer := make([]byte, 100)
- for i := 0; i < 100; i += 1 {
- buffer[i] = byte(i)
- }
-
- tr := AsyncStreamFromSlice(buffer)
-
- br1 := tr.MakeStreamReader()
-
- in := make([]byte, 64)
- {
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 64)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 64; i += 1 {
- c.Check(in[i], Equals, buffer[i])
- }
- }
- {
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 36)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 36; i += 1 {
- c.Check(in[i], Equals, buffer[64+i])
- }
- }
- {
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.EOF)
- }
-}
-
-func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
- // Buffer for reads from 'r'
- buffer := make([]byte, 100)
- for i := 0; i < 100; i += 1 {
- buffer[i] = byte(i)
- }
-
- tr := AsyncStreamFromSlice(buffer)
- defer tr.Close()
-
- br1 := tr.MakeStreamReader()
- defer br1.Close()
-
- reader, writer := io.Pipe()
-
- go func() {
- p := make([]byte, 100)
- n, err := reader.Read(p)
- c.Check(n, Equals, 100)
- c.Check(err, Equals, nil)
- c.Check(p, DeepEquals, buffer)
- }()
-
- io.Copy(writer, br1)
-}
-
-func (s *StandaloneSuite) TestManyReaders(c *C) {
- reader, writer := io.Pipe()
-
- tr := AsyncStreamFromReader(512, reader)
- defer tr.Close()
-
- sr := tr.MakeStreamReader()
- go func() {
- time.Sleep(100 * time.Millisecond)
- sr.Close()
- }()
-
- for i := 0; i < 200; i += 1 {
- go func() {
- br1 := tr.MakeStreamReader()
- defer br1.Close()
-
- p := make([]byte, 3)
- n, err := br1.Read(p)
- c.Check(n, Equals, 3)
- c.Check(p[0:3], DeepEquals, []byte("foo"))
-
- n, err = br1.Read(p)
- c.Check(n, Equals, 3)
- c.Check(p[0:3], DeepEquals, []byte("bar"))
-
- n, err = br1.Read(p)
- c.Check(n, Equals, 3)
- c.Check(p[0:3], DeepEquals, []byte("baz"))
-
- n, err = br1.Read(p)
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.EOF)
- }()
- }
-
- writer.Write([]byte("foo"))
- writer.Write([]byte("bar"))
- writer.Write([]byte("baz"))
- writer.Close()
-}
-
-func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) {
- buffer := make([]byte, 100)
- tr := AsyncStreamFromSlice(buffer)
- sr := tr.MakeStreamReader()
- c.Check(sr.Close(), IsNil)
- c.Check(sr.Close(), Equals, ErrAlreadyClosed)
- c.Check(tr.Close(), IsNil)
- c.Check(tr.Close(), Equals, ErrAlreadyClosed)
-}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-/* Internal implementation of AsyncStream.
-Outline of operation:
-
-The kernel is the transfer() goroutine. It manages concurrent reads and
-appends to the "body" slice. "body" is a slice of "source_buffer" that
-represents the segment of the buffer that is already filled in and available
-for reading.
-
-To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
-from the io.Reader source directly into source_buffer. Each read goes into a
-slice of buffer which spans the section immediately following the end of the
-current "body". Each time a Read completes, a slice representing the the
-section just filled in (or any read errors/EOF) is sent over the "slices"
-channel back to the transfer() function.
-
-Meanwhile, the transfer() function selects() on two channels, the "requests"
-channel and the "slices" channel.
-
-When a message is received on the "slices" channel, this means the a new
-section of the buffer has data, or an error is signaled. Since the data has
-been read directly into the source_buffer, it is able to simply increases the
-size of the body slice to encompass the newly filled in section. Then any
-pending reads are serviced with handleReadRequest (described below).
-
-When a message is received on the "requests" channel, it means a StreamReader
-wants access to a slice of the buffer. This is passed to handleReadRequest().
-
-The handleReadRequest() function takes a sliceRequest consisting of a buffer
-offset, maximum size, and channel to send the response. If there was an error
-reported from the source reader, it is returned. If the offset is less than
-the size of the body, the request can proceed, and it sends a body slice
-spanning the segment from offset to min(offset+maxsize, end of the body). If
-source reader status is EOF (done filling the buffer) and the read request
-offset is beyond end of the body, it responds with EOF. Otherwise, the read
-request is for a slice beyond the current size of "body" but we expect the body
-to expand as more data is added, so the request gets added to a wait list.
-
-The transfer() runs until the requests channel is closed by AsyncStream.Close()
-
-To track readers, streamer uses the readersMonitor() goroutine. This goroutine
-chooses which channels to receive from based on the number of outstanding
-readers. When a new reader is created, it sends a message on the add_reader
-channel. If the number of readers is already at MAX_READERS, this blocks the
-sender until an existing reader is closed. When a reader is closed, it sends a
-message on the subtract_reader channel. Finally, when AsyncStream.Close() is
-called, it sends a message on the wait_zero_readers channel, which will block
-the sender unless there are zero readers and it is safe to shut down the
-AsyncStream.
-*/
-
-package streamer
-
-import (
- "io"
-)
-
-const MAX_READERS = 100
-
-// A slice passed from readIntoBuffer() to transfer()
-type nextSlice struct {
- slice []byte
- reader_error error
-}
-
-// A read request to the Transfer() function
-type sliceRequest struct {
- offset int
- maxsize int
- result chan<- sliceResult
-}
-
-// A read result from the Transfer() function
-type sliceResult struct {
- slice []byte
- err error
-}
-
-// Supports writing into a buffer
-type bufferWriter struct {
- buf []byte
- ptr int
-}
-
-// Copy p into this.buf, increment pointer and return number of bytes read.
-func (this *bufferWriter) Write(p []byte) (n int, err error) {
- n = copy(this.buf[this.ptr:], p)
- this.ptr += n
- return n, nil
-}
-
-// Read repeatedly from the reader and write sequentially into the specified
-// buffer, and report each read to channel 'c'. Completes when Reader 'r'
-// reports on the error channel and closes channel 'c'.
-func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
- defer close(slices)
-
- if writeto, ok := r.(io.WriterTo); ok {
- n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
- if err != nil {
- slices <- nextSlice{nil, err}
- } else {
- slices <- nextSlice{buffer[:n], nil}
- slices <- nextSlice{nil, io.EOF}
- }
- return
- } else {
- // Initially entire buffer is available
- ptr := buffer[:]
- for {
- var n int
- var err error
- if len(ptr) > 0 {
- const readblock = 64 * 1024
- // Read 64KiB into the next part of the buffer
- if len(ptr) > readblock {
- n, err = r.Read(ptr[:readblock])
- } else {
- n, err = r.Read(ptr)
- }
- } else {
- // Ran out of buffer space, try reading one more byte
- var b [1]byte
- n, err = r.Read(b[:])
-
- if n > 0 {
- // Reader has more data but we have nowhere to
- // put it, so we're stuffed
- slices <- nextSlice{nil, io.ErrShortBuffer}
- } else {
- // Return some other error (hopefully EOF)
- slices <- nextSlice{nil, err}
- }
- return
- }
-
- // End on error (includes EOF)
- if err != nil {
- slices <- nextSlice{nil, err}
- return
- }
-
- if n > 0 {
- // Make a slice with the contents of the read
- slices <- nextSlice{ptr[:n], nil}
-
- // Adjust the scratch space slice
- ptr = ptr[n:]
- }
- }
- }
-}
-
-// Handle a read request. Returns true if a response was sent, and false if
-// the request should be queued.
-func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
- if (reader_status != nil) && (reader_status != io.EOF) {
- req.result <- sliceResult{nil, reader_status}
- return true
- } else if req.offset < len(body) {
- var end int
- if req.offset+req.maxsize < len(body) {
- end = req.offset + req.maxsize
- } else {
- end = len(body)
- }
- req.result <- sliceResult{body[req.offset:end], nil}
- return true
- } else if (reader_status == io.EOF) && (req.offset >= len(body)) {
- req.result <- sliceResult{nil, io.EOF}
- return true
- } else {
- return false
- }
-}
-
-// Mediates between reads and appends.
-// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
-// in the provided buffer. Otherwise, use the contents of 'buffer' as is.
-// Accepts read requests on the buffer on the 'requests' channel. Completes
-// when 'requests' channel is closed.
-func (this *AsyncStream) transfer(source_reader io.Reader) {
- source_buffer := this.buffer
- requests := this.requests
-
- // currently buffered data
- var body []byte
-
- // for receiving slices from readIntoBuffer
- var slices chan nextSlice = nil
-
- // indicates the status of the underlying reader
- var reader_status error = nil
-
- if source_reader != nil {
- // 'body' is the buffer slice representing the body content read so far
- body = source_buffer[:0]
-
- // used to communicate slices of the buffer as they are
- // readIntoBuffer will close 'slices' when it is done with it
- slices = make(chan nextSlice)
-
- // Spin it off
- go readIntoBuffer(source_buffer, source_reader, slices)
- } else {
- // use the whole buffer
- body = source_buffer[:]
-
- // buffer is complete
- reader_status = io.EOF
- }
-
- pending_requests := make([]sliceRequest, 0)
-
- for {
- select {
- case req, valid := <-requests:
- // Handle a buffer read request
- if valid {
- if !handleReadRequest(req, body, reader_status) {
- pending_requests = append(pending_requests, req)
- }
- } else {
- // closed 'requests' channel indicates we're done
- return
- }
-
- case bk, valid := <-slices:
- // Got a new slice from the reader
- if valid {
- reader_status = bk.reader_error
-
- if bk.slice != nil {
- // adjust body bounds now that another slice has been read
- body = source_buffer[0 : len(body)+len(bk.slice)]
- }
-
- // handle pending reads
- n := 0
- for n < len(pending_requests) {
- if handleReadRequest(pending_requests[n], body, reader_status) {
- // move the element from the back of the slice to
- // position 'n', then shorten the slice by one element
- pending_requests[n] = pending_requests[len(pending_requests)-1]
- pending_requests = pending_requests[0 : len(pending_requests)-1]
- } else {
-
- // Request wasn't handled, so keep it in the request slice
- n += 1
- }
- }
- } else {
- if reader_status == nil {
- // slices channel closed without signaling EOF
- reader_status = io.ErrUnexpectedEOF
- }
- slices = nil
- }
- }
- }
-}
-
-func (this *AsyncStream) readersMonitor() {
- var readers int = 0
-
- for {
- if readers == 0 {
- select {
- case _, ok := <-this.wait_zero_readers:
- if ok {
- // nothing, just implicitly unblock the sender
- } else {
- return
- }
- case _, ok := <-this.add_reader:
- if ok {
- readers += 1
- } else {
- return
- }
- }
- } else if readers > 0 && readers < MAX_READERS {
- select {
- case _, ok := <-this.add_reader:
- if ok {
- readers += 1
- } else {
- return
- }
-
- case _, ok := <-this.subtract_reader:
- if ok {
- readers -= 1
- } else {
- return
- }
- }
- } else if readers == MAX_READERS {
- _, ok := <-this.subtract_reader
- if ok {
- readers -= 1
- } else {
- return
- }
- }
- }
-}
def write_block_or_manifest(dest, src, api_client, args):
if '+A' in src:
# block locator
- kc = KeepClient(api_client=api_client)
+ kc = arvados.keep.KeepClient(api_client=api_client)
dest.write(kc.get(src, num_retries=args.retries))
else:
# collection UUID or portable data hash
self.assertEqual(0, r)
self.assertEqual(b'baz', self.stdout.getvalue())
+    def test_get_block(self):
+        # Get raw data using a block locator. The locator is pulled
+        # from the collection manifest; its block holds b'foo' (see
+        # the final assertion).
+        blk = re.search(' (acbd18\S+\+A\S+) ', self.col_manifest).group(1)
+        r = self.run_get([blk, '-'])
+        self.assertEqual(0, r)
+        self.assertEqual(b'foo', self.stdout.getvalue())
+
def test_get_multiple_files(self):
# Download the entire collection to the temp directory
r = self.run_get(["{}/".format(self.col_loc), self.tempdir])
gem 'arvados', '>= 0.1.20150615153458'
gem 'arvados-cli', '>= 0.1.20161017193526'
-gem 'puma', '~> 2.0'
gem 'sshkey'
gem 'safe_yaml'
gem 'lograge'
protected_attributes (1.1.3)
activemodel (>= 4.0.1, < 5.0)
public_suffix (2.0.5)
- puma (2.16.0)
rack (1.6.8)
rack-test (0.6.3)
rack (>= 1.0)
passenger
pg
protected_attributes
- puma (~> 2.0)
rails (~> 4.0)
rails-observers
responders (~> 2.0)
end
def require_auth_scope
- if @read_auths.empty?
+ unless current_user && @read_auths.any? { |auth| auth.user.andand.uuid == current_user.uuid }
if require_login != false
send_error("Forbidden", status: 403)
end
@offset = 0
super
wanted_scopes.compact.each do |scope_list|
- sorted_scopes = scope_list.sort
- @objects = @objects.select { |auth| auth.scopes.sort == sorted_scopes }
+ if @objects.respond_to?(:where) && scope_list.length < 2
+ @objects = @objects.
+ where('scopes in (?)',
+ [scope_list.to_yaml, SafeJSON.dump(scope_list)])
+ else
+ if @objects.respond_to?(:where)
+ # Eliminate rows with scopes=['all'] before doing the
+ # expensive filter. They are typically the majority of
+ # rows, and they obviously won't match given
+ # scope_list.length>=2, so loading them all into
+ # ActiveRecord objects is a huge waste of time.
+ @objects = @objects.
+ where('scopes not in (?)',
+ [['all'].to_yaml, SafeJSON.dump(['all'])])
+ end
+ sorted_scopes = scope_list.sort
+ @objects = @objects.select { |auth| auth.scopes.sort == sorted_scopes }
+ end
end
@limit = @request_limit
@offset = @request_offset
crunchLimitLogBytesPerJob: Rails.application.config.crunch_limit_log_bytes_per_job,
crunchLogPartialLineThrottlePeriod: Rails.application.config.crunch_log_partial_line_throttle_period,
websocketUrl: Rails.application.config.websocket_address,
+ workbenchUrl: Rails.application.config.workbench_address,
parameters: {
alt: {
type: "string",
user = nil
api_client = nil
api_client_auth = nil
- supplied_token =
- params["api_token"] ||
- params["oauth_token"] ||
- env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-zA-Z0-9]+)/).andand[1]
- if supplied_token
- api_client_auth = ApiClientAuthorization.
+ if request.get? || params["_method"] == 'GET'
+ reader_tokens = params["reader_tokens"]
+ if reader_tokens.is_a? String
+ reader_tokens = SafeJSON.load(reader_tokens)
+ end
+ else
+ reader_tokens = nil
+ end
+
+ # Set current_user etc. based on the primary session token if a
+ # valid one is present. Otherwise, use the first valid token in
+ # reader_tokens.
+ [params["api_token"],
+ params["oauth_token"],
+ env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-zA-Z0-9]+)/).andand[1],
+ *reader_tokens,
+ ].each do |supplied|
+ next if !supplied
+ try_auth = ApiClientAuthorization.
includes(:api_client, :user).
- where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied_token).
+ where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied).
first
- if api_client_auth.andand.user
+ if try_auth.andand.user
+ api_client_auth = try_auth
user = api_client_auth.user
api_client = api_client_auth.api_client
- else
- # Token seems valid, but points to a non-existent (deleted?) user.
- api_client_auth = nil
+ break
end
end
Thread.current[:api_client_ip_address] = remote_ip
"discovery document was generated >#{MAX_SCHEMA_AGE}s ago")
end
- test "discovery document has defaultTrashLifetime" do
+ test "discovery document fields" do
get :index
assert_response :success
discovery_doc = JSON.parse(@response.body)
assert_includes discovery_doc, 'defaultTrashLifetime'
assert_equal discovery_doc['defaultTrashLifetime'], Rails.application.config.default_trash_lifetime
- end
-
- test "discovery document has source_version" do
- get :index
- assert_response :success
- discovery_doc = JSON.parse(@response.body)
assert_match(/^[0-9a-f]+(-modified)?$/, discovery_doc['source_version'])
+ assert_equal discovery_doc['websocketUrl'], Rails.application.config.websocket_address
+ assert_equal discovery_doc['workbenchUrl'], Rails.application.config.workbench_address
end
test "discovery document overrides source_version with config" do
assert_response 403
end
+ test "narrow + wide scoped tokens for different users" do
+ get_args = [{
+ reader_tokens: [api_client_authorizations(:anonymous).api_token]
+ }, auth(:active_userlist)]
+ get(v1_url('users'), *get_args)
+ assert_response :success
+ get(v1_url('users', ''), *get_args) # Add trailing slash.
+ assert_response :success
+ get(v1_url('users', 'current'), *get_args)
+ assert_response 403
+ get(v1_url('virtual_machines'), *get_args)
+ assert_response 403
+ end
+
test "specimens token can see exactly owned specimens" do
get_args = [{}, auth(:active_specimens)]
get(v1_url('specimens'), *get_args)
[nil, :active_noscope].each do |main_auth|
[:spectator, :spectator_specimens].each do |read_auth|
- test "#{main_auth} auth with reader token #{read_auth} can read" do
- assert_includes(get_specimen_uuids(main_auth, read_auth),
- spectator_specimen, "did not find spectator specimen")
- end
-
- test "#{main_auth} auth with JSON read token #{read_auth} can read" do
- assert_includes(get_specimen_uuids(main_auth, read_auth, :to_json),
- spectator_specimen, "did not find spectator specimen")
- end
-
- test "#{main_auth} auth with reader token #{read_auth} can't write" do
- assert_post_denied(main_auth, read_auth)
- end
+ [:to_a, :to_json].each do |formatter|
+ test "#{main_auth.inspect} auth with #{formatter} reader token #{read_auth} can#{"'t" if main_auth} read" do
+ get_specimens(main_auth, read_auth)
+ assert_response(if main_auth then 403 else 200 end)
+ end
- test "#{main_auth} auth with JSON read token #{read_auth} can't write" do
- assert_post_denied(main_auth, read_auth, :to_json)
+ test "#{main_auth.inspect} auth with #{formatter} reader token #{read_auth} can't write" do
+ assert_post_denied(main_auth, read_auth, formatter)
+ end
end
end
end
}
}
+var errorBlacklist = []string{"Cannot connect to the Docker daemon"}
+var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
+
+func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
+ for _, d := range errorBlacklist {
+ if strings.Index(goterr.Error(), d) != -1 {
+ runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr)
+ if *brokenNodeHook == "" {
+ runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+ } else {
+ runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+ // run killme script
+ c := exec.Command(*brokenNodeHook)
+ c.Stdout = runner.CrunchLog
+ c.Stderr = runner.CrunchLog
+ err := c.Run()
+ if err != nil {
+ runner.CrunchLog.Printf("Error running broken node hook: %v", err)
+ }
+ }
+ return true
+ }
+ }
+ return false
+}
+
// LoadImage determines the docker image id from the container record and
// checks if it is available in the local Docker image store. If not, it loads
// the image from Keep.
// check for and/or load image
err = runner.LoadImage()
if err != nil {
- runner.finalState = "Cancelled"
+ if !runner.checkBrokenNode(err) {
+ // Failed to load image but not due to a "broken node"
+ // condition, probably user error.
+ runner.finalState = "Cancelled"
+ }
err = fmt.Errorf("While loading container image: %v", err)
return
}
return
}
- runner.StartCrunchstat()
-
if runner.IsCancelled() {
return
}
}
runner.finalState = "Cancelled"
+ runner.StartCrunchstat()
+
err = runner.StartContainer()
if err != nil {
+ runner.checkBrokenNode(err)
return
}
}
api.Retries = 8
- var kc *keepclient.KeepClient
- kc, err = keepclient.MakeKeepClient(api)
- if err != nil {
- log.Fatalf("%s: %v", containerId, err)
+ kc, kcerr := keepclient.MakeKeepClient(api)
+ if kcerr != nil {
+ log.Fatalf("%s: %v", containerId, kcerr)
}
kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
kc.Retries = 4
- var docker *dockerclient.Client
// API version 1.21 corresponds to Docker 1.9, which is currently the
// minimum version we want to support.
- docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
- if err != nil {
- log.Fatalf("%s: %v", containerId, err)
- }
-
+ docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
dockerClientProxy := ThinDockerClientProxy{Docker: docker}
cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
+
+ if dockererr != nil {
+ cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
+ cr.checkBrokenNode(dockererr)
+ cr.CrunchLog.Close()
+ os.Exit(1)
+ }
+
cr.statInterval = *statInterval
cr.cgroupRoot = *cgroupRoot
cr.expectCgroupParent = *cgroupParent
}
func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
+ if t.finish == 2 {
+ return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
+ }
+
if t.imageLoaded == image {
return dockertypes.ImageInspect{}, nil, nil
} else {
}
func (t *TestDockerClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
+ if t.finish == 2 {
+ return dockertypes.ImageLoadResponse{}, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
+ }
_, err := io.Copy(ioutil.Discard, input)
if err != nil {
return dockertypes.ImageLoadResponse{}, err
if api.CalledWith("container.state", "Complete") != nil {
c.Check(err, IsNil)
}
- c.Check(api.WasSetRunning, Equals, true)
-
- c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil)
+ if exitCode != 2 {
+ c.Check(api.WasSetRunning, Equals, true)
+ c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil)
+ }
if err != nil {
for k, v := range api.Logs {
_, err = cr.UploadOutputFile(realTemp+"/"+v, info, err, []string{}, nil, "", "", 0)
c.Assert(err, NotNil)
}
+
+func (s *TestSuite) TestFullBrokenDocker1(c *C) {
+ tf, err := ioutil.TempFile("", "brokenNodeHook-")
+ c.Assert(err, IsNil)
+ defer os.Remove(tf.Name())
+
+ tf.Write([]byte(`#!/bin/sh
+exec echo killme
+`))
+ tf.Close()
+ os.Chmod(tf.Name(), 0700)
+
+ ech := tf.Name()
+ brokenNodeHook = &ech
+
+ api, _, _ := FullRunHelper(c, `{
+ "command": ["echo", "hello world"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`, nil, 2, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, "hello world\n"))
+ t.logWriter.Close()
+ })
+
+ c.Check(api.CalledWith("container.state", "Queued"), NotNil)
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
+
+}
+
+func (s *TestSuite) TestFullBrokenDocker2(c *C) {
+ ech := ""
+ brokenNodeHook = &ech
+
+ api, _, _ := FullRunHelper(c, `{
+ "command": ["echo", "hello world"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`, nil, 2, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, "hello world\n"))
+ t.logWriter.Close()
+ })
+
+ c.Check(api.CalledWith("container.state", "Queued"), NotNil)
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*No broken node hook.*")
+}
"fmt"
"io"
"io/ioutil"
- "log"
"net"
"net/http"
"os"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/health"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ log "github.com/Sirupsen/logrus"
"github.com/coreos/go-systemd/daemon"
"github.com/ghodss/yaml"
"github.com/gorilla/mux"
router http.Handler
)
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
+
func main() {
+ log.SetFormatter(&log.JSONFormatter{
+ TimestampFormat: rfc3339NanoFixed,
+ })
+
cfg := DefaultConfig()
flagset := flag.NewFlagSet("keepproxy", flag.ExitOnError)
// Start serving requests.
router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout), cfg.ManagementToken)
- http.Serve(listener, router)
+ http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router)))
log.Println("shutting down")
}
Timeout: h.timeout,
Transport: h.transport,
},
- proto: req.Proto,
+ proto: req.Proto,
+ requestID: req.Header.Get("X-Request-Id"),
}
return &kc
}
"errors"
"fmt"
"io/ioutil"
- "log"
+ "math/rand"
"net/http"
"net/http/httptest"
"os"
"strings"
+ "sync"
"testing"
"time"
time.Sleep(ms * time.Millisecond)
}
if listener == nil {
- log.Fatalf("Timed out waiting for listener to start")
+ panic("Timed out waiting for listener to start")
}
}
}
}
+func (s *ServerRequiredSuite) TestManyFailedPuts(c *C) {
+ kc := runProxy(c, nil, false)
+ defer closeListener()
+ router.(*proxyHandler).timeout = time.Nanosecond
+
+ buf := make([]byte, 1<<20)
+ rand.Read(buf)
+ var wg sync.WaitGroup
+ for i := 0; i < 128; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ kc.PutB(buf)
+ }()
+ }
+ done := make(chan bool)
+ go func() {
+ wg.Wait()
+ close(done)
+ }()
+ select {
+ case <-done:
+ case <-time.After(10 * time.Second):
+ c.Error("timeout")
+ }
+}
+
func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
kc := runProxy(c, nil, false)
defer closeListener()
{
_, _, err := kc.Ask(hash)
c.Check(err, Equals, keepclient.BlockNotFound)
- log.Print("Finished Ask (expected BlockNotFound)")
+ c.Log("Finished Ask (expected BlockNotFound)")
}
{
reader, _, _, err := kc.Get(hash)
c.Check(reader, Equals, nil)
c.Check(err, Equals, keepclient.BlockNotFound)
- log.Print("Finished Get (expected BlockNotFound)")
+ c.Log("Finished Get (expected BlockNotFound)")
}
// Note in bug #5309 among other errors keepproxy would set
c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
- log.Print("Finished PutB (expected success)")
+ c.Log("Finished PutB (expected success)")
}
{
blocklen, _, err := kc.Ask(hash2)
c.Assert(err, Equals, nil)
c.Check(blocklen, Equals, int64(3))
- log.Print("Finished Ask (expected success)")
+ c.Log("Finished Ask (expected success)")
}
{
all, err := ioutil.ReadAll(reader)
c.Check(all, DeepEquals, []byte("foo"))
c.Check(blocklen, Equals, int64(3))
- log.Print("Finished Get (expected success)")
+ c.Log("Finished Get (expected success)")
}
{
c.Check(hash2, Matches, `^d41d8cd98f00b204e9800998ecf8427e\+0(\+.+)?$`)
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
- log.Print("Finished PutB zero block")
+ c.Log("Finished PutB zero block")
}
{
all, err := ioutil.ReadAll(reader)
c.Check(all, DeepEquals, []byte(""))
c.Check(blocklen, Equals, int64(0))
- log.Print("Finished Get zero block")
+ c.Log("Finished Get zero block")
}
}
errNotFound, _ := err.(keepclient.ErrNotFound)
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
- log.Print("Ask 1")
+ c.Log("Ask 1")
}
{
c.Check(hash2, Equals, "")
c.Check(rep, Equals, 0)
c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New("")))
- log.Print("PutB")
+ c.Log("PutB")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Ask 2")
+ c.Log("Ask 2")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Get")
+ c.Log("Get")
}
}
errNotFound, _ := err.(keepclient.ErrNotFound)
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
- log.Print("Ask 1")
+ c.Log("Ask 1")
}
{
c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
- log.Print("PutB")
+ c.Log("PutB")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Ask 2")
+ c.Log("Ask 2")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Get")
+ c.Log("Get")
}
}
var viaAlias = "keepproxy"
type proxyClient struct {
- client keepclient.HTTPClient
- proto string
+ client keepclient.HTTPClient
+ proto string
+ requestID string
}
func (pc *proxyClient) Do(req *http.Request) (*http.Response, error) {
req.Header.Add("Via", pc.proto+" "+viaAlias)
+ req.Header.Add("X-Request-Id", pc.requestID)
return pc.client.Do(req)
}
ok := make(chan struct{})
go func() {
req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
- (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req)
+ MakeRESTRouter().ServeHTTP(resp, req)
ok <- struct{}{}
}()
// Start a round-robin VolumeManager with the volumes we have found.
KeepVM = MakeRRVolumeManager(theConfig.Volumes)
- // Middleware stack: logger, MaxRequests limiter, method handlers
+ // Middleware/handler stack
router := MakeRESTRouter()
limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
router.limiter = limiter
- http.Handle("/", &LoggingRESTRouter{router: limiter})
+ http.Handle("/", httpserver.AddRequestIDs(httpserver.LogRequests(limiter)))
// Set up a TCP listener.
listener, err := net.Listen("tcp", theConfig.Listen)
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-// LoggingRESTRouter
-// LoggingResponseWriter
-
-import (
- "context"
- "net/http"
- "strings"
- "time"
-
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
- "git.curoverse.com/arvados.git/sdk/go/stats"
- log "github.com/Sirupsen/logrus"
-)
-
-// LoggingResponseWriter has anonymous fields ResponseWriter and ResponseBody
-type LoggingResponseWriter struct {
- Status int
- Length int
- http.ResponseWriter
- ResponseBody string
- sentHdr time.Time
-}
-
-// CloseNotify implements http.CloseNotifier.
-func (resp *LoggingResponseWriter) CloseNotify() <-chan bool {
- wrapped, ok := resp.ResponseWriter.(http.CloseNotifier)
- if !ok {
- // If upstream doesn't implement CloseNotifier, we can
- // satisfy the interface by returning a channel that
- // never sends anything (the interface doesn't
- // guarantee that anything will ever be sent on the
- // channel even if the client disconnects).
- return nil
- }
- return wrapped.CloseNotify()
-}
-
-// WriteHeader writes header to ResponseWriter
-func (resp *LoggingResponseWriter) WriteHeader(code int) {
- if resp.sentHdr == zeroTime {
- resp.sentHdr = time.Now()
- }
- resp.Status = code
- resp.ResponseWriter.WriteHeader(code)
-}
-
-var zeroTime time.Time
-
-func (resp *LoggingResponseWriter) Write(data []byte) (int, error) {
- if resp.Length == 0 && len(data) > 0 && resp.sentHdr == zeroTime {
- resp.sentHdr = time.Now()
- }
- resp.Length += len(data)
- if resp.Status >= 400 {
- resp.ResponseBody += string(data)
- }
- return resp.ResponseWriter.Write(data)
-}
-
-// LoggingRESTRouter is used to add logging capabilities to mux.Router
-type LoggingRESTRouter struct {
- router http.Handler
- idGenerator httpserver.IDGenerator
-}
-
-func (loggingRouter *LoggingRESTRouter) ServeHTTP(wrappedResp http.ResponseWriter, req *http.Request) {
- tStart := time.Now()
-
- // Attach a requestID-aware logger to the request context.
- lgr := log.WithField("RequestID", loggingRouter.idGenerator.Next())
- ctx := context.WithValue(req.Context(), "logger", lgr)
- req = req.WithContext(ctx)
-
- lgr = lgr.WithFields(log.Fields{
- "remoteAddr": req.RemoteAddr,
- "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
- "reqMethod": req.Method,
- "reqPath": req.URL.Path[1:],
- "reqBytes": req.ContentLength,
- })
- lgr.Debug("request")
-
- resp := LoggingResponseWriter{http.StatusOK, 0, wrappedResp, "", zeroTime}
- loggingRouter.router.ServeHTTP(&resp, req)
- tDone := time.Now()
-
- statusText := http.StatusText(resp.Status)
- if resp.Status >= 400 {
- statusText = strings.Replace(resp.ResponseBody, "\n", "", -1)
- }
- if resp.sentHdr == zeroTime {
- // Nobody changed status or wrote any data, i.e., we
- // returned a 200 response with no body.
- resp.sentHdr = tDone
- }
-
- lgr.WithFields(log.Fields{
- "timeTotal": stats.Duration(tDone.Sub(tStart)),
- "timeToStatus": stats.Duration(resp.sentHdr.Sub(tStart)),
- "timeWriteBody": stats.Duration(tDone.Sub(resp.sentHdr)),
- "respStatusCode": resp.Status,
- "respStatus": statusText,
- "respBytes": resp.Length,
- }).Info("response")
-}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
- "net/http"
- "testing"
-)
-
-func TestLoggingResponseWriterImplementsCloseNotifier(t *testing.T) {
- http.ResponseWriter(&LoggingResponseWriter{}).(http.CloseNotifier).CloseNotify()
-}
s3RaceWindow time.Duration
s3ACL = s3.Private
+
+ zeroTime time.Time
)
const (
create_calls += 1
if create_calls < 2:
raise RateLimitReachedError(429, "Rate limit exceeded",
- retry_after=12)
+ headers={'retry-after': '12'})
elif create_calls < 3:
raise BaseHTTPError(429, "Rate limit exceeded",
{'retry-after': '2'})
'setuptools'
],
dependency_links=[
- "https://github.com/curoverse/libcloud/archive/apache-libcloud-2.2.2.dev2.zip"
+ "https://github.com/curoverse/libcloud/archive/apache-libcloud-2.2.2.dev3.zip"
],
test_suite='tests',
tests_require=[
'requests',
'pbr<1.7.0',
'mock>=1.0',
- 'apache-libcloud==2.2.2.dev2',
+ 'apache-libcloud==2.2.2.dev3',
],
zip_safe=False,
cmdclass={'egg_info': tagger},
SSO_ROOT="$ARVBOX_DATA/sso-devise-omniauth-provider"
fi
+if test -z "$COMPOSER_ROOT" ; then
+ COMPOSER_ROOT="$ARVBOX_DATA/composer"
+fi
+
PG_DATA="$ARVBOX_DATA/postgres"
VAR_DATA="$ARVBOX_DATA/var"
PASSENGER="$ARVBOX_DATA/passenger"
if ! test -d "$SSO_ROOT" ; then
git clone https://github.com/curoverse/sso-devise-omniauth-provider.git "$SSO_ROOT"
fi
+ if ! test -d "$COMPOSER_ROOT" ; then
+ git clone https://github.com/curoverse/composer.git "$COMPOSER_ROOT"
+ fi
if test "$CONFIG" = test ; then
--privileged \
"--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \
"--volume=$SSO_ROOT:/usr/src/sso:rw" \
+ "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \
"--volume=$PG_DATA:/var/lib/postgresql:rw" \
"--volume=$VAR_DATA:/var/lib/arvados:rw" \
"--volume=$PASSENGER:/var/lib/passenger:rw" \
--privileged \
"--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \
"--volume=$SSO_ROOT:/usr/src/sso:rw" \
+ "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \
"--volume=$PG_DATA:/var/lib/postgresql:rw" \
"--volume=$VAR_DATA:/var/lib/arvados:rw" \
"--volume=$PASSENGER:/var/lib/passenger:rw" \
libjson-perl nginx gitolite3 lsof libreadline-dev \
apt-transport-https ca-certificates slurm-wlm \
linkchecker python3-virtualenv python-virtualenv xvfb iceweasel \
- libgnutls28-dev python3-dev vim cadaver cython gnupg dirmngr && \
+ libgnutls28-dev python3-dev vim cadaver cython gnupg dirmngr \
+ libsecret-1-dev && \
apt-get clean
ENV RUBYVERSION_MINOR 2.3
RUN pip install -U setuptools
-ENV NODEVERSION v6.11.2
+ENV NODEVERSION v6.11.4
# Install nodejs binary
RUN curl -L -f https://nodejs.org/dist/${NODEVERSION}/node-${NODEVERSION}-linux-x64.tar.xz | tar -C /usr/local -xJf - && \
FROM arvados/arvbox-base
ARG arvados_version
ARG sso_version=master
+ARG composer_version=master
RUN cd /usr/src && \
git clone --no-checkout https://github.com/curoverse/arvados.git && \
git -C arvados checkout ${arvados_version} && \
git clone --no-checkout https://github.com/curoverse/sso-devise-omniauth-provider.git sso && \
- git -C sso checkout ${sso_version}
+ git -C sso checkout ${sso_version} && \
+ git clone --no-checkout https://github.com/curoverse/composer.git && \
+ git -C composer checkout ${composer_version}
ADD service/ /var/lib/arvbox/service
RUN ln -sf /var/lib/arvbox/service /etc
[workbench]=80
[api]=8000
[sso]=8900
+ [composer]=4200
[arv-git-httpd]=9001
[keep-web]=9002
[keepproxy]=25100
--uid $HOSTUID --gid $HOSTGID \
--non-unique \
--groups docker \
+ --shell /bin/bash \
arvbox
useradd --home-dir /var/lib/arvados/git --uid $HOSTUID --gid $HOSTGID --non-unique git
useradd --groups docker crunch
--- /dev/null
+/usr/local/lib/arvbox/logger
\ No newline at end of file
--- /dev/null
+#!/bin/sh
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+set -e
+
+/usr/local/lib/arvbox/runsu.sh $0-service $1
--- /dev/null
+#!/bin/bash
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+exec 2>&1
+set -ex -o pipefail
+
+. /usr/local/lib/arvbox/common.sh
+
+cd /usr/src/composer
+
+npm install yarn
+
+PATH=$PATH:/usr/src/composer/node_modules/.bin
+
+yarn install
+
+if test "$1" != "--only-deps" ; then
+ echo "apiEndPoint: https://${localip}:${services[api]}" > /usr/src/composer/src/arvados-configuration.yml
+ exec ng serve --host 0.0.0.0 --port 4200 --env=webdev
+fi
fi
run_bundler --without=development
-bundle exec passenger start --runtime-check-only --runtime-dir=/var/lib/passenger
+bundle exec passenger-config build-native-support
+bundle exec passenger-config install-standalone-runtime
if test "$1" = "--only-deps" ; then
exit
rm -rf tmp
mkdir -p tmp/cache
+bundle exec rake assets:precompile
bundle exec rake db:migrate
set +u
fi
exec bundle exec passenger start --port=${services[sso]} \
- --runtime-dir=/var/lib/passenger \
--ssl --ssl-certificate=/var/lib/arvados/self-signed.pem \
--ssl-certificate-key=/var/lib/arvados/self-signed.key
mkdir tmp
chown arvbox:arvbox tmp
+if test -s /var/lib/arvados/workbench_rails_env ; then
+ export RAILS_ENV=$(cat /var/lib/arvados/workbench_rails_env)
+else
+ export RAILS_ENV=development
+fi
+
if test "$1" != "--only-deps" ; then
exec bundle exec passenger start --port 80 \
--user arvbox --runtime-dir=/var/lib/passenger
keep_web_download_url: http://$localip:${services[keep-web]}/c=%{uuid_or_pdh}
keep_web_url: http://$localip:${services[keep-web]}/c=%{uuid_or_pdh}
arvados_docsite: http://$localip:${services[doc]}/
+ force_ssl: false
EOF
+bundle exec rake assets:precompile
+
(cd config && /usr/local/lib/arvbox/application_yml_override.py)