Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>
//
// SPDX-License-Identifier: AGPL-3.0
-window.CollectionsTable = {
+window.SearchResultsTable = {
maybeLoadMore: function(dom) {
var loader = this.loader
if (loader.state != loader.READY)
},
view: function(vnode) {
var loader = vnode.attrs.loader
+ var iconsMap = {
+ collections: m('i.fa.fa-fw.fa-archive'),
+ projects: m('i.fa.fa-fw.fa-folder'),
+ }
return m('table.table.table-condensed', [
m('thead', m('tr', [
m('th'),
m('td', [
item.workbenchBaseURL() &&
m('a.btn.btn-xs.btn-default', {
- href: item.workbenchBaseURL()+'collections/'+item.uuid,
- }, 'Show'),
+ 'data-original-title': 'show '+item.objectType.description,
+ 'data-placement': 'top',
+ 'data-toggle': 'tooltip',
+ href: item.workbenchBaseURL()+'/'+item.objectType.wb_path+'/'+item.uuid,
+ // Bootstrap's tooltip feature
+ oncreate: function(vnode) { $(vnode.dom).tooltip() },
+ }, iconsMap[item.objectType.wb_path]),
]),
m('td.arvados-uuid', item.uuid),
m('td', item.name || '(unnamed)'),
},
}
-window.CollectionsSearch = {
+window.Search = {
oninit: function(vnode) {
vnode.state.sessionDB = new SessionDB()
vnode.state.searchEntered = m.stream()
var workbenchBaseURL = function() {
return vnode.state.sessionDB.workbenchBaseURL(session)
}
- return new MultipageLoader({
+ var searchable_objects = [
+ {
+ wb_path: 'projects',
+ api_path: 'arvados/v1/groups',
+ filters: [['group_class', '=', 'project']],
+ description: 'project',
+ },
+ {
+ wb_path: 'collections',
+ api_path: 'arvados/v1/collections',
+ filters: [],
+ description: 'collection',
+ },
+ ]
+ return new MergingLoader({
sessionKey: key,
- loadFunc: function(filters) {
- var tsquery = to_tsquery(q)
- if (tsquery) {
- filters = filters.slice(0)
- filters.push(['any', '@@', tsquery])
- }
- return vnode.state.sessionDB.request(session, 'arvados/v1/collections', {
- data: {
- filters: JSON.stringify(filters),
- count: 'none',
+ // For every session, search for every object type
+ children: searchable_objects.map(function(obj_type) {
+ return new MultipageLoader({
+ sessionKey: key,
+ loadFunc: function(filters) {
+ // Apply additional type-dependent filters
+ filters = filters.concat(obj_type.filters)
+ var tsquery = to_tsquery(q)
+ if (tsquery) {
+ filters.push(['any', '@@', tsquery])
+ }
+ return vnode.state.sessionDB.request(session, obj_type.api_path, {
+ data: {
+ filters: JSON.stringify(filters),
+ count: 'none',
+ },
+ }).then(function(resp) {
+ resp.items.map(function(item) {
+ item.workbenchBaseURL = workbenchBaseURL
+ item.objectType = obj_type
+ })
+ return resp
+ })
},
- }).then(function(resp) {
- resp.items.map(function(item) {
- item.workbenchBaseURL = workbenchBaseURL
- })
- return resp
})
- },
+ }),
})
- })
+ }),
})
})
},
m('.row', [
m('.col-md-6', [
m('.input-group', [
- m('input#search.form-control[placeholder=Search]', {
+ m('input#search.form-control[placeholder=Search collections and projects]', {
oninput: m.withAttr('value', vnode.state.searchEntered),
value: vnode.state.searchEntered(),
}),
m('a[href="/sessions"]', 'Add/remove sites'),
]),
]),
- m(CollectionsTable, {
+ m(SearchResultsTable, {
loader: vnode.state.loader,
}),
],
return m('.container', [
m('p', [
'You can log in to multiple Arvados sites here, then use the ',
- m('a[href="/collections/multisite"]', 'multi-site search'),
- ' page to search collections on all sites at once.',
+ m('a[href="/search"]', 'multi-site search'),
+ ' page to search collections and projects on all sites at once.',
]),
m('table.table.table-condensed.table-hover', [
m('thead', m('tr', [
// the host part of apihostport is an IPv4 or [IPv6]
// address.
if (!session.baseURL.match('://(\\[|\\d+\\.\\d+\\.\\d+\\.\\d+[:/])'))
- return session.baseURL.replace('://', '://workbench.')
+ var wbUrl = session.baseURL.replace('://', '://workbench.')
+ // Remove the trailing slash, if it's there.
+ return wbUrl.slice(-1) == '/' ? wbUrl.slice(0, -1) : wbUrl
return null
},
// Return a m.stream that will get fulfilled with the
skip_around_filter(:require_thread_api_token,
only: [:show_file, :show_file_links])
skip_before_filter(:find_object_by_uuid,
- only: [:provenance, :show_file, :show_file_links, :multisite])
+ only: [:provenance, :show_file, :show_file_links])
# We depend on show_file to display the user agreement:
skip_before_filter :check_user_agreements, only: :show_file
skip_before_filter :check_user_profile, only: :show_file
# SPDX-License-Identifier: AGPL-3.0
class SearchController < ApplicationController
+ skip_before_filter :ensure_arvados_api_exists
+
def find_objects_for_index
search_what = Group
if params[:project_uuid]
@client_mtx = Mutex.new
end
- def api(resources_kind, action, data=nil, tokens={})
+ def api(resources_kind, action, data=nil, tokens={}, include_anon_token=true)
profile_checkpoint
'reader_tokens' => ((tokens[:reader_tokens] ||
Thread.current[:reader_tokens] ||
[]) +
- [Rails.configuration.anonymous_user_token]).to_json,
+ (include_anon_token ? [Rails.configuration.anonymous_user_token] : [])).to_json,
}
if !data.nil?
data.each do |k,v|
end
def self.current
- res = arvados_api_client.api self, '/current'
+ res = arvados_api_client.api self, '/current', nil, {}, false
arvados_api_client.unpack_api_response(res)
end
<%=
target = Rails.configuration.multi_site_search
if target == true
- target = {controller: 'collections', action: 'multisite'}
+ target = {controller: 'search', action: 'index'}
end
link_to("Multi-site search", target, {class: 'btn btn-default'}) %>
</form>
# fetch children of all the active crs in one call, if there are any
active_crs = recent_crs.each {|cr| cr if (cr.priority.andand > 0 and cr.state != 'Final' and cr.container_uuid)}
- active_cr_uuids = active_crs.map(&:uuid)
- active_cr_containers = active_crs.map {|cr| cr.container_uuid}.compact.uniq
- cr_children = {}
- if active_cr_containers.any?
- active_cr_containers.each { |c| cr_children[c] = []}
- cols = ContainerRequest.columns.map(&:name) - %w(id updated_at mounts)
- reqs = ContainerRequest.select(cols).where(requesting_container_uuid: active_cr_containers).results
- reqs.each {|cr| cr_children[cr.requesting_container_uuid] << cr} if reqs
- end
wus = {}
outputs = []
recent_procs.each do |p|
- if p.uuid.in?(active_cr_uuids)
- wu = p.work_unit(nil, child_objects=cr_children[p.container_uuid])
- else
- wu = p.work_unit
- end
+ wu = p.work_unit
wus[p] = wu
outputs << wu.outputs
<% else %>
<div class="dashboard-panel-info-row row-<%=wu.uuid%>">
<div class="row">
- <div class="col-md-6">
+ <div class="col-md-6 text-overflow-ellipsis">
<%= link_to_if_arvados_object p, {friendly_name: true} %>
</div>
- <% if wu.is_running? %>
- <div class="col-md-6">
- <div class="progress" style="margin-bottom: 0px">
- <% wu.progress %>
- </div>
- </div>
- <% else %>
<div class="col-md-2">
<span class="label label-<%=wu.state_bootstrap_class%>"><%=wu.state_label%></span>
</div>
- <% end %>
</div>
- <%
- children = wu.children
- running = children.select { |c| c.state_label == "Running" }
- queued = children.select { |c| c.state_label == "Queued" }
- %>
-
<div class="clearfix">
Started at <%= render_localized_date(wu.started_at || wu.created_at, "noseconds") %>.
<% wu_time = Time.now - (wu.started_at || wu.created_at) %>
Active for <%= render_runtime(wu_time, false) %>.
<div class="pull-right">
- <% running.each do |r| %>
- <span class="label label-<%= r.state_bootstrap_class %>"> <%= r.label || r.state_label || 'Not ready' %> </span>
- <% end %>
- <% queued.each do |q| %>
- <span class="label label-<%= q.state_bootstrap_class %>"> <%= q.label || r.state_label || 'Not ready' %> </span>
- <% end %>
</div>
</div>
</div>
SPDX-License-Identifier: AGPL-3.0 -->
-<div data-mount-mithril="CollectionsSearch"></div>
+<div data-mount-mithril="Search"></div>
post 'remove_selected_files', on: :member
get 'tags', on: :member
post 'save_tags', on: :member
- get 'multisite', on: :collection
+ get 'multisite', on: :collection, to: redirect('/search')
end
get('/collections/download/:uuid/:reader_token/*file' => 'collections#show_file',
format: false)
get 'tab_counts', on: :member
get 'public', on: :collection
end
-
+
resources :search do
get 'choose', :on => :collection
end
match '/_health/ping', to: 'healthcheck#ping', via: [:get]
get '/tests/mithril', to: 'tests#mithril'
-
+
get '/status', to: 'status#status'
-
+
# Send unroutable requests to an arbitrary controller
# (ends up at ApplicationController#render_not_found)
match '*a', to: 'links#render_not_found', via: [:get, :post]
{
fixture: 'container_requests',
state: 'running',
- selectors: [['div.progress', true]]
+ selectors: [['.label-info', true, 'Running']]
},
{
fixture: 'pipeline_instances',
{
fixture: 'pipeline_instances',
state: 'pipeline_in_running_state',
- selectors: [['div.progress', true]]
+ selectors: [['.label-info', true, 'Running']]
},
].each do |c|
uuid = api_fixture(c[:fixture])[c[:state]]['uuid']
end
within('.recent-processes') do
- assert_text 'running'
within('.row-zzzzz-xvhdp-cr4runningcntnr') do
- assert_text 'requester_for_running_cr'
+ assert_text 'running'
end
assert_text 'zzzzz-d1hrv-twodonepipeline'
#
# SPDX-License-Identifier: AGPL-3.0
-LIBCLOUD_PIN=2.2.2.dev2
+LIBCLOUD_PIN=2.2.2.dev3
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-FROM ubuntu:precise
-MAINTAINER Ward Vandewege <ward@curoverse.com>
-
-ENV DEBIAN_FRONTEND noninteractive
-
-# Install dependencies.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip build-essential unzip
-
-# Install RVM
-RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
- curl -L https://get.rvm.io | bash -s stable && \
- /usr/local/rvm/bin/rvm install 2.3 && \
- /usr/local/rvm/bin/rvm alias create default ruby-2.3 && \
- /usr/local/rvm/bin/rvm-exec default gem install bundler && \
- /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
-
-# Install golang binary
-ADD generated/go1.8.3.linux-amd64.tar.gz /usr/local/
-RUN ln -s /usr/local/go/bin/go /usr/local/bin/
-
-# Install nodejs and npm
-ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
-RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
-
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
-ENV WORKSPACE /arvados
-CMD ["/usr/local/rvm/bin/rvm-exec", "default", "bash", "/jenkins/run-build-packages.sh", "--target", "ubuntu1204"]
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-FROM ubuntu:precise
-MAINTAINER Ward Vandewege <ward@curoverse.com>
-
-ENV DEBIAN_FRONTEND noninteractive
-
-# Install RVM
-RUN apt-get update && \
- apt-get -y install --no-install-recommends curl ca-certificates g++ && \
- gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
- curl -L https://get.rvm.io | bash -s stable && \
- /usr/local/rvm/bin/rvm install 2.3 && \
- /usr/local/rvm/bin/rvm alias create default ruby-2.3
-
-# udev daemon can't start in a container, so don't try.
-RUN mkdir -p /etc/udev/disabled
-
-RUN echo "deb file:///arvados/packages/ubuntu1204/ /" >>/etc/apt/sources.list
true
else
FINAL_EXITCODE=$?
+ echo
+ echo "Build packages failed for $(basename $(dirname "$dockerfile_path"))"
+ echo
fi
done
if test $FINAL_EXITCODE != 0 ; then
+ echo
echo "Build packages failed with code $FINAL_EXITCODE" >&2
+ echo
fi
exit $FINAL_EXITCODE
sdk/go/httpserver
sdk/go/manifest
sdk/go/blockdigest
-sdk/go/streamer
+sdk/go/asyncbuf
sdk/go/stats
sdk/go/crunchrunner
sdk/cwl
phantomjs --version || fatal "No phantomjs. Try: apt-get install phantomjs"
echo -n 'xvfb: '
which Xvfb || fatal "No xvfb. Try: apt-get install xvfb"
+ echo -n 'graphviz: '
+ dot -V || fatal "No graphviz. Try: apt-get install graphviz"
}
rotate_logfile() {
export GOPATH
mkdir -p "$GOPATH/src/git.curoverse.com"
+rmdir --parents "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH"
ln -sfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git" \
|| fatal "symlink failed"
go get -v github.com/kardianos/govendor \
sdk/go/health
sdk/go/httpserver
sdk/go/manifest
- sdk/go/streamer
+ sdk/go/asyncbuf
sdk/go/crunchrunner
sdk/go/stats
lib/crunchstat
- user/topics/run-command.html.textile.liquid
- user/reference/job-pipeline-ref.html.textile.liquid
- user/examples/crunch-examples.html.textile.liquid
+ - user/topics/arv-sync-groups.html.textile.liquid
- Query the metadata database:
- user/topics/tutorial-trait-search.html.textile.liquid
- Arvados License:
SPDX-License-Identifier: CC-BY-SA-3.0
{% endcomment %}
-The Keep-web server provides read-only HTTP access to files stored in Keep. It serves public data to unauthenticated clients, and serves private data to clients that supply Arvados API tokens. It can be installed anywhere with access to Keep services, typically behind a web proxy that provides SSL support. See the "godoc page":http://godoc.org/github.com/curoverse/arvados/services/keep-web for more detail.
+The Keep-web server provides read/write HTTP (WebDAV) access to files stored in Keep. It serves public data to unauthenticated clients, and serves private data to clients that supply Arvados API tokens. It can be installed anywhere with access to Keep services, typically behind a web proxy that provides TLS support. See the "godoc page":http://godoc.org/github.com/curoverse/arvados/services/keep-web for more detail.
By convention, we use the following hostnames for the Keep-web service:
-allow-anonymous
Serve public data to anonymous clients. Try the token supplied in the ARVADOS_API_TOKEN environment variable when none of the tokens provided in an HTTP request succeed in reading the desired collection. (default false)
-attachment-only-host string
- Accept credentials, and add "Content-Disposition: attachment" response headers, for requests at this hostname:port. Prohibiting inline display makes it possible to serve untrusted and non-public content from a single origin, i.e., without wildcard DNS or SSL.
+ Accept credentials, and add "Content-Disposition: attachment" response headers, for requests at this hostname:port. Prohibiting inline display makes it possible to serve untrusted and non-public content from a single origin, i.e., without wildcard DNS or TLS.
-listen string
Address to listen on: "host:port", or ":port" to listen on all interfaces. (default ":80")
-trust-all-content
Omit the @-allow-anonymous@ argument if you do not want to serve public data.
-Set @ARVADOS_API_HOST_INSECURE=1@ if your API server's SSL certificate is not signed by a recognized CA.
+Set @ARVADOS_API_HOST_INSECURE=1@ if your API server's TLS certificate is not signed by a recognized CA.
-h3. Set up a reverse proxy with SSL support
+h3. Set up a reverse proxy with TLS support
-The Keep-web service will be accessible from anywhere on the internet, so we recommend using SSL for transport encryption.
+The Keep-web service will be accessible from anywhere on the internet, so we recommend using TLS for transport encryption.
-This is best achieved by putting a reverse proxy with SSL support in front of Keep-web, running on port 443 and passing requests to Keep-web on port 9002 (or whatever port you chose in your run script).
+This is best achieved by putting a reverse proxy with TLS support in front of Keep-web, running on port 443 and passing requests to Keep-web on port 9002 (or whatever port you chose in your run script).
-Note: A wildcard SSL certificate is required in order to support a full-featured secure Keep-web service. Without it, Keep-web can offer file downloads for all Keep data; however, in order to avoid cross-site scripting vulnerabilities, Keep-web refuses to serve private data as web content except when it is accessed using a "secret link" share. With a wildcard SSL certificate and DNS configured appropriately, all data can be served as web content.
+Note: A wildcard TLS certificate is required in order to support a full-featured secure Keep-web service. Without it, Keep-web can offer file downloads for all Keep data; however, in order to avoid cross-site scripting vulnerabilities, Keep-web refuses to serve private data as web content except when it is accessed using a "secret link" share. With a wildcard TLS certificate and DNS configured appropriately, all data can be served as web content.
For example, using Nginx:
Configure your DNS servers so the following names resolve to your Nginx proxy's public IP address.
* @download.uuid_prefix.your.domain@
* @collections.uuid_prefix.your.domain@
-* @*--collections.uuid_prefix.your.domain@, if you have a wildcard SSL certificate valid for @*.uuid_prefix.your.domain@ and your DNS server allows this without interfering with other DNS names.
-* @*.collections.uuid_prefix.your.domain@, if you have a wildcard SSL certificate valid for these names.
+* @*--collections.uuid_prefix.your.domain@, if you have a wildcard TLS certificate valid for @*.uuid_prefix.your.domain@ and your DNS server allows this without interfering with other DNS names.
+* @*.collections.uuid_prefix.your.domain@, if you have a wildcard TLS certificate valid for these names.
If neither of the above wildcard options is feasible, you have two choices:
# Serve web content at @collections.uuid_prefix.your.domain@, but only for unauthenticated requests (public data and collection sharing links). Authenticated requests will always result in file downloads, using the @download@ name. For example, the Workbench "preview" button and the "view entire log file" link will invoke file downloads instead of displaying content in the browser window.
|==--submit-runner-ram== SUBMIT_RUNNER_RAM|RAM (in MiB) required for the workflow runner job (default 1024)|
|==--submit-runner-image== SUBMIT_RUNNER_IMAGE|Docker image for workflow runner job, default arvados/jobs|
|==--name== NAME| Name to use for workflow execution instance.|
-|==--on-error {stop,continue}|Desired workflow behavior when a step fails. One of 'stop' or 'continue'. Default is 'continue'.|
+|==--on-error {stop,continue}==|Desired workflow behavior when a step fails. One of 'stop' or 'continue'. Default is 'continue'.|
|==--enable-dev==| Enable loading and running development versions of CWL spec.|
|==--intermediate-output-ttl== N|If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).|
|==--trash-intermediate==| Immediately trash intermediate outputs on workflow success.|
--- /dev/null
+---
+layout: default
+navsection: userguide
+title: "Using arv-sync-groups"
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+The @arv-sync-groups@ tool allows synchronizing remote groups into Arvados from an external source.
+
+h1. Using arv-sync-groups
+
+This tool reads a CSV (comma-separated values) file having information about external groups and their members. When running it for the first time, it'll create a special group named 'Externally synchronized groups' meant to be the parent of all the remote groups.
+
+Every line in the file should have 2 values: a group name and a local user identifier, meaning that the named user is a member of the group. The tool will create the group if it doesn't exist, and add the user to it. If a group member is not present in the input file, the account will be removed from the group.
+
+Users can be identified by their email address or username: the tool will check that every user exists on the system, and report back when not found. Groups, on the other hand, are identified by their name.
+
+This tool is designed to be run periodically, reading a file created by a remote auth system (e.g., LDAP) dump script, applying what's included in the file as the source of truth.
+
+
+bq. NOTE: @arv-sync-groups@ needs to perform several administrative tasks on Arvados, so it must be run using a superuser token.
+
+h2. Options
+
+The following command line options are supported:
+
+table(table table-bordered table-condensed).
+|_. Option |_. Description |
+|==--help==| This list of options|
+|==--parent-group-uuid==| UUID of group to own all the externally synchronized groups|
+|==--user-id== | Identifier to use in looking up user. One of 'email' or 'username' (Default: 'email')|
+|==--verbose==| Log informational messages (Default: False)|
+|==--version==| Print version and exit|
+
+h2. Examples
+
+To sync groups using the username to identify every account, reading from some @external_groups.csv@ file, the command should be called as follows:
+
+<notextile>
+<pre><code>~$ <span class="userinput">arv-sync-groups --user-id username /path/to/external_groups.csv </span>
+</code></pre>
+</notextile>
+
+If you want to use a specific preexisting group as the parent of all the remote groups, you can do it this way:
+
+<notextile>
+<pre><code>~$ <span class="userinput">arv-sync-groups --parent-group-uuid <preexisting group UUID> --user-id username /path/to/external_groups.csv </span>
+</code></pre>
+</notextile>
# apt.arvados.org
deb http://apt.arvados.org/ jessie main
+deb http://apt.arvados.org/ jessie-dev main
arvargs.conformance_test = None
arvargs.use_container = True
arvargs.relax_path_checks = True
- arvargs.validate = None
arvargs.print_supported_versions = False
make_fs_access = partial(CollectionFsAccess,
package arvados
import (
+ "bytes"
"crypto/tls"
"encoding/json"
"fmt"
//
// path must not contain a query string.
func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
+ if body, ok := body.(io.Closer); ok {
+ // Ensure body is closed even if we error out early
+ defer body.Close()
+ }
urlString := c.apiURL(path)
urlValues, err := anythingToValues(params)
if err != nil {
return c.DoAndDecode(dst, req)
}
+type resource interface {
+ resourceName() string
+}
+
+// UpdateBody returns an io.Reader suitable for use as an http.Request
+// Body for a create or update API call.
+func (c *Client) UpdateBody(rsc resource) io.Reader {
+ j, err := json.Marshal(rsc)
+ if err != nil {
+ // Return a reader that returns errors.
+ r, w := io.Pipe()
+ w.CloseWithError(err)
+ return r
+ }
+ v := url.Values{rsc.resourceName(): {string(j)}}
+ return bytes.NewBufferString(v.Encode())
+}
+
func (c *Client) httpClient() *http.Client {
switch {
case c.Client != nil:
IsTrashed bool `json:"is_trashed,omitempty"`
}
+func (c Collection) resourceName() string {
+ return "collection"
+}
+
// SizedDigests returns the hash+size part of each data block
// referenced by the collection.
func (c *Collection) SizedDigests() ([]SizedDigest, error) {
package arvados
import (
+ "errors"
+ "fmt"
"io"
"net/http"
"os"
"path"
+ "regexp"
+ "sort"
+ "strconv"
"strings"
"sync"
"time"
+)
+
+var (
+ ErrReadOnlyFile = errors.New("read-only file")
+ ErrNegativeOffset = errors.New("cannot seek to negative offset")
+ ErrFileExists = errors.New("file exists")
+ ErrInvalidOperation = errors.New("invalid operation")
+ ErrInvalidArgument = errors.New("invalid argument")
+ ErrDirectoryNotEmpty = errors.New("directory not empty")
+ ErrWriteOnlyMode = errors.New("file is O_WRONLY")
+ ErrSyncNotSupported = errors.New("O_SYNC flag is not supported")
+ ErrIsDirectory = errors.New("cannot rename file to overwrite existing directory")
+ ErrPermission = os.ErrPermission
- "git.curoverse.com/arvados.git/sdk/go/manifest"
+ maxBlockSize = 1 << 26
)
+// A File is an *os.File-like interface for reading and writing files
+// in a CollectionFileSystem.
type File interface {
io.Reader
+ io.Writer
io.Closer
io.Seeker
Size() int64
+ Readdir(int) ([]os.FileInfo, error)
+ Stat() (os.FileInfo, error)
+ Truncate(int64) error
}
type keepClient interface {
- ManifestFileReader(manifest.Manifest, string) (File, error)
+ ReadAt(locator string, p []byte, off int) (int, error)
+ PutB(p []byte) (string, int, error)
}
-type collectionFile struct {
- File
- collection *Collection
- name string
- size int64
+type fileinfo struct {
+ name string
+ mode os.FileMode
+ size int64
+ modTime time.Time
}
-func (cf *collectionFile) Size() int64 {
- return cf.size
+// Name implements os.FileInfo.
+func (fi fileinfo) Name() string {
+ return fi.name
}
-func (cf *collectionFile) Readdir(count int) ([]os.FileInfo, error) {
- return nil, io.EOF
+// ModTime implements os.FileInfo.
+func (fi fileinfo) ModTime() time.Time {
+ return fi.modTime
}
-func (cf *collectionFile) Stat() (os.FileInfo, error) {
- return collectionDirent{
- collection: cf.collection,
- name: cf.name,
- size: cf.size,
- isDir: false,
- }, nil
+// Mode implements os.FileInfo.
+func (fi fileinfo) Mode() os.FileMode {
+ return fi.mode
}
-type collectionDir struct {
- collection *Collection
- stream string
- dirents []os.FileInfo
+// IsDir implements os.FileInfo.
+func (fi fileinfo) IsDir() bool {
+ return fi.mode&os.ModeDir != 0
}
-// Readdir implements os.File.
-func (cd *collectionDir) Readdir(count int) ([]os.FileInfo, error) {
- ret := cd.dirents
- if count <= 0 {
- cd.dirents = nil
- return ret, nil
- } else if len(ret) == 0 {
- return nil, io.EOF
+// Size implements os.FileInfo.
+func (fi fileinfo) Size() int64 {
+ return fi.size
+}
+
+// Sys implements os.FileInfo.
+func (fi fileinfo) Sys() interface{} {
+ return nil
+}
+
+// A CollectionFileSystem is an http.Filesystem plus Stat() and
+// support for opening writable files. All methods are safe to call
+// from multiple goroutines.
+type CollectionFileSystem interface {
+ http.FileSystem
+
+ // analogous to os.Stat()
+ Stat(name string) (os.FileInfo, error)
+
+ // analogous to os.Create(): create/truncate a file and open it O_RDWR.
+ Create(name string) (File, error)
+
+ // Like os.OpenFile(): create or open a file or directory.
+ //
+ // If flag&os.O_EXCL==0, it opens an existing file or
+ // directory if one exists. If flag&os.O_CREATE!=0, it creates
+ // a new empty file or directory if one does not already
+ // exist.
+ //
+ // When creating a new item, perm&os.ModeDir determines
+ // whether it is a file or a directory.
+ //
+ // A file can be opened multiple times and used concurrently
+ // from multiple goroutines. However, each File object should
+ // be used by only one goroutine at a time.
+ OpenFile(name string, flag int, perm os.FileMode) (File, error)
+
+ Mkdir(name string, perm os.FileMode) error
+ Remove(name string) error
+ RemoveAll(name string) error
+ Rename(oldname, newname string) error
+
+ // Flush all file data to Keep and return a snapshot of the
+ // filesystem suitable for saving as (Collection)ManifestText.
+ // Prefix (normally ".") is a top level directory, effectively
+ // prepended to all paths in the returned manifest.
+ MarshalManifest(prefix string) (string, error)
+}
+
+type fileSystem struct {
+ dirnode
+}
+
+func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
+ return fs.dirnode.OpenFile(name, flag, perm)
+}
+
+func (fs *fileSystem) Open(name string) (http.File, error) {
+ return fs.dirnode.OpenFile(name, os.O_RDONLY, 0)
+}
+
+func (fs *fileSystem) Create(name string) (File, error) {
+ return fs.dirnode.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
+}
+
+func (fs *fileSystem) Stat(name string) (fi os.FileInfo, err error) {
+ node := fs.dirnode.lookupPath(name)
+ if node == nil {
+ err = os.ErrNotExist
+ } else {
+ fi = node.Stat()
}
- var err error
- if count >= len(ret) {
- count = len(ret)
- err = io.EOF
+ return
+}
+
+type inode interface {
+ Parent() inode
+ Read([]byte, filenodePtr) (int, filenodePtr, error)
+ Write([]byte, filenodePtr) (int, filenodePtr, error)
+ Truncate(int64) error
+ Readdir() []os.FileInfo
+ Size() int64
+ Stat() os.FileInfo
+ sync.Locker
+ RLock()
+ RUnlock()
+}
+
+// filenode implements inode.
+type filenode struct {
+ fileinfo fileinfo
+ parent *dirnode
+ segments []segment
+ // number of times `segments` has changed in a
+ // way that might invalidate a filenodePtr
+ repacked int64
+ memsize int64 // bytes in memSegments
+ sync.RWMutex
+}
+
+// filenodePtr is an offset into a file that is (usually) efficient to
+// seek to. Specifically, if filenode.repacked==filenodePtr.repacked
+// then
+// filenode.segments[filenodePtr.segmentIdx][filenodePtr.segmentOff]
+// corresponds to file offset filenodePtr.off. Otherwise, it is
+// necessary to reexamine len(filenode.segments[0]) etc. to find the
+// correct segment and offset.
+type filenodePtr struct {
+ off int64
+ segmentIdx int
+ segmentOff int
+ repacked int64
+}
+
+// seek returns a ptr that is consistent with both startPtr.off and
+// the current state of fn. The caller must already hold fn.RLock() or
+// fn.Lock().
+//
+// If startPtr is beyond EOF, ptr.segment* will indicate precisely
+// EOF.
+//
+// After seeking:
+//
+// ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
+// ||
+// filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
+func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
+ ptr = startPtr
+ if ptr.off < 0 {
+ // meaningless anyway
+ return
+ } else if ptr.off >= fn.fileinfo.size {
+ ptr.segmentIdx = len(fn.segments)
+ ptr.segmentOff = 0
+ ptr.repacked = fn.repacked
+ return
+ } else if ptr.repacked == fn.repacked {
+ // segmentIdx and segmentOff accurately reflect
+ // ptr.off, but might have fallen off the end of a
+ // segment
+ if ptr.segmentOff >= fn.segments[ptr.segmentIdx].Len() {
+ ptr.segmentIdx++
+ ptr.segmentOff = 0
+ }
+ return
}
- cd.dirents = cd.dirents[count:]
- return ret[:count], err
+ defer func() {
+ ptr.repacked = fn.repacked
+ }()
+ if ptr.off >= fn.fileinfo.size {
+ ptr.segmentIdx, ptr.segmentOff = len(fn.segments), 0
+ return
+ }
+ // Recompute segmentIdx and segmentOff. We have already
+ // established fn.fileinfo.size > ptr.off >= 0, so we don't
+ // have to deal with edge cases here.
+ var off int64
+ for ptr.segmentIdx, ptr.segmentOff = 0, 0; off < ptr.off; ptr.segmentIdx++ {
+ // This would panic (index out of range) if
+ // fn.fileinfo.size were larger than
+ // sum(fn.segments[i].Len()) -- but that can't happen
+ // because we have ensured fn.fileinfo.size is always
+ // accurate.
+ segLen := int64(fn.segments[ptr.segmentIdx].Len())
+ if off+segLen > ptr.off {
+ ptr.segmentOff = int(ptr.off - off)
+ break
+ }
+ off += segLen
+ }
+ return
}
-// Stat implements os.File.
-func (cd *collectionDir) Stat() (os.FileInfo, error) {
- return collectionDirent{
- collection: cd.collection,
- name: path.Base(cd.stream),
- isDir: true,
- size: int64(len(cd.dirents)),
- }, nil
+// caller must have lock
+func (fn *filenode) appendSegment(e segment) {
+ fn.segments = append(fn.segments, e)
+ fn.fileinfo.size += int64(e.Len())
+}
+
+func (fn *filenode) Parent() inode {
+ fn.RLock()
+ defer fn.RUnlock()
+ return fn.parent
}
-// Close implements os.File.
-func (cd *collectionDir) Close() error {
+func (fn *filenode) Readdir() []os.FileInfo {
return nil
}
-// Read implements os.File.
-func (cd *collectionDir) Read([]byte) (int, error) {
- return 0, nil
+// Read reads file data from a single segment, starting at startPtr,
+// into p. startPtr is assumed not to be up-to-date. Caller must have
+// RLock or Lock.
+func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
+ ptr = fn.seek(startPtr)
+ if ptr.off < 0 {
+ err = ErrNegativeOffset
+ return
+ }
+ if ptr.segmentIdx >= len(fn.segments) {
+ err = io.EOF
+ return
+ }
+ n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
+ if n > 0 {
+ ptr.off += int64(n)
+ ptr.segmentOff += n
+ if ptr.segmentOff == fn.segments[ptr.segmentIdx].Len() {
+ ptr.segmentIdx++
+ ptr.segmentOff = 0
+ if ptr.segmentIdx < len(fn.segments) && err == io.EOF {
+ err = nil
+ }
+ }
+ }
+ return
}
-// Seek implements os.File.
-func (cd *collectionDir) Seek(int64, int) (int64, error) {
- return 0, nil
// Size returns the current file size in bytes. Takes the read lock.
func (fn *filenode) Size() int64 {
	fn.RLock()
	defer fn.RUnlock()
	return fn.fileinfo.Size()
}

// Stat returns the file's FileInfo. Takes the read lock.
func (fn *filenode) Stat() os.FileInfo {
	fn.RLock()
	defer fn.RUnlock()
	return fn.fileinfo
}

// Truncate changes the file size to size, taking the write lock.
// See truncate for the unlocked implementation.
func (fn *filenode) Truncate(size int64) error {
	fn.Lock()
	defer fn.Unlock()
	return fn.truncate(size)
}
-// ModTime implements os.FileInfo.
-func (e collectionDirent) ModTime() time.Time {
- if e.collection.ModifiedAt == nil {
- return time.Now()
// truncate changes the file size to size, dropping or zero-extending
// segments as needed and keeping fn.memsize accurate. Caller must
// have the write lock. Increments fn.repacked, invalidating any
// outstanding filenodePtrs.
func (fn *filenode) truncate(size int64) error {
	if size == fn.fileinfo.size {
		return nil
	}
	fn.repacked++
	if size < fn.fileinfo.size {
		// Shrink: drop all segments past the new EOF.
		ptr := fn.seek(filenodePtr{off: size})
		for i := ptr.segmentIdx; i < len(fn.segments); i++ {
			// Deduct dropped in-memory data from memsize.
			if seg, ok := fn.segments[i].(*memSegment); ok {
				fn.memsize -= int64(seg.Len())
			}
		}
		if ptr.segmentOff == 0 {
			fn.segments = fn.segments[:ptr.segmentIdx]
		} else {
			// New EOF falls inside a segment: keep a
			// truncated copy of it.
			fn.segments = fn.segments[:ptr.segmentIdx+1]
			switch seg := fn.segments[ptr.segmentIdx].(type) {
			case *memSegment:
				seg.Truncate(ptr.segmentOff)
				// Add the kept portion back to memsize
				// (it was deducted in full above).
				fn.memsize += int64(seg.Len())
			default:
				fn.segments[ptr.segmentIdx] = seg.Slice(0, ptr.segmentOff)
			}
		}
		fn.fileinfo.size = size
		return nil
	}
	// Grow: append zero-filled memSegments, each at most
	// maxBlockSize, reusing a trailing appendable memSegment if
	// there is one.
	for size > fn.fileinfo.size {
		grow := size - fn.fileinfo.size
		var seg *memSegment
		var ok bool
		if len(fn.segments) == 0 {
			seg = &memSegment{}
			fn.segments = append(fn.segments, seg)
		} else if seg, ok = fn.segments[len(fn.segments)-1].(*memSegment); !ok || seg.Len() >= maxBlockSize {
			seg = &memSegment{}
			fn.segments = append(fn.segments, seg)
		}
		if maxgrow := int64(maxBlockSize - seg.Len()); maxgrow < grow {
			grow = maxgrow
		}
		seg.Truncate(seg.Len() + int(grow))
		fn.fileinfo.size += grow
		fn.memsize += grow
	}
	return nil
}
-// Mode implements os.FileInfo.
-func (e collectionDirent) Mode() os.FileMode {
- if e.isDir {
- return 0555
// Write writes data from p to the file, starting at startPtr,
// extending the file size if necessary. Caller must have Lock.
//
// Returns the number of bytes written and an updated pointer for the
// caller's next call. Data is written in chunks of at most
// maxBlockSize; each iteration rearranges fn.segments so the chunk
// lands in a writable memSegment.
func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
	// Writing past EOF: zero-fill the gap first.
	if startPtr.off > fn.fileinfo.size {
		if err = fn.truncate(startPtr.off); err != nil {
			return 0, startPtr, err
		}
	}
	ptr = fn.seek(startPtr)
	if ptr.off < 0 {
		err = ErrNegativeOffset
		return
	}
	for len(p) > 0 && err == nil {
		cando := p
		if len(cando) > maxBlockSize {
			cando = cando[:maxBlockSize]
		}
		// Rearrange/grow fn.segments (and shrink cando if
		// needed) such that cando can be copied to
		// fn.segments[ptr.segmentIdx] at offset
		// ptr.segmentOff.
		cur := ptr.segmentIdx
		prev := ptr.segmentIdx - 1
		var curWritable bool
		if cur < len(fn.segments) {
			_, curWritable = fn.segments[cur].(*memSegment)
		}
		var prevAppendable bool
		if prev >= 0 && fn.segments[prev].Len() < maxBlockSize {
			_, prevAppendable = fn.segments[prev].(*memSegment)
		}
		if ptr.segmentOff > 0 && !curWritable {
			// Split a non-writable block.
			if max := fn.segments[cur].Len() - ptr.segmentOff; max <= len(cando) {
				// Truncate cur, and insert a new
				// segment after it.
				cando = cando[:max]
				fn.segments = append(fn.segments, nil)
				copy(fn.segments[cur+1:], fn.segments[cur:])
			} else {
				// Split cur into two copies, truncate
				// the one on the left, shift the one
				// on the right, and insert a new
				// segment between them.
				fn.segments = append(fn.segments, nil, nil)
				copy(fn.segments[cur+2:], fn.segments[cur:])
				fn.segments[cur+2] = fn.segments[cur+2].Slice(ptr.segmentOff+len(cando), -1)
			}
			cur++
			prev++
			seg := &memSegment{}
			seg.Truncate(len(cando))
			fn.memsize += int64(len(cando))
			fn.segments[cur] = seg
			fn.segments[prev] = fn.segments[prev].Slice(0, ptr.segmentOff)
			ptr.segmentIdx++
			ptr.segmentOff = 0
			fn.repacked++
			ptr.repacked++
		} else if curWritable {
			// Overwrite in place; don't run past the end
			// of the current segment.
			if fit := int(fn.segments[cur].Len()) - ptr.segmentOff; fit < len(cando) {
				cando = cando[:fit]
			}
		} else {
			if prevAppendable {
				// Shrink cando if needed to fit in
				// prev segment.
				if cangrow := maxBlockSize - fn.segments[prev].Len(); cangrow < len(cando) {
					cando = cando[:cangrow]
				}
			}

			if cur == len(fn.segments) {
				// ptr is at EOF, filesize is changing.
				fn.fileinfo.size += int64(len(cando))
			} else if el := fn.segments[cur].Len(); el <= len(cando) {
				// cando is long enough that we won't
				// need cur any more. shrink cando to
				// be exactly as long as cur
				// (otherwise we'd accidentally shift
				// the effective position of all
				// segments after cur).
				cando = cando[:el]
				copy(fn.segments[cur:], fn.segments[cur+1:])
				fn.segments = fn.segments[:len(fn.segments)-1]
			} else {
				// shrink cur by the same #bytes we're growing prev
				fn.segments[cur] = fn.segments[cur].Slice(len(cando), -1)
			}

			if prevAppendable {
				// Grow prev.
				ptr.segmentIdx--
				ptr.segmentOff = fn.segments[prev].Len()
				fn.segments[prev].(*memSegment).Truncate(ptr.segmentOff + len(cando))
				fn.memsize += int64(len(cando))
				ptr.repacked++
				fn.repacked++
			} else {
				// Insert a segment between prev and
				// cur, and advance prev/cur.
				fn.segments = append(fn.segments, nil)
				if cur < len(fn.segments) {
					copy(fn.segments[cur+1:], fn.segments[cur:])
					ptr.repacked++
					fn.repacked++
				} else {
					// appending a new segment does
					// not invalidate any ptrs
				}
				seg := &memSegment{}
				seg.Truncate(len(cando))
				fn.memsize += int64(len(cando))
				fn.segments[cur] = seg
				cur++
				prev++
			}
		}

		// Finally we can copy bytes from cando to the current segment.
		fn.segments[ptr.segmentIdx].(*memSegment).WriteAt(cando, ptr.segmentOff)
		n += len(cando)
		p = p[len(cando):]

		ptr.off += int64(len(cando))
		ptr.segmentOff += len(cando)
		if ptr.segmentOff >= maxBlockSize {
			// Opportunistically flush full blocks to Keep.
			fn.pruneMemSegments()
		}
		if fn.segments[ptr.segmentIdx].Len() == ptr.segmentOff {
			ptr.segmentOff = 0
			ptr.segmentIdx++
		}

		fn.fileinfo.modTime = time.Now()
	}
	return
}
+
// pruneMemSegments writes some data out to disk to reduce memory use:
// every full-sized in-memory segment is stored in Keep and replaced
// with an equivalent storedSegment. Caller must have write lock.
func (fn *filenode) pruneMemSegments() {
	// TODO: async (don't hold Lock() while waiting for Keep)
	// TODO: share code with (*dirnode)sync()
	// TODO: pack/flush small blocks too, when fragmented
	for idx, seg := range fn.segments {
		seg, ok := seg.(*memSegment)
		if !ok || seg.Len() < maxBlockSize {
			continue
		}
		locator, _, err := fn.parent.kc.PutB(seg.buf)
		if err != nil {
			// Flush failed: keep the data in memory and
			// move on.
			// TODO: stall (or return errors from)
			// subsequent writes until flushing
			// starts to succeed
			continue
		}
		fn.memsize -= int64(seg.Len())
		fn.segments[idx] = storedSegment{
			kc:      fn.parent.kc,
			locator: locator,
			size:    seg.Len(),
			offset:  0,
			length:  seg.Len(),
		}
	}
}
+
// FileSystem returns a CollectionFileSystem for the collection,
// backed by the given API client and keep client. The returned
// filesystem's root modification time is the collection's ModifiedAt
// (or "now" if that is nil). Returns an error if the collection's
// manifest text cannot be parsed.
func (c *Collection) FileSystem(client *Client, kc keepClient) (CollectionFileSystem, error) {
	var modTime time.Time
	if c.ModifiedAt == nil {
		modTime = time.Now()
	} else {
		modTime = *c.ModifiedAt
	}
	fs := &fileSystem{dirnode: dirnode{
		client: client,
		kc:     kc,
		fileinfo: fileinfo{
			name:    ".",
			mode:    os.ModeDir | 0755,
			modTime: modTime,
		},
		parent: nil,
		inodes: make(map[string]inode),
	}}
	// The root directory is its own parent ("/.." == "/").
	fs.dirnode.parent = &fs.dirnode
	if err := fs.dirnode.loadManifest(c.ManifestText); err != nil {
		return nil, err
	}
	return fs, nil
}
-// IsDir implements os.FileInfo.
-func (e collectionDirent) IsDir() bool {
- return e.isDir
// filehandle is an open file or directory, wrapping an inode with a
// seek position and open-mode flags.
type filehandle struct {
	inode
	ptr        filenodePtr   // current read/write position
	append     bool          // opened with O_APPEND: every Write seeks to EOF first
	readable   bool          // opened for reading
	writable   bool          // opened for writing
	unreaddirs []os.FileInfo // entries not yet returned by incremental Readdir
}
-// Size implements os.FileInfo.
-func (e collectionDirent) Size() int64 {
- return e.size
// Read reads up to len(p) bytes from the current position and
// advances the position. Returns ErrWriteOnlyMode if the handle was
// not opened for reading.
func (f *filehandle) Read(p []byte) (n int, err error) {
	if !f.readable {
		return 0, ErrWriteOnlyMode
	}
	f.inode.RLock()
	defer f.inode.RUnlock()
	n, f.ptr, err = f.inode.Read(p, f.ptr)
	return
}
-// Sys implements os.FileInfo.
-func (e collectionDirent) Sys() interface{} {
- return nil
+func (f *filehandle) Seek(off int64, whence int) (pos int64, err error) {
+ size := f.inode.Size()
+ ptr := f.ptr
+ switch whence {
+ case io.SeekStart:
+ ptr.off = off
+ case io.SeekCurrent:
+ ptr.off += off
+ case io.SeekEnd:
+ ptr.off = size + off
+ }
+ if ptr.off < 0 {
+ return f.ptr.off, ErrNegativeOffset
+ }
+ if ptr.off != f.ptr.off {
+ f.ptr = ptr
+ // force filenode to recompute f.ptr fields on next
+ // use
+ f.ptr.repacked = -1
+ }
+ return f.ptr.off, nil
}
-// A CollectionFileSystem is an http.Filesystem with an added Stat() method.
-type CollectionFileSystem interface {
- http.FileSystem
- Stat(name string) (os.FileInfo, error)
// Truncate resizes the file. The handle's position is left unchanged,
// even if it is now past EOF.
func (f *filehandle) Truncate(size int64) error {
	return f.inode.Truncate(size)
}
-// collectionFS implements CollectionFileSystem.
-type collectionFS struct {
- collection *Collection
- client *Client
- kc keepClient
- sizes map[string]int64
- sizesOnce sync.Once
// Write writes p at the current position (or at EOF, if the handle
// was opened with O_APPEND) and advances the position. Returns
// ErrReadOnlyFile if the handle was not opened for writing.
func (f *filehandle) Write(p []byte) (n int, err error) {
	if !f.writable {
		return 0, ErrReadOnlyFile
	}
	f.inode.Lock()
	defer f.inode.Unlock()
	if fn, ok := f.inode.(*filenode); ok && f.append {
		// O_APPEND: reposition at EOF while holding the lock,
		// so concurrent writers can't interleave mid-file.
		f.ptr = filenodePtr{
			off:        fn.fileinfo.size,
			segmentIdx: len(fn.segments),
			segmentOff: 0,
			repacked:   fn.repacked,
		}
	}
	n, f.ptr, err = f.inode.Write(p, f.ptr)
	return
}
-// FileSystem returns a CollectionFileSystem for the collection.
-func (c *Collection) FileSystem(client *Client, kc keepClient) CollectionFileSystem {
- return &collectionFS{
- collection: c,
- client: client,
- kc: kc,
- }
-}
-
-func (c *collectionFS) Stat(name string) (os.FileInfo, error) {
- name = canonicalName(name)
- if name == "." {
- return collectionDirent{
- collection: c.collection,
- name: "/",
- isDir: true,
- }, nil
- }
- if size, ok := c.fileSizes()[name]; ok {
- return collectionDirent{
- collection: c.collection,
- name: path.Base(name),
- size: size,
- isDir: false,
- }, nil
- }
- for fnm := range c.fileSizes() {
- if !strings.HasPrefix(fnm, name+"/") {
// Readdir implements os.File-style directory listing. count <= 0
// returns all entries in one call (without consuming the handle's
// incremental state); count > 0 returns at most count entries per
// call and io.EOF when exhausted.
func (f *filehandle) Readdir(count int) ([]os.FileInfo, error) {
	if !f.inode.Stat().IsDir() {
		return nil, ErrInvalidOperation
	}
	if count <= 0 {
		return f.inode.Readdir(), nil
	}
	if f.unreaddirs == nil {
		// First incremental call: snapshot the entries.
		f.unreaddirs = f.inode.Readdir()
	}
	if len(f.unreaddirs) == 0 {
		return nil, io.EOF
	}
	if count > len(f.unreaddirs) {
		count = len(f.unreaddirs)
	}
	ret := f.unreaddirs[:count]
	f.unreaddirs = f.unreaddirs[count:]
	return ret, nil
}

// Stat returns the FileInfo of the underlying inode.
func (f *filehandle) Stat() (os.FileInfo, error) {
	return f.inode.Stat(), nil
}

// Close is a no-op: there is nothing to release.
func (f *filehandle) Close() error {
	return nil
}
+
// dirnode is a directory in a collection filesystem. Its embedded
// RWMutex guards the inodes map and fileinfo.
type dirnode struct {
	fileinfo fileinfo         // name/mode/size/modTime of this directory
	parent   *dirnode         // enclosing directory (root points to itself)
	client   *Client
	kc       keepClient       // used to flush/load file data
	inodes   map[string]inode // children, keyed by entry name
	sync.RWMutex
}
+
// sync flushes in-memory data (for all files in the tree rooted at
// dn) to persistent storage. Caller must hold dn.Lock().
//
// Large in-memory segments (> maxBlockSize/2) are stored one per
// block; smaller ones are packed together into shared blocks of at
// most maxBlockSize bytes.
func (dn *dirnode) sync() error {
	type shortBlock struct {
		fn  *filenode
		idx int // index into fn.segments
	}
	var pending []shortBlock
	var pendingLen int

	// flush stores the concatenated data of sbs as one Keep block
	// and rewrites each contributing segment as a storedSegment
	// pointing into that block.
	flush := func(sbs []shortBlock) error {
		if len(sbs) == 0 {
			return nil
		}
		block := make([]byte, 0, maxBlockSize)
		for _, sb := range sbs {
			block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
		}
		locator, _, err := dn.kc.PutB(block)
		if err != nil {
			return err
		}
		off := 0
		for _, sb := range sbs {
			data := sb.fn.segments[sb.idx].(*memSegment).buf
			sb.fn.segments[sb.idx] = storedSegment{
				kc:      dn.kc,
				locator: locator,
				size:    len(block),
				offset:  off,
				length:  len(data),
			}
			off += len(data)
			sb.fn.memsize -= int64(len(data))
		}
		return nil
	}

	// Visit files in name order for deterministic packing.
	names := make([]string, 0, len(dn.inodes))
	for name := range dn.inodes {
		names = append(names, name)
	}
	sort.Strings(names)

	for _, name := range names {
		fn, ok := dn.inodes[name].(*filenode)
		if !ok {
			continue
		}
		// Deferred in a loop on purpose: every visited file
		// stays locked until sync returns, so segments can't
		// change while pending blocks await flush.
		fn.Lock()
		defer fn.Unlock()
		for idx, seg := range fn.segments {
			seg, ok := seg.(*memSegment)
			if !ok {
				continue
			}
			if seg.Len() > maxBlockSize/2 {
				if err := flush([]shortBlock{{fn, idx}}); err != nil {
					return err
				}
				continue
			}
			if pendingLen+seg.Len() > maxBlockSize {
				if err := flush(pending); err != nil {
					return err
				}
				pending = nil
				pendingLen = 0
			}
			pending = append(pending, shortBlock{fn, idx})
			pendingLen += seg.Len()
		}
	}
	return flush(pending)
}
-func (c *collectionFS) Open(name string) (http.File, error) {
- // Ensure name looks the way it does in a manifest.
- name = canonicalName(name)
// MarshalManifest flushes buffered data and renders the tree rooted
// at dn as manifest text, using prefix as dn's stream name (e.g.,
// ".").
func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
	dn.Lock()
	defer dn.Unlock()
	return dn.marshalManifest(prefix)
}
- m := manifest.Manifest{Text: c.collection.ManifestText}
// marshalManifest renders dn (stream name prefix) and its subtree as
// manifest text. Caller must have write lock: this calls dn.sync(),
// which modifies file segments.
func (dn *dirnode) marshalManifest(prefix string) (string, error) {
	var streamLen int64
	type filepart struct {
		name   string
		offset int64 // position within the stream's concatenated blocks
		length int64
	}
	var fileparts []filepart
	var subdirs string
	var blocks []string

	// Flush in-memory segments so everything below is a
	// storedSegment with a locator.
	if err := dn.sync(); err != nil {
		return "", err
	}

	// Lock and visit children in name order; locks are held until
	// return.
	names := make([]string, 0, len(dn.inodes))
	for name, node := range dn.inodes {
		names = append(names, name)
		node.Lock()
		defer node.Unlock()
	}
	sort.Strings(names)

	for _, name := range names {
		switch node := dn.inodes[name].(type) {
		case *dirnode:
			subdir, err := node.marshalManifest(prefix + "/" + name)
			if err != nil {
				return "", err
			}
			subdirs = subdirs + subdir
		case *filenode:
			if len(node.segments) == 0 {
				// Zero-length file: emitted with the
				// empty block below.
				fileparts = append(fileparts, filepart{name: name})
				break
			}
			for _, seg := range node.segments {
				switch seg := seg.(type) {
				case storedSegment:
					if len(blocks) > 0 && blocks[len(blocks)-1] == seg.locator {
						// Reuse the block token just emitted.
						streamLen -= int64(seg.size)
					} else {
						blocks = append(blocks, seg.locator)
					}
					next := filepart{
						name:   name,
						offset: streamLen + int64(seg.offset),
						length: int64(seg.length),
					}
					// Merge with the previous part when contiguous.
					if prev := len(fileparts) - 1; prev >= 0 &&
						fileparts[prev].name == name &&
						fileparts[prev].offset+fileparts[prev].length == next.offset {
						fileparts[prev].length += next.length
					} else {
						fileparts = append(fileparts, next)
					}
					streamLen += int64(seg.size)
				default:
					// This can't happen: we
					// haven't unlocked since
					// calling sync().
					panic(fmt.Sprintf("can't marshal segment type %T", seg))
				}
			}
		default:
			panic(fmt.Sprintf("can't marshal inode type %T", node))
		}
	}
	var filetokens []string
	for _, s := range fileparts {
		filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
	}
	if len(filetokens) == 0 {
		return subdirs, nil
	} else if len(blocks) == 0 {
		// Stream with only empty files: use the md5 of "" as
		// its (only) block.
		blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
	}
	return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
}
+
// loadManifest parses manifest text and populates the tree rooted at
// dn with the files and directories it describes. Returns an error
// (with a 1-based line number) on any malformed token.
func (dn *dirnode) loadManifest(txt string) error {
	var dirname string
	streams := strings.Split(txt, "\n")
	if streams[len(streams)-1] != "" {
		return fmt.Errorf("line %d: no trailing newline", len(streams))
	}
	streams = streams[:len(streams)-1]
	segments := []storedSegment{}
	for i, stream := range streams {
		lineno := i + 1
		var anyFileTokens bool
		var pos int64   // stream offset where segments[segIdx] starts
		var segIdx int  // resume point in segments for the next file token
		segments = segments[:0]
		for i, token := range strings.Split(stream, " ") {
			if i == 0 {
				// First token is the stream name.
				dirname = manifestUnescape(token)
				continue
			}
			if !strings.Contains(token, ":") {
				// Block locator token.
				if anyFileTokens {
					return fmt.Errorf("line %d: bad file segment %q", lineno, token)
				}
				toks := strings.SplitN(token, "+", 3)
				if len(toks) < 2 {
					return fmt.Errorf("line %d: bad locator %q", lineno, token)
				}
				length, err := strconv.ParseInt(toks[1], 10, 32)
				if err != nil || length < 0 {
					return fmt.Errorf("line %d: bad locator %q", lineno, token)
				}
				segments = append(segments, storedSegment{
					locator: token,
					size:    int(length),
					offset:  0,
					length:  int(length),
				})
				continue
			} else if len(segments) == 0 {
				return fmt.Errorf("line %d: bad locator %q", lineno, token)
			}

			// File token: offset:length:name.
			toks := strings.Split(token, ":")
			if len(toks) != 3 {
				return fmt.Errorf("line %d: bad file segment %q", lineno, token)
			}
			anyFileTokens = true

			offset, err := strconv.ParseInt(toks[0], 10, 64)
			if err != nil || offset < 0 {
				return fmt.Errorf("line %d: bad file segment %q", lineno, token)
			}
			length, err := strconv.ParseInt(toks[1], 10, 64)
			if err != nil || length < 0 {
				return fmt.Errorf("line %d: bad file segment %q", lineno, token)
			}
			name := dirname + "/" + manifestUnescape(toks[2])
			fnode, err := dn.createFileAndParents(name)
			if err != nil {
				return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
			}
			// Map the stream offset/range coordinates to
			// block/offset/range coordinates and add
			// corresponding storedSegments to the filenode
			if pos > offset {
				// Can't continue where we left off.
				// TODO: binary search instead of
				// rewinding all the way (but this
				// situation might be rare anyway)
				segIdx, pos = 0, 0
			}
			for next := int64(0); segIdx < len(segments); segIdx++ {
				seg := segments[segIdx]
				next = pos + int64(seg.Len())
				if next <= offset || seg.Len() == 0 {
					pos = next
					continue
				}
				if pos >= offset+length {
					break
				}
				var blkOff int
				if pos < offset {
					blkOff = int(offset - pos)
				}
				blkLen := seg.Len() - blkOff
				if pos+int64(blkOff+blkLen) > offset+length {
					blkLen = int(offset + length - pos - int64(blkOff))
				}
				fnode.appendSegment(storedSegment{
					kc:      dn.kc,
					locator: seg.locator,
					size:    seg.size,
					offset:  blkOff,
					length:  blkLen,
				})
				if next > offset+length {
					break
				} else {
					pos = next
				}
			}
			if segIdx == len(segments) && pos < offset+length {
				return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
			}
		}
		if !anyFileTokens {
			return fmt.Errorf("line %d: no file segments", lineno)
		} else if len(segments) == 0 {
			return fmt.Errorf("line %d: no locators", lineno)
		} else if dirname == "" {
			return fmt.Errorf("line %d: no stream name", lineno)
		}
	}
	return nil
}
+
// createFileAndParents returns the filenode at path (relative to dn),
// creating any missing intermediate directories and the file itself.
// Returns ErrFileExists if a path component is an existing file, or
// ErrIsDirectory if the final component is an existing directory.
//
// Only safe to call from loadManifest -- no locking.
func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
	names := strings.Split(path, "/")
	basename := names[len(names)-1]
	if basename == "" || basename == "." || basename == ".." {
		err = fmt.Errorf("invalid filename")
		return
	}
	// Walk/create the directory components.
	for _, name := range names[:len(names)-1] {
		switch name {
		case "", ".":
			// No-op components.
		case "..":
			dn = dn.parent
		default:
			switch node := dn.inodes[name].(type) {
			case nil:
				dn = dn.newDirnode(name, 0755, dn.fileinfo.modTime)
			case *dirnode:
				dn = node
			case *filenode:
				err = ErrFileExists
				return
			}
		}
	}
	switch node := dn.inodes[basename].(type) {
	case nil:
		fn = dn.newFilenode(basename, 0755, dn.fileinfo.modTime)
	case *filenode:
		fn = node
	case *dirnode:
		err = ErrIsDirectory
	}
	return
}
+
+func (dn *dirnode) mkdir(name string) (*filehandle, error) {
+ return dn.OpenFile(name, os.O_CREATE|os.O_EXCL, os.ModeDir|0755)
+}
+
+func (dn *dirnode) Mkdir(name string, perm os.FileMode) error {
+ f, err := dn.mkdir(name)
+ if err == nil {
+ err = f.Close()
+ }
+ return err
+}
+
// Remove deletes the named file or empty directory. A trailing "/" on
// name is ignored.
func (dn *dirnode) Remove(name string) error {
	return dn.remove(strings.TrimRight(name, "/"), false)
}

// RemoveAll deletes the named file or directory and, for a directory,
// everything it contains.
func (dn *dirnode) RemoveAll(name string) error {
	err := dn.remove(strings.TrimRight(name, "/"), true)
	if os.IsNotExist(err) {
		// "If the path does not exist, RemoveAll returns
		// nil." (see "os" pkg)
		err = nil
	}
	return err
}
+
// remove deletes the entry at name (relative to dn). A non-empty
// directory is only removed when recursive is true; its subtree is
// simply unlinked.
func (dn *dirnode) remove(name string, recursive bool) error {
	dirname, name := path.Split(name)
	if name == "" || name == "." || name == ".." {
		return ErrInvalidArgument
	}
	dn, ok := dn.lookupPath(dirname).(*dirnode)
	if !ok {
		return os.ErrNotExist
	}
	dn.Lock()
	defer dn.Unlock()
	switch node := dn.inodes[name].(type) {
	case nil:
		return os.ErrNotExist
	case *dirnode:
		// Hold the victim's read lock so its emptiness check
		// stays valid until the delete below.
		node.RLock()
		defer node.RUnlock()
		if !recursive && len(node.inodes) > 0 {
			return ErrDirectoryNotEmpty
		}
	}
	delete(dn.inodes, name)
	return nil
}
+
// Rename moves oldname to newname (both relative to dn), overwriting
// an existing empty directory or file at newname when the types are
// compatible. Renaming onto a non-empty directory returns
// ErrDirectoryNotEmpty; renaming a file onto a directory returns
// ErrIsDirectory.
func (dn *dirnode) Rename(oldname, newname string) error {
	olddir, oldname := path.Split(oldname)
	if oldname == "" || oldname == "." || oldname == ".." {
		return ErrInvalidArgument
	}
	olddirf, err := dn.OpenFile(olddir+".", os.O_RDONLY, 0)
	if err != nil {
		return fmt.Errorf("%q: %s", olddir, err)
	}
	defer olddirf.Close()
	newdir, newname := path.Split(newname)
	if newname == "." || newname == ".." {
		return ErrInvalidArgument
	} else if newname == "" {
		// Rename("a/b", "c/") means Rename("a/b", "c/b")
		newname = oldname
	}
	newdirf, err := dn.OpenFile(newdir+".", os.O_RDONLY, 0)
	if err != nil {
		return fmt.Errorf("%q: %s", newdir, err)
	}
	defer newdirf.Close()

	// When acquiring locks on multiple nodes, all common
	// ancestors must be locked first in order to avoid
	// deadlock. This is assured by locking the path from root to
	// newdir, then locking the path from root to olddir, skipping
	// any already-locked nodes.
	needLock := []sync.Locker{}
	for _, f := range []*filehandle{olddirf, newdirf} {
		node := f.inode
		needLock = append(needLock, node)
		for node.Parent() != node {
			node = node.Parent()
			needLock = append(needLock, node)
		}
	}
	locked := map[sync.Locker]bool{}
	for i := len(needLock) - 1; i >= 0; i-- {
		if n := needLock[i]; !locked[n] {
			n.Lock()
			defer n.Unlock()
			locked[n] = true
		}
	}

	olddn := olddirf.inode.(*dirnode)
	newdn := newdirf.inode.(*dirnode)
	oldinode, ok := olddn.inodes[oldname]
	if !ok {
		return os.ErrNotExist
	}
	if existing, ok := newdn.inodes[newname]; ok {
		// overwriting an existing file or dir
		if dn, ok := existing.(*dirnode); ok {
			if !oldinode.Stat().IsDir() {
				return ErrIsDirectory
			}
			dn.RLock()
			defer dn.RUnlock()
			if len(dn.inodes) > 0 {
				return ErrDirectoryNotEmpty
			}
		}
	} else {
		if newdn.inodes == nil {
			newdn.inodes = make(map[string]inode)
		}
		// Entry count grows only when not overwriting.
		newdn.fileinfo.size++
	}
	newdn.inodes[newname] = oldinode
	switch n := oldinode.(type) {
	case *dirnode:
		n.parent = newdn
	case *filenode:
		n.parent = newdn
	default:
		panic(fmt.Sprintf("bad inode type %T", n))
	}
	delete(olddn.inodes, oldname)
	olddn.fileinfo.size--
	return nil
}
+
+func (dn *dirnode) Parent() inode {
+ dn.RLock()
+ defer dn.RUnlock()
+ return dn.parent
+}
+
+func (dn *dirnode) Readdir() (fi []os.FileInfo) {
+ dn.RLock()
+ defer dn.RUnlock()
+ fi = make([]os.FileInfo, 0, len(dn.inodes))
+ for _, inode := range dn.inodes {
+ fi = append(fi, inode.Stat())
+ }
+ return
+}
- // Return a directory if it's the root dir or there are file
- // entries below it.
- children := map[string]collectionDirent{}
- for fnm, size := range c.fileSizes() {
- if !strings.HasPrefix(fnm, name+"/") {
// Read is invalid on a directory.
func (dn *dirnode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
	return 0, ptr, ErrInvalidOperation
}

// Write is invalid on a directory.
func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
	return 0, ptr, ErrInvalidOperation
}

// Size returns the directory's size (its entry count, per the
// bookkeeping in newDirnode/newFilenode/Rename).
func (dn *dirnode) Size() int64 {
	dn.RLock()
	defer dn.RUnlock()
	return dn.fileinfo.Size()
}

// Stat returns the directory's FileInfo.
func (dn *dirnode) Stat() os.FileInfo {
	dn.RLock()
	defer dn.RUnlock()
	return dn.fileinfo
}

// Truncate is invalid on a directory.
func (dn *dirnode) Truncate(int64) error {
	return ErrInvalidOperation
}
+
// lookupPath returns the inode for the file/directory with the given
// name (which may contain "/" separators). If no such file/directory
// exists, the returned node is nil.
//
// (NOTE(review): the previous comment claimed the parent node is also
// returned; only the target node is.)
func (dn *dirnode) lookupPath(path string) (node inode) {
	node = dn
	for _, name := range strings.Split(path, "/") {
		dn, ok := node.(*dirnode)
		if !ok {
			// Path descends through a file: not found.
			return nil
		}
		if name == "." || name == "" {
			continue
		}
		if name == ".." {
			node = node.Parent()
			continue
		}
		dn.RLock()
		node = dn.inodes[name]
		dn.RUnlock()
	}
	return
}
+
// newDirnode creates a child directory entry in dn and returns it.
// Does not lock dn; callers are responsible for synchronization.
func (dn *dirnode) newDirnode(name string, perm os.FileMode, modTime time.Time) *dirnode {
	child := &dirnode{
		parent: dn,
		client: dn.client,
		kc:     dn.kc,
		fileinfo: fileinfo{
			name:    name,
			mode:    os.ModeDir | perm,
			modTime: modTime,
		},
	}
	if dn.inodes == nil {
		dn.inodes = make(map[string]inode)
	}
	dn.inodes[name] = child
	// Directory size tracks the number of entries.
	dn.fileinfo.size++
	return child
}

// newFilenode creates an empty child file entry in dn and returns it.
// Does not lock dn; callers are responsible for synchronization.
func (dn *dirnode) newFilenode(name string, perm os.FileMode, modTime time.Time) *filenode {
	child := &filenode{
		parent: dn,
		fileinfo: fileinfo{
			name:    name,
			mode:    perm,
			modTime: modTime,
		},
	}
	if dn.inodes == nil {
		dn.inodes = make(map[string]inode)
	}
	dn.inodes[name] = child
	dn.fileinfo.size++
	return child
}
+
// OpenFile is analogous to os.OpenFile(): it opens (and, with
// O_CREATE, creates) the named file or directory relative to dn,
// honoring O_RDWR/O_RDONLY/O_WRONLY, O_EXCL, O_TRUNC, and O_APPEND.
// O_SYNC is not supported.
func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*filehandle, error) {
	if flag&os.O_SYNC != 0 {
		return nil, ErrSyncNotSupported
	}
	dirname, name := path.Split(name)
	dn, ok := dn.lookupPath(dirname).(*dirnode)
	if !ok {
		return nil, os.ErrNotExist
	}
	var readable, writable bool
	switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
	case os.O_RDWR:
		readable = true
		writable = true
	case os.O_RDONLY:
		readable = true
	case os.O_WRONLY:
		writable = true
	default:
		return nil, fmt.Errorf("invalid flags 0x%x", flag)
	}
	if !writable {
		// A directory can be opened via "foo/", "foo/.", or
		// "foo/..".
		switch name {
		case ".", "":
			return &filehandle{inode: dn}, nil
		case "..":
			return &filehandle{inode: dn.Parent()}, nil
		}
	}
	createMode := flag&os.O_CREATE != 0
	// Creation may mutate dn.inodes; otherwise a read lock is
	// enough.
	if createMode {
		dn.Lock()
		defer dn.Unlock()
	} else {
		dn.RLock()
		defer dn.RUnlock()
	}
	n, ok := dn.inodes[name]
	if !ok {
		if !createMode {
			return nil, os.ErrNotExist
		}
		// NOTE(review): perm is only consulted via IsDir();
		// the new node is always created with mode 0755.
		if perm.IsDir() {
			n = dn.newDirnode(name, 0755, time.Now())
		} else {
			n = dn.newFilenode(name, 0755, time.Now())
		}
	} else if flag&os.O_EXCL != 0 {
		return nil, ErrFileExists
	} else if flag&os.O_TRUNC != 0 {
		if !writable {
			return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode")
		} else if fn, ok := n.(*filenode); !ok {
			return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory")
		} else {
			// NOTE(review): Truncate's error is discarded
			// here.
			fn.Truncate(0)
		}
	}
	return &filehandle{
		inode:    n,
		append:   flag&os.O_APPEND != 0,
		readable: readable,
		writable: writable,
	}, nil
}
-// fileSizes returns a map of files that can be opened. Each key
-// starts with "./".
-func (c *collectionFS) fileSizes() map[string]int64 {
- c.sizesOnce.Do(func() {
- c.sizes = map[string]int64{}
- m := manifest.Manifest{Text: c.collection.ManifestText}
- for ms := range m.StreamIter() {
- for _, fss := range ms.FileStreamSegments {
- c.sizes[ms.StreamName+"/"+fss.Name] += int64(fss.SegLen)
- }
+type segment interface {
+ io.ReaderAt
+ Len() int
+ // Return a new segment with a subsection of the data from this
+ // one. length<0 means length=Len()-off.
+ Slice(off int, length int) segment
+}
+
+type memSegment struct {
+ buf []byte
+}
+
+func (me *memSegment) Len() int {
+ return len(me.buf)
+}
+
+func (me *memSegment) Slice(off, length int) segment {
+ if length < 0 {
+ length = len(me.buf) - off
+ }
+ buf := make([]byte, length)
+ copy(buf, me.buf[off:])
+ return &memSegment{buf: buf}
+}
+
+func (me *memSegment) Truncate(n int) {
+ if n > cap(me.buf) {
+ newsize := 1024
+ for newsize < n {
+ newsize = newsize << 2
+ }
+ newbuf := make([]byte, n, newsize)
+ copy(newbuf, me.buf)
+ me.buf = newbuf
+ } else {
+ // Zero unused part when shrinking, in case we grow
+ // and start using it again later.
+ for i := n; i < len(me.buf); i++ {
+ me.buf[i] = 0
+ }
+ }
+ me.buf = me.buf[:n]
+}
+
+func (me *memSegment) WriteAt(p []byte, off int) {
+ if off+len(p) > len(me.buf) {
+ panic("overflowed segment")
+ }
+ copy(me.buf[off:], p)
+}
+
+func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) {
+ if off > int64(me.Len()) {
+ err = io.EOF
+ return
+ }
+ n = copy(p, me.buf[int(off):])
+ if n < len(p) {
+ err = io.EOF
+ }
+ return
+}
+
// storedSegment is a portion of a block already stored in Keep.
type storedSegment struct {
	kc      keepClient
	locator string
	size    int // size of stored block (also encoded in locator)
	offset  int // position of segment within the stored block
	length  int // bytes in this segment (offset + length <= size)
}

// Len returns the number of bytes in the segment.
func (se storedSegment) Len() int {
	return se.length
}

// Slice returns a segment covering a subsection of this one (value
// receiver: the original is unchanged). size < 0 means "through the
// end of the segment".
func (se storedSegment) Slice(n, size int) segment {
	se.offset += n
	se.length -= n
	if size >= 0 && se.length > size {
		se.length = size
	}
	return se
}

// ReadAt reads segment data starting at off into p via the keep
// client, returning io.EOF when the read reaches (or starts past) the
// end of the segment.
func (se storedSegment) ReadAt(p []byte, off int64) (n int, err error) {
	if off > int64(se.length) {
		return 0, io.EOF
	}
	maxlen := se.length - int(off)
	if len(p) > maxlen {
		// Clamp to the segment boundary, not the block
		// boundary, and report EOF on success.
		p = p[:maxlen]
		n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
		if err == nil {
			err = io.EOF
		}
		return
	}
	return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
}
func canonicalName(name string) string {
}
return name
}
+
// manifestEscapeSeq matches one backslash escape in a manifest token:
// an escaped backslash or a three-digit octal byte code.
var manifestEscapeSeq = regexp.MustCompile(`\\([0-7]{3}|\\)`)

// manifestUnescapeFunc decodes a single escape sequence matched by
// manifestEscapeSeq.
func manifestUnescapeFunc(seq string) string {
	if seq == `\\` {
		return `\`
	}
	code, err := strconv.ParseUint(seq[1:], 8, 8)
	if err != nil {
		// Invalid escape sequence: can't unescape.
		return seq
	}
	return string([]byte{byte(code)})
}

// manifestUnescape expands all escape sequences in s.
func manifestUnescape(s string) string {
	return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
}

// manifestEscapedChar matches a single byte that must be escaped in
// manifest stream/file names.
var manifestEscapedChar = regexp.MustCompile(`[\000-\040:\s\\]`)

// manifestEscapeFunc renders one matched byte as a three-digit octal
// escape.
func manifestEscapeFunc(seq string) string {
	return fmt.Sprintf("\\%03o", byte(seq[0]))
}

// manifestEscape escapes all special bytes in s.
func manifestEscape(s string) string {
	return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
}
package arvados
import (
+ "bytes"
+ "crypto/md5"
+ "errors"
+ "fmt"
"io"
+ "io/ioutil"
+ "math/rand"
"net/http"
"os"
+ "regexp"
+ "runtime"
+ "sync"
"testing"
+ "time"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
check "gopkg.in/check.v1"
var _ = check.Suite(&CollectionFSSuite{})
+type keepClientStub struct {
+ blocks map[string][]byte
+ sync.RWMutex
+}
+
+var errStub404 = errors.New("404 block not found")
+
+func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
+ kcs.RLock()
+ defer kcs.RUnlock()
+ buf := kcs.blocks[locator[:32]]
+ if buf == nil {
+ return 0, errStub404
+ }
+ return copy(p, buf[off:]), nil
+}
+
+func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
+ locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
+ buf := make([]byte, len(p))
+ copy(buf, p)
+ kcs.Lock()
+ defer kcs.Unlock()
+ kcs.blocks[locator[:32]] = buf
+ return locator, 1, nil
+}
+
// CollectionFSSuite exercises the collection filesystem against a
// fixture collection and a stubbed keep client.
type CollectionFSSuite struct {
	client *Client
	coll   Collection
	fs     CollectionFileSystem
	kc     keepClient
}

// SetUpTest fetches the fixture collection and mounts it over a
// keepClientStub pre-loaded with the "foobar" block.
func (s *CollectionFSSuite) SetUpTest(c *check.C) {
	s.client = NewClientFromEnv()
	err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+arvadostest.FooAndBarFilesInDirUUID, nil, nil)
	c.Assert(err, check.IsNil)
	s.kc = &keepClientStub{
		blocks: map[string][]byte{
			// md5("foobar") -> contents of the fixture's data block
			"3858f62230ac3c915f300c664312c63f": []byte("foobar"),
		}}
	s.fs, err = s.coll.FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
}
+
+func (s *CollectionFSSuite) TestHttpFileSystemInterface(c *check.C) {
+ _, ok := s.fs.(http.FileSystem)
+ c.Check(ok, check.Equals, true)
}
func (s *CollectionFSSuite) TestReaddirFull(c *check.C) {
}
fis, err = f.Readdir(1)
- c.Check(err, check.Equals, io.EOF)
+ c.Check(err, check.IsNil)
c.Check(len(fis), check.Equals, 1)
if len(fis) > 0 {
c.Check(fis[0].Size(), check.Equals, int64(3))
c.Assert(err, check.IsNil)
fis, err = f.Readdir(2)
c.Check(len(fis), check.Equals, 1)
- c.Assert(err, check.Equals, io.EOF)
+ c.Assert(err, check.IsNil)
fis, err = f.Readdir(2)
c.Check(len(fis), check.Equals, 0)
c.Assert(err, check.Equals, io.EOF)
}
}
-func (s *CollectionFSSuite) TestOpenFile(c *check.C) {
- c.Skip("cannot test files with nil keepclient")
+func (s *CollectionFSSuite) TestReadOnlyFile(c *check.C) {
+ f, err := s.fs.OpenFile("/dir1/foo", os.O_RDONLY, 0)
+ c.Assert(err, check.IsNil)
+ st, err := f.Stat()
+ c.Assert(err, check.IsNil)
+ c.Check(st.Size(), check.Equals, int64(3))
+ n, err := f.Write([]byte("bar"))
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.Equals, ErrReadOnlyFile)
+}
+
+func (s *CollectionFSSuite) TestCreateFile(c *check.C) {
+ f, err := s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE, 0)
+ c.Assert(err, check.IsNil)
+ st, err := f.Stat()
+ c.Assert(err, check.IsNil)
+ c.Check(st.Size(), check.Equals, int64(0))
+
+ n, err := f.Write([]byte("bar"))
+ c.Check(n, check.Equals, 3)
+ c.Check(err, check.IsNil)
+
+ c.Check(f.Close(), check.IsNil)
+
+ f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
+ c.Check(f, check.IsNil)
+ c.Assert(err, check.NotNil)
+
+ f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR, 0)
+ c.Assert(err, check.IsNil)
+ st, err = f.Stat()
+ c.Assert(err, check.IsNil)
+ c.Check(st.Size(), check.Equals, int64(3))
+
+ c.Check(f.Close(), check.IsNil)
+
+ m, err := s.fs.MarshalManifest(".")
+ c.Check(m, check.Matches, `. 37b51d194a7513e45b56f6524f2d51f2\+3\+\S+ 0:3:new-file\\0401\n./dir1 .* 3:3:bar 0:3:foo\n`)
+}
+
+func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
+ maxBlockSize = 8
+ defer func() { maxBlockSize = 2 << 26 }()
- f, err := s.fs.Open("/foo.txt")
+ f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
c.Assert(err, check.IsNil)
+ defer f.Close()
st, err := f.Stat()
c.Assert(err, check.IsNil)
c.Check(st.Size(), check.Equals, int64(3))
+
+ f2, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
+ c.Assert(err, check.IsNil)
+ defer f2.Close()
+
+ buf := make([]byte, 64)
+ n, err := f.Read(buf)
+ c.Check(n, check.Equals, 3)
+ c.Check(err, check.Equals, io.EOF)
+ c.Check(string(buf[:3]), check.DeepEquals, "foo")
+
+ pos, err := f.Seek(-2, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(1))
+ c.Check(err, check.IsNil)
+
+	// Split a storedSegment in two, and insert a memSegment
+ n, err = f.Write([]byte("*"))
+ c.Check(n, check.Equals, 1)
+ c.Check(err, check.IsNil)
+
+ pos, err = f.Seek(0, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(2))
+ c.Check(err, check.IsNil)
+
+ pos, err = f.Seek(0, io.SeekStart)
+ c.Check(pos, check.Equals, int64(0))
+ c.Check(err, check.IsNil)
+
+ rbuf, err := ioutil.ReadAll(f)
+ c.Check(len(rbuf), check.Equals, 3)
+ c.Check(err, check.IsNil)
+ c.Check(string(rbuf), check.Equals, "f*o")
+
+ // Write multiple blocks in one call
+ f.Seek(1, io.SeekStart)
+ n, err = f.Write([]byte("0123456789abcdefg"))
+ c.Check(n, check.Equals, 17)
+ c.Check(err, check.IsNil)
+ pos, err = f.Seek(0, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(18))
+ pos, err = f.Seek(-18, io.SeekCurrent)
+ c.Check(err, check.IsNil)
+ n, err = io.ReadFull(f, buf)
+ c.Check(n, check.Equals, 18)
+ c.Check(err, check.Equals, io.ErrUnexpectedEOF)
+ c.Check(string(buf[:n]), check.Equals, "f0123456789abcdefg")
+
+ buf2, err := ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
+
+ // truncate to current size
+ err = f.Truncate(18)
+ f2.Seek(0, io.SeekStart)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
+
+	// shrink, discarding some data
+ f.Truncate(15)
+ f2.Seek(0, io.SeekStart)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "f0123456789abcd")
+
+ // grow to partial block/extent
+ f.Truncate(20)
+ f2.Seek(0, io.SeekStart)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "f0123456789abcd\x00\x00\x00\x00\x00")
+
+ f.Truncate(0)
+ f2.Seek(0, io.SeekStart)
+ f2.Write([]byte("12345678abcdefghijkl"))
+
+ // grow to block/extent boundary
+ f.Truncate(64)
+ f2.Seek(0, io.SeekStart)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(len(buf2), check.Equals, 64)
+ c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 8)
+
+ // shrink to block/extent boundary
+ err = f.Truncate(32)
+ f2.Seek(0, io.SeekStart)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(len(buf2), check.Equals, 32)
+ c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 4)
+
+ // shrink to partial block/extent
+ err = f.Truncate(15)
+ f2.Seek(0, io.SeekStart)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "12345678abcdefg")
+ c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 2)
+
+ // Force flush to ensure the block "12345678" gets stored, so
+ // we know what to expect in the final manifest below.
+ _, err = s.fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+
+ // Truncate to size=3 while f2's ptr is at 15
+ err = f.Truncate(3)
+ c.Check(err, check.IsNil)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "")
+ f2.Seek(0, io.SeekStart)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "123")
+ c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 1)
+
+ m, err := s.fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+ m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
+ c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
+}
+
+func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+
+ checkSize := func(size int64) {
+ fi, err := f.Stat()
+ c.Check(fi.Size(), check.Equals, size)
+
+ f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ fi, err = f.Stat()
+ c.Check(fi.Size(), check.Equals, size)
+ pos, err := f.Seek(0, io.SeekEnd)
+ c.Check(pos, check.Equals, size)
+ }
+
+ f.Seek(2, io.SeekEnd)
+ checkSize(0)
+ f.Write([]byte{1})
+ checkSize(3)
+
+ f.Seek(2, io.SeekCurrent)
+ checkSize(3)
+ f.Write([]byte{})
+ checkSize(5)
+
+ f.Seek(8, io.SeekStart)
+ checkSize(5)
+ n, err := f.Read(make([]byte, 1))
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.Equals, io.EOF)
+ checkSize(5)
+ f.Write([]byte{1, 2, 3})
+ checkSize(11)
+}
+
+func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
+ maxBlockSize = 8
+ defer func() { maxBlockSize = 2 << 26 }()
+
+ var err error
+ s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ for _, name := range []string{"foo", "bar", "baz"} {
+ f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
+ c.Assert(err, check.IsNil)
+ f.Write([]byte(name))
+ f.Close()
+ }
+
+ m, err := s.fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+ m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
+ c.Check(m, check.Equals, ". c3c23db5285662ef7172373df0003206+6 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar 3:3:baz 6:3:foo\n")
+}
+
+func (s *CollectionFSSuite) TestMkdir(c *check.C) {
+ err := s.fs.Mkdir("foo/bar", 0755)
+ c.Check(err, check.Equals, os.ErrNotExist)
+
+ f, err := s.fs.OpenFile("foo/bar", os.O_CREATE, 0)
+ c.Check(err, check.Equals, os.ErrNotExist)
+
+ err = s.fs.Mkdir("foo", 0755)
+ c.Check(err, check.IsNil)
+
+ f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_WRONLY, 0)
+ c.Check(err, check.IsNil)
+ if err == nil {
+ defer f.Close()
+ f.Write([]byte("foo"))
+ }
+
+ // mkdir fails if a file already exists with that name
+ err = s.fs.Mkdir("foo/bar", 0755)
+ c.Check(err, check.NotNil)
+
+ err = s.fs.Remove("foo/bar")
+ c.Check(err, check.IsNil)
+
+	// mkdir succeeds after the file is deleted
+ err = s.fs.Mkdir("foo/bar", 0755)
+ c.Check(err, check.IsNil)
+
+ // creating a file in a nonexistent subdir should still fail
+ f, err = s.fs.OpenFile("foo/bar/baz/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
+ c.Check(err, check.Equals, os.ErrNotExist)
+
+ f, err = s.fs.OpenFile("foo/bar/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
+ c.Check(err, check.IsNil)
+ if err == nil {
+ defer f.Close()
+ f.Write([]byte("foo"))
+ }
+
+ // creating foo/bar as a regular file should fail
+ f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, 0)
+ c.Check(err, check.NotNil)
+
+ // creating foo/bar as a directory should fail
+ f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, os.ModeDir)
+ c.Check(err, check.NotNil)
+ err = s.fs.Mkdir("foo/bar", 0755)
+ c.Check(err, check.NotNil)
+
+ m, err := s.fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+ m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
+ c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 3:3:bar 0:3:foo\n./foo/bar acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n")
+}
+
+func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
+ maxBlockSize = 8
+ defer func() { maxBlockSize = 2 << 26 }()
+
+ var wg sync.WaitGroup
+ for n := 0; n < 128; n++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ for i := 0; i < 6502; i++ {
+ switch rand.Int() & 3 {
+ case 0:
+ f.Truncate(int64(rand.Intn(64)))
+ case 1:
+ f.Seek(int64(rand.Intn(64)), io.SeekStart)
+ case 2:
+ _, err := f.Write([]byte("beep boop"))
+ c.Check(err, check.IsNil)
+ case 3:
+ _, err := ioutil.ReadAll(f)
+ c.Check(err, check.IsNil)
+ }
+ }
+ }()
+ }
+ wg.Wait()
+
+ f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ buf, err := ioutil.ReadAll(f)
+ c.Check(err, check.IsNil)
+ c.Logf("after lots of random r/w/seek/trunc, buf is %q", buf)
+}
+
+func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
+ maxBlockSize = 40
+ defer func() { maxBlockSize = 2 << 26 }()
+
+ var err error
+ s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+
+ const nfiles = 256
+ const ngoroutines = 256
+
+ var wg sync.WaitGroup
+ for n := 0; n < nfiles; n++ {
+ wg.Add(1)
+ go func(n int) {
+ defer wg.Done()
+ expect := make([]byte, 0, 64)
+ wbytes := []byte("there's no simple explanation for anything important that any of us do")
+ f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ for i := 0; i < ngoroutines; i++ {
+ trunc := rand.Intn(65)
+ woff := rand.Intn(trunc + 1)
+ wbytes = wbytes[:rand.Intn(64-woff+1)]
+ for buf, i := expect[:cap(expect)], len(expect); i < trunc; i++ {
+ buf[i] = 0
+ }
+ expect = expect[:trunc]
+ if trunc < woff+len(wbytes) {
+ expect = expect[:woff+len(wbytes)]
+ }
+ copy(expect[woff:], wbytes)
+ f.Truncate(int64(trunc))
+ pos, err := f.Seek(int64(woff), io.SeekStart)
+ c.Check(pos, check.Equals, int64(woff))
+ c.Check(err, check.IsNil)
+ n, err := f.Write(wbytes)
+ c.Check(n, check.Equals, len(wbytes))
+ c.Check(err, check.IsNil)
+ pos, err = f.Seek(0, io.SeekStart)
+ c.Check(pos, check.Equals, int64(0))
+ c.Check(err, check.IsNil)
+ buf, err := ioutil.ReadAll(f)
+ c.Check(string(buf), check.Equals, string(expect))
+ c.Check(err, check.IsNil)
+ }
+ s.checkMemSize(c, f)
+ }(n)
+ }
+ wg.Wait()
+
+ root, err := s.fs.Open("/")
+ c.Assert(err, check.IsNil)
+ defer root.Close()
+ fi, err := root.Readdir(-1)
+ c.Check(err, check.IsNil)
+ c.Check(len(fi), check.Equals, nfiles)
+
+ _, err = s.fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+ // TODO: check manifest content
+}
+
+func (s *CollectionFSSuite) TestRemove(c *check.C) {
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ err = fs.Mkdir("dir0", 0755)
+ c.Assert(err, check.IsNil)
+ err = fs.Mkdir("dir1", 0755)
+ c.Assert(err, check.IsNil)
+ err = fs.Mkdir("dir1/dir2", 0755)
+ c.Assert(err, check.IsNil)
+ err = fs.Mkdir("dir1/dir3", 0755)
+ c.Assert(err, check.IsNil)
+
+ err = fs.Remove("dir0")
+ c.Check(err, check.IsNil)
+ err = fs.Remove("dir0")
+ c.Check(err, check.Equals, os.ErrNotExist)
+
+ err = fs.Remove("dir1/dir2/.")
+ c.Check(err, check.Equals, ErrInvalidArgument)
+ err = fs.Remove("dir1/dir2/..")
+ c.Check(err, check.Equals, ErrInvalidArgument)
+ err = fs.Remove("dir1")
+ c.Check(err, check.Equals, ErrDirectoryNotEmpty)
+ err = fs.Remove("dir1/dir2/../../../dir1")
+ c.Check(err, check.Equals, ErrDirectoryNotEmpty)
+ err = fs.Remove("dir1/dir3/")
+ c.Check(err, check.IsNil)
+ err = fs.RemoveAll("dir1")
+ c.Check(err, check.IsNil)
+ err = fs.RemoveAll("dir1")
+ c.Check(err, check.IsNil)
+}
+
+func (s *CollectionFSSuite) TestRename(c *check.C) {
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ const (
+ outer = 16
+ inner = 16
+ )
+ for i := 0; i < outer; i++ {
+ err = fs.Mkdir(fmt.Sprintf("dir%d", i), 0755)
+ c.Assert(err, check.IsNil)
+ for j := 0; j < inner; j++ {
+ err = fs.Mkdir(fmt.Sprintf("dir%d/dir%d", i, j), 0755)
+ c.Assert(err, check.IsNil)
+ for _, fnm := range []string{
+ fmt.Sprintf("dir%d/file%d", i, j),
+ fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
+ } {
+ f, err := fs.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0755)
+ c.Assert(err, check.IsNil)
+ _, err = f.Write([]byte("beep"))
+ c.Assert(err, check.IsNil)
+ f.Close()
+ }
+ }
+ }
+ var wg sync.WaitGroup
+ for i := 0; i < outer; i++ {
+ for j := 0; j < inner; j++ {
+ wg.Add(1)
+ go func(i, j int) {
+ defer wg.Done()
+ oldname := fmt.Sprintf("dir%d/dir%d/file%d", i, j, j)
+ newname := fmt.Sprintf("dir%d/newfile%d", i, inner-j-1)
+ _, err := fs.Open(newname)
+ c.Check(err, check.Equals, os.ErrNotExist)
+ err = fs.Rename(oldname, newname)
+ c.Check(err, check.IsNil)
+ f, err := fs.Open(newname)
+ c.Check(err, check.IsNil)
+ f.Close()
+ }(i, j)
+
+ wg.Add(1)
+ go func(i, j int) {
+ defer wg.Done()
+ // oldname does not exist
+ err := fs.Rename(
+ fmt.Sprintf("dir%d/dir%d/missing", i, j),
+ fmt.Sprintf("dir%d/dir%d/file%d", outer-i-1, j, j))
+ c.Check(err, check.ErrorMatches, `.*does not exist`)
+
+ // newname parent dir does not exist
+ err = fs.Rename(
+ fmt.Sprintf("dir%d/dir%d", i, j),
+ fmt.Sprintf("dir%d/missing/irrelevant", outer-i-1))
+ c.Check(err, check.ErrorMatches, `.*does not exist`)
+
+ // oldname parent dir is a file
+ err = fs.Rename(
+ fmt.Sprintf("dir%d/file%d/patherror", i, j),
+ fmt.Sprintf("dir%d/irrelevant", i))
+ c.Check(err, check.ErrorMatches, `.*does not exist`)
+
+ // newname parent dir is a file
+ err = fs.Rename(
+ fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
+ fmt.Sprintf("dir%d/file%d/patherror", i, inner-j-1))
+ c.Check(err, check.ErrorMatches, `.*does not exist`)
+ }(i, j)
+ }
+ }
+ wg.Wait()
+
+ f, err := fs.OpenFile("dir1/newfile3", 0, 0)
+ c.Assert(err, check.IsNil)
+ c.Check(f.Size(), check.Equals, int64(4))
+ buf, err := ioutil.ReadAll(f)
+ c.Check(buf, check.DeepEquals, []byte("beep"))
+ c.Check(err, check.IsNil)
+ _, err = fs.Open("dir1/dir1/file1")
+ c.Check(err, check.Equals, os.ErrNotExist)
+}
+
+func (s *CollectionFSSuite) TestPersist(c *check.C) {
+ maxBlockSize = 1024
+ defer func() { maxBlockSize = 2 << 26 }()
+
+ var err error
+ s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ err = s.fs.Mkdir("d:r", 0755)
+ c.Assert(err, check.IsNil)
+
+ expect := map[string][]byte{}
+
+ var wg sync.WaitGroup
+ for _, name := range []string{"random 1", "random:2", "random\\3", "d:r/random4"} {
+ buf := make([]byte, 500)
+ rand.Read(buf)
+ expect[name] = buf
+
+ f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
+ c.Assert(err, check.IsNil)
+ // Note: we don't close the file until after the test
+ // is done. Writes to unclosed files should persist.
+ defer f.Close()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for i := 0; i < len(buf); i += 5 {
+ _, err := f.Write(buf[i : i+5])
+ c.Assert(err, check.IsNil)
+ }
+ }()
+ }
+ wg.Wait()
+
+ m, err := s.fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+ c.Logf("%q", m)
+
+ root, err := s.fs.Open("/")
+ c.Assert(err, check.IsNil)
+ defer root.Close()
+ fi, err := root.Readdir(-1)
+ c.Check(err, check.IsNil)
+ c.Check(len(fi), check.Equals, 4)
+
+ persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+
+ root, err = persisted.Open("/")
+ c.Assert(err, check.IsNil)
+ defer root.Close()
+ fi, err = root.Readdir(-1)
+ c.Check(err, check.IsNil)
+ c.Check(len(fi), check.Equals, 4)
+
+ for name, content := range expect {
+ c.Logf("read %q", name)
+ f, err := persisted.Open(name)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ buf, err := ioutil.ReadAll(f)
+ c.Check(err, check.IsNil)
+ c.Check(buf, check.DeepEquals, content)
+ }
+}
+
+func (s *CollectionFSSuite) TestPersistEmptyFiles(c *check.C) {
+ var err error
+ s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ for _, name := range []string{"dir", "dir/zerodir", "zero", "zero/zero"} {
+ err = s.fs.Mkdir(name, 0755)
+ c.Assert(err, check.IsNil)
+ }
+
+ expect := map[string][]byte{
+ "0": nil,
+ "00": []byte{},
+ "one": []byte{1},
+ "dir/0": nil,
+ "dir/two": []byte{1, 2},
+ "dir/zero": nil,
+ "dir/zerodir/zero": nil,
+ "zero/zero/zero": nil,
+ }
+ for name, data := range expect {
+ f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
+ c.Assert(err, check.IsNil)
+ if data != nil {
+ _, err := f.Write(data)
+ c.Assert(err, check.IsNil)
+ }
+ f.Close()
+ }
+
+ m, err := s.fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+ c.Logf("%q", m)
+
+ persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+
+ for name, data := range expect {
+ f, err := persisted.Open("bogus-" + name)
+ c.Check(err, check.NotNil)
+
+ f, err = persisted.Open(name)
+ c.Assert(err, check.IsNil)
+
+ if data == nil {
+ data = []byte{}
+ }
+ buf, err := ioutil.ReadAll(f)
+ c.Check(err, check.IsNil)
+ c.Check(buf, check.DeepEquals, data)
+ }
+}
+
+func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+
+ f, err := fs.OpenFile("missing", os.O_WRONLY, 0)
+ c.Check(f, check.IsNil)
+ c.Check(err, check.ErrorMatches, `file does not exist`)
+
+ f, err = fs.OpenFile("new", os.O_CREATE|os.O_RDONLY, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ n, err := f.Write([]byte{1, 2, 3})
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.ErrorMatches, `read-only file`)
+ n, err = f.Read(make([]byte, 1))
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.Equals, io.EOF)
+ f, err = fs.OpenFile("new", os.O_RDWR, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ _, err = f.Write([]byte{4, 5, 6})
+ c.Check(err, check.IsNil)
+ fi, err := f.Stat()
+ c.Assert(err, check.IsNil)
+ c.Check(fi.Size(), check.Equals, int64(3))
+
+ f, err = fs.OpenFile("new", os.O_TRUNC|os.O_RDWR, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ pos, err := f.Seek(0, io.SeekEnd)
+ c.Check(pos, check.Equals, int64(0))
+ c.Check(err, check.IsNil)
+ fi, err = f.Stat()
+ c.Assert(err, check.IsNil)
+ c.Check(fi.Size(), check.Equals, int64(0))
+ fs.Remove("new")
+
+ buf := make([]byte, 64)
+ f, err = fs.OpenFile("append", os.O_EXCL|os.O_CREATE|os.O_RDWR|os.O_APPEND, 0)
+ c.Assert(err, check.IsNil)
+ f.Write([]byte{1, 2, 3})
+ f.Seek(0, io.SeekStart)
+ n, _ = f.Read(buf[:1])
+ c.Check(n, check.Equals, 1)
+ c.Check(buf[:1], check.DeepEquals, []byte{1})
+ pos, err = f.Seek(0, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(1))
+ f.Write([]byte{4, 5, 6})
+ pos, err = f.Seek(0, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(6))
+ f.Seek(0, io.SeekStart)
+ n, err = f.Read(buf)
+ c.Check(buf[:n], check.DeepEquals, []byte{1, 2, 3, 4, 5, 6})
+ c.Check(err, check.Equals, io.EOF)
+ f.Close()
+
+ f, err = fs.OpenFile("append", os.O_RDWR|os.O_APPEND, 0)
+ c.Assert(err, check.IsNil)
+ pos, err = f.Seek(0, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(0))
+ c.Check(err, check.IsNil)
+ f.Read(buf[:3])
+ pos, _ = f.Seek(0, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(3))
+ f.Write([]byte{7, 8, 9})
+ pos, err = f.Seek(0, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(9))
+ f.Close()
+
+ f, err = fs.OpenFile("wronly", os.O_CREATE|os.O_WRONLY, 0)
+ c.Assert(err, check.IsNil)
+ n, err = f.Write([]byte{3, 2, 1})
+ c.Check(n, check.Equals, 3)
+ c.Check(err, check.IsNil)
+ pos, _ = f.Seek(0, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(3))
+ pos, _ = f.Seek(0, io.SeekStart)
+ c.Check(pos, check.Equals, int64(0))
+ n, err = f.Read(buf)
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.ErrorMatches, `.*O_WRONLY.*`)
+ f, err = fs.OpenFile("wronly", os.O_RDONLY, 0)
+ c.Assert(err, check.IsNil)
+ n, _ = f.Read(buf)
+ c.Check(buf[:n], check.DeepEquals, []byte{3, 2, 1})
+
+ f, err = fs.OpenFile("unsupported", os.O_CREATE|os.O_SYNC, 0)
+ c.Check(f, check.IsNil)
+ c.Check(err, check.NotNil)
+
+ f, err = fs.OpenFile("append", os.O_RDWR|os.O_WRONLY, 0)
+ c.Check(f, check.IsNil)
+ c.Check(err, check.ErrorMatches, `invalid flag.*`)
+}
+
+func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
+ maxBlockSize = 1024
+ defer func() { maxBlockSize = 2 << 26 }()
+
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ f, err := fs.OpenFile("50K", os.O_WRONLY|os.O_CREATE, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+
+ data := make([]byte, 500)
+ rand.Read(data)
+
+ for i := 0; i < 100; i++ {
+ n, err := f.Write(data)
+ c.Assert(n, check.Equals, len(data))
+ c.Assert(err, check.IsNil)
+ }
+
+ currentMemExtents := func() (memExtents []int) {
+ for idx, e := range f.(*filehandle).inode.(*filenode).segments {
+ switch e.(type) {
+ case *memSegment:
+ memExtents = append(memExtents, idx)
+ }
+ }
+ return
+ }
+ c.Check(currentMemExtents(), check.HasLen, 1)
+
+ m, err := fs.MarshalManifest(".")
+ c.Check(m, check.Matches, `[^:]* 0:50000:50K\n`)
+ c.Check(err, check.IsNil)
+ c.Check(currentMemExtents(), check.HasLen, 0)
+}
+
+func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
+ for _, txt := range []string{
+ "\n",
+ ".\n",
+ ". \n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 \n",
+ ". 0:0:foo\n",
+ ". 0:0:foo\n",
+ ". 0:0:foo 0:0:bar\n",
+ ". d41d8cd98f00b204e9800998ecf8427e 0:0:foo\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo:bar\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
+ "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
+ } {
+ c.Logf("<-%q", txt)
+ fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
+ c.Check(fs, check.IsNil)
+ c.Logf("-> %s", err)
+ c.Check(err, check.NotNil)
+ }
+}
+
+func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
+ for _, txt := range []string{
+ "",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
+ } {
+ c.Logf("<-%q", txt)
+ fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
+ c.Check(err, check.IsNil)
+ c.Check(fs, check.NotNil)
+ }
+}
+
+func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) {
+ fn := f.(*filehandle).inode.(*filenode)
+ var memsize int64
+ for _, seg := range fn.segments {
+ if e, ok := seg.(*memSegment); ok {
+ memsize += int64(len(e.buf))
+ }
+ }
+ c.Check(fn.memsize, check.Equals, memsize)
+}
+
+type CollectionFSUnitSuite struct{}
+
+var _ = check.Suite(&CollectionFSUnitSuite{})
+
+// expect ~2 seconds to load a manifest with 256K files
+func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
+ const (
+ dirCount = 512
+ fileCount = 512
+ )
+
+ mb := bytes.NewBuffer(make([]byte, 0, 40000000))
+ for i := 0; i < dirCount; i++ {
+ fmt.Fprintf(mb, "./dir%d", i)
+ for j := 0; j <= fileCount; j++ {
+ fmt.Fprintf(mb, " %032x+42+A%040x@%08x", j, j, j)
+ }
+ for j := 0; j < fileCount; j++ {
+ fmt.Fprintf(mb, " %d:%d:dir%d/file%d", j*42+21, 42, j, j)
+ }
+ mb.Write([]byte{'\n'})
+ }
+ coll := Collection{ManifestText: mb.String()}
+ c.Logf("%s built", time.Now())
+
+ var memstats runtime.MemStats
+ runtime.ReadMemStats(&memstats)
+ c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
+
+ f, err := coll.FileSystem(nil, nil)
+ c.Check(err, check.IsNil)
+ c.Logf("%s loaded", time.Now())
+
+ for i := 0; i < dirCount; i++ {
+ for j := 0; j < fileCount; j++ {
+ f.Stat(fmt.Sprintf("./dir%d/dir%d/file%d", i, j, j))
+ }
+ }
+ c.Logf("%s Stat() x %d", time.Now(), dirCount*fileCount)
+
+ runtime.ReadMemStats(&memstats)
+ c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
}
// Gocheck boilerplate
URL url.URL
StatusCode int
Status string
- errors []string
+ Errors []string
}
func (e TransactionError) Error() (s string) {
if e.Status != "" {
s = s + ": " + e.Status
}
- if len(e.errors) > 0 {
- s = s + ": " + strings.Join(e.errors, "; ")
+ if len(e.Errors) > 0 {
+ s = s + ": " + strings.Join(e.Errors, "; ")
}
return
}
var e TransactionError
if json.Unmarshal(buf, &e) != nil {
// No JSON-formatted error response
- e.errors = nil
+ e.Errors = nil
}
e.Method = req.Method
e.URL = *req.URL
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package asyncbuf
+
+import (
+ "bytes"
+ "io"
+ "sync"
+)
+
+// A Buffer is an io.Writer that distributes written data
+// asynchronously to multiple concurrent readers.
+//
+// NewReader() can be called at any time. In all cases, every returned
+// io.Reader reads all data written to the Buffer.
+//
+// Behavior is undefined if Write is called after Close or
+// CloseWithError.
+type Buffer interface {
+ io.WriteCloser
+
+ // NewReader() returns an io.Reader that reads all data
+ // written to the Buffer.
+ NewReader() io.Reader
+
+ // Close, but return the given error (instead of io.EOF) to
+ // all readers when they reach the end of the buffer.
+ //
+ // CloseWithError(nil) is equivalent to
+ // CloseWithError(io.EOF).
+ CloseWithError(error) error
+}
+
+type buffer struct {
+ data *bytes.Buffer
+ cond sync.Cond
+ err error // nil if there might be more writes
+}
+
+// NewBuffer creates a new Buffer using buf as its initial
+// contents. The new Buffer takes ownership of buf, and the caller
+// should not use buf after this call.
+func NewBuffer(buf []byte) Buffer {
+ return &buffer{
+ data: bytes.NewBuffer(buf),
+ cond: sync.Cond{L: &sync.Mutex{}},
+ }
+}
+
+func (b *buffer) Write(p []byte) (int, error) {
+ defer b.cond.Broadcast()
+ b.cond.L.Lock()
+ defer b.cond.L.Unlock()
+ if b.err != nil {
+ return 0, b.err
+ }
+ return b.data.Write(p)
+}
+
+func (b *buffer) Close() error {
+ return b.CloseWithError(nil)
+}
+
+func (b *buffer) CloseWithError(err error) error {
+ defer b.cond.Broadcast()
+ b.cond.L.Lock()
+ defer b.cond.L.Unlock()
+ if err == nil {
+ b.err = io.EOF
+ } else {
+ b.err = err
+ }
+ return nil
+}
+
+func (b *buffer) NewReader() io.Reader {
+ return &reader{b: b}
+}
+
+type reader struct {
+ b *buffer
+ read int // # bytes already read
+}
+
+func (r *reader) Read(p []byte) (int, error) {
+ r.b.cond.L.Lock()
+ for {
+ switch {
+ case r.read < r.b.data.Len():
+ buf := r.b.data.Bytes()
+ r.b.cond.L.Unlock()
+ n := copy(p, buf[r.read:])
+ r.read += n
+ return n, nil
+ case r.b.err != nil || len(p) == 0:
+ // r.b.err != nil means we reached EOF. And
+ // even if we're not at EOF, there's no need
+ // to block if len(p)==0.
+ err := r.b.err
+ r.b.cond.L.Unlock()
+ return 0, err
+ default:
+ r.b.cond.Wait()
+ }
+ }
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package asyncbuf
+
+import (
+ "crypto/md5"
+ "errors"
+ "io"
+ "io/ioutil"
+ "math/rand"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+func (s *Suite) TestNoWrites(c *check.C) {
+ b := NewBuffer(nil)
+ r1 := b.NewReader()
+ r2 := b.NewReader()
+ b.Close()
+ s.checkReader(c, r1, []byte{}, nil, nil)
+ s.checkReader(c, r2, []byte{}, nil, nil)
+}
+
+func (s *Suite) TestNoReaders(c *check.C) {
+ b := NewBuffer(nil)
+ n, err := b.Write([]byte("foobar"))
+ err2 := b.Close()
+ c.Check(n, check.Equals, 6)
+ c.Check(err, check.IsNil)
+ c.Check(err2, check.IsNil)
+}
+
+func (s *Suite) TestWriteReadClose(c *check.C) {
+ done := make(chan bool, 2)
+ b := NewBuffer(nil)
+ n, err := b.Write([]byte("foobar"))
+ c.Check(n, check.Equals, 6)
+ c.Check(err, check.IsNil)
+ r1 := b.NewReader()
+ r2 := b.NewReader()
+ go s.checkReader(c, r1, []byte("foobar"), nil, done)
+ go s.checkReader(c, r2, []byte("foobar"), nil, done)
+ time.Sleep(time.Millisecond)
+ c.Check(len(done), check.Equals, 0)
+ b.Close()
+ <-done
+ <-done
+}
+
+func (s *Suite) TestPrefillWriteCloseRead(c *check.C) {
+ done := make(chan bool, 2)
+ b := NewBuffer([]byte("baz"))
+ n, err := b.Write([]byte("waz"))
+ c.Check(n, check.Equals, 3)
+ c.Check(err, check.IsNil)
+ b.Close()
+ r1 := b.NewReader()
+ go s.checkReader(c, r1, []byte("bazwaz"), nil, done)
+ r2 := b.NewReader()
+ go s.checkReader(c, r2, []byte("bazwaz"), nil, done)
+ <-done
+ <-done
+}
+
+func (s *Suite) TestWriteReadCloseRead(c *check.C) {
+ done := make(chan bool, 1)
+ b := NewBuffer(nil)
+ r1 := b.NewReader()
+ go s.checkReader(c, r1, []byte("bazwazqux"), nil, done)
+
+ b.Write([]byte("bazwaz"))
+
+ r2 := b.NewReader()
+ r2.Read(make([]byte, 3))
+
+ b.Write([]byte("qux"))
+ b.Close()
+
+ s.checkReader(c, r2, []byte("wazqux"), nil, nil)
+ <-done
+}
+
+func (s *Suite) TestReadAtEOF(c *check.C) {
+ buf := make([]byte, 8)
+
+ b := NewBuffer([]byte{1, 2, 3})
+
+ r := b.NewReader()
+ n, err := r.Read(buf)
+ c.Check(n, check.Equals, 3)
+ c.Check(err, check.IsNil)
+
+ // Reading zero bytes at EOF, but before Close(), doesn't
+ // block or error
+ done := make(chan bool)
+ go func() {
+ defer close(done)
+ n, err = r.Read(buf[:0])
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.IsNil)
+ }()
+ select {
+ case <-done:
+ case <-time.After(time.Second):
+ c.Error("timeout")
+ }
+
+ b.Close()
+
+ // Reading zero bytes after Close() returns EOF
+ n, err = r.Read(buf[:0])
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.Equals, io.EOF)
+
+ // Reading from start after Close() returns 3 bytes, then EOF
+ r = b.NewReader()
+ n, err = r.Read(buf)
+ c.Check(n, check.Equals, 3)
+ if err != nil {
+ c.Check(err, check.Equals, io.EOF)
+ }
+ n, err = r.Read(buf[:0])
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.Equals, io.EOF)
+ n, err = r.Read(buf)
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.Equals, io.EOF)
+}
+
+func (s *Suite) TestCloseWithError(c *check.C) {
+ errFake := errors.New("it's not even a real error")
+
+ done := make(chan bool, 1)
+ b := NewBuffer(nil)
+ r1 := b.NewReader()
+ go s.checkReader(c, r1, []byte("bazwazqux"), errFake, done)
+
+ b.Write([]byte("bazwaz"))
+
+ r2 := b.NewReader()
+ r2.Read(make([]byte, 3))
+
+ b.Write([]byte("qux"))
+ b.CloseWithError(errFake)
+
+ s.checkReader(c, r2, []byte("wazqux"), errFake, nil)
+ <-done
+}
+
+// Write n*n bytes, n at a time; read them into n goroutines using
+// varying buffer sizes; compare checksums.
+func (s *Suite) TestManyReaders(c *check.C) {
+ const n = 256
+
+ b := NewBuffer(nil)
+
+ expectSum := make(chan []byte)
+ go func() {
+ hash := md5.New()
+ buf := make([]byte, n)
+ for i := 0; i < n; i++ {
+ time.Sleep(10 * time.Nanosecond)
+ rand.Read(buf)
+ b.Write(buf)
+ hash.Write(buf)
+ }
+ expectSum <- hash.Sum(nil)
+ b.Close()
+ }()
+
+ gotSum := make(chan []byte)
+ for i := 0; i < n; i++ {
+ go func(bufSize int) {
+ got := md5.New()
+ io.CopyBuffer(got, b.NewReader(), make([]byte, bufSize))
+ gotSum <- got.Sum(nil)
+ }(i + n/2)
+ }
+
+ expect := <-expectSum
+ for i := 0; i < n; i++ {
+ c.Check(expect, check.DeepEquals, <-gotSum)
+ }
+}
+
+func (s *Suite) BenchmarkOneReader(c *check.C) {
+ s.benchmarkReaders(c, 1)
+}
+
+func (s *Suite) BenchmarkManyReaders(c *check.C) {
+ s.benchmarkReaders(c, 100)
+}
+
+func (s *Suite) benchmarkReaders(c *check.C, readers int) {
+ var n int64
+ t0 := time.Now()
+
+ buf := make([]byte, 10000)
+ rand.Read(buf)
+ for i := 0; i < 10; i++ {
+ b := NewBuffer(nil)
+ go func() {
+ for i := 0; i < c.N; i++ {
+ b.Write(buf)
+ }
+ b.Close()
+ }()
+
+ var wg sync.WaitGroup
+ for i := 0; i < readers; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ nn, _ := io.Copy(ioutil.Discard, b.NewReader())
+ atomic.AddInt64(&n, int64(nn))
+ }()
+ }
+ wg.Wait()
+ }
+ c.Logf("%d bytes, %.0f MB/s", n, float64(n)/time.Since(t0).Seconds()/1000000)
+}
+
+func (s *Suite) checkReader(c *check.C, r io.Reader, expectData []byte, expectError error, done chan bool) {
+ buf, err := ioutil.ReadAll(r)
+ c.Check(err, check.Equals, expectError)
+ c.Check(buf, check.DeepEquals, expectData)
+ if done != nil {
+ done <- true
+ }
+}
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
package httpserver
import (
+ "math/rand"
+ "net/http"
"strconv"
"sync"
"time"
// Prefix is prepended to each returned ID.
Prefix string
- lastID int64
- mtx sync.Mutex
+ mtx sync.Mutex
+ src rand.Source
}
// Next returns a new ID string. It is safe to call Next from multiple
// goroutines.
func (g *IDGenerator) Next() string {
-	id := time.Now().UnixNano()
	g.mtx.Lock()
-	if id <= g.lastID {
-		id = g.lastID + 1
+	defer g.mtx.Unlock()
+	// Lazily seed the random source on first use, under the mutex.
+	if g.src == nil {
+		g.src = rand.NewSource(time.Now().UnixNano())
	}
-	g.lastID = id
-	g.mtx.Unlock()
-	return g.Prefix + strconv.FormatInt(id, 36)
+	a, b := g.src.Int63(), g.src.Int63()
+	id := strconv.FormatInt(a, 36) + strconv.FormatInt(b, 36)
+	// Truncate to at most 20 characters. NOTE(review): two base-36
+	// Int63 strings can also total fewer than 20 characters (small
+	// random values format short), so IDs are not guaranteed to be
+	// exactly 20 long — confirm consumers (e.g. tests matching
+	// [a-z0-9]{20}) tolerate shorter IDs.
+	for len(id) > 20 {
+		id = id[:20]
+	}
+	return g.Prefix + id
+}
+
+// AddRequestIDs wraps an http.Handler, adding an X-Request-Id header
+// to each request that doesn't already have one. Pre-existing
+// X-Request-Id values (e.g. from an upstream proxy) are preserved.
+func AddRequestIDs(h http.Handler) http.Handler {
+	gen := &IDGenerator{Prefix: "req-"}
+	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+		if req.Header.Get("X-Request-Id") == "" {
+			req.Header.Set("X-Request-Id", gen.Next())
+		}
+		h.ServeHTTP(w, req)
+	})
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package httpserver
+
+import (
+ "context"
+ "net/http"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/stats"
+ log "github.com/Sirupsen/logrus"
+)
+
+type contextKey struct {
+ name string
+}
+
+var requestTimeContextKey = contextKey{"requestTime"}
+
+// LogRequests wraps an http.Handler, logging each request and
+// response via logrus. It records the request arrival time in the
+// request context (under requestTimeContextKey) so logResponse can
+// compute timing fields later.
+func LogRequests(h http.Handler) http.Handler {
+	return http.HandlerFunc(func(wrapped http.ResponseWriter, req *http.Request) {
+		// Wrap the ResponseWriter so we can observe status,
+		// byte count, and time-to-first-write.
+		w := &responseTimer{ResponseWriter: WrapResponseWriter(wrapped)}
+		req = req.WithContext(context.WithValue(req.Context(), &requestTimeContextKey, time.Now()))
+		lgr := log.WithFields(log.Fields{
+			"RequestID":       req.Header.Get("X-Request-Id"),
+			"remoteAddr":      req.RemoteAddr,
+			"reqForwardedFor": req.Header.Get("X-Forwarded-For"),
+			"reqMethod":       req.Method,
+			"reqPath":         req.URL.Path[1:],
+			"reqBytes":        req.ContentLength,
+		})
+		logRequest(w, req, lgr)
+		// Deferred so the response is logged even if the
+		// handler panics.
+		defer logResponse(w, req, lgr)
+		h.ServeHTTP(w, req)
+	})
+}
+
+// logRequest emits the "request" log entry with the fields already
+// attached to lgr by LogRequests.
+func logRequest(w *responseTimer, req *http.Request, lgr *log.Entry) {
+	lgr.Info("request")
+}
+
+// logResponse emits the "response" log entry, adding timing fields
+// when the request start time is available in the request context.
+func logResponse(w *responseTimer, req *http.Request, lgr *log.Entry) {
+	if tStart, ok := req.Context().Value(&requestTimeContextKey).(time.Time); ok {
+		tDone := time.Now()
+		// NOTE(review): if the handler never wrote a header or
+		// body, w.writeTime is the zero time and the derived
+		// durations below will be nonsensical — confirm all
+		// handlers write at least once.
+		lgr = lgr.WithFields(log.Fields{
+			"timeTotal":     stats.Duration(tDone.Sub(tStart)),
+			"timeToStatus":  stats.Duration(w.writeTime.Sub(tStart)),
+			"timeWriteBody": stats.Duration(tDone.Sub(w.writeTime)),
+		})
+	}
+	lgr.WithFields(log.Fields{
+		"respStatusCode": w.WroteStatus(),
+		"respStatus":     http.StatusText(w.WroteStatus()),
+		"respBytes":      w.WroteBodyBytes(),
+	}).Info("response")
+}
+
+// responseTimer wraps a ResponseWriter and records the time of the
+// first WriteHeader or Write call (time-to-first-byte).
+type responseTimer struct {
+	ResponseWriter
+	wrote     bool      // whether a header/body write has happened
+	writeTime time.Time // time of the first write, zero if none
+}
+
+// WriteHeader records the time of the first write, then delegates.
+func (rt *responseTimer) WriteHeader(code int) {
+	if !rt.wrote {
+		rt.wrote = true
+		rt.writeTime = time.Now()
+	}
+	rt.ResponseWriter.WriteHeader(code)
+}
+
+// Write records the time of the first write, then delegates.
+func (rt *responseTimer) Write(p []byte) (int, error) {
+	if !rt.wrote {
+		rt.wrote = true
+		rt.writeTime = time.Now()
+	}
+	return rt.ResponseWriter.Write(p)
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package httpserver
+
+import (
+ "bytes"
+ "encoding/json"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "testing"
+ "time"
+
+ log "github.com/Sirupsen/logrus"
+ check "gopkg.in/check.v1"
+)
+
+// Test hooks gocheck into the standard "go test" runner.
+func Test(t *testing.T) {
+	check.TestingT(t)
+}
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+// TestLogRequests exercises AddRequestIDs + LogRequests end to end:
+// it captures logrus JSON output for one request and checks the
+// request/response entries' IDs, forwarded-for, timestamps, and
+// timing fields.
+func (s *Suite) TestLogRequests(c *check.C) {
+	// Capture log output; restore stdout logging afterwards.
+	defer log.SetOutput(os.Stdout)
+	captured := &bytes.Buffer{}
+	log.SetOutput(captured)
+	log.SetFormatter(&log.JSONFormatter{
+		TimestampFormat: time.RFC3339Nano,
+	})
+	h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+		w.Write([]byte("hello world"))
+	})
+	req, err := http.NewRequest("GET", "https://foo.example/bar", nil)
+	req.Header.Set("X-Forwarded-For", "1.2.3.4:12345")
+	c.Assert(err, check.IsNil)
+	resp := httptest.NewRecorder()
+	AddRequestIDs(LogRequests(h)).ServeHTTP(resp, req)
+
+	// Two JSON log entries are expected: "request" then "response".
+	dec := json.NewDecoder(captured)
+
+	gotReq := make(map[string]interface{})
+	err = dec.Decode(&gotReq)
+	c.Logf("%#v", gotReq)
+	c.Check(gotReq["RequestID"], check.Matches, "req-[a-z0-9]{20}")
+	c.Check(gotReq["reqForwardedFor"], check.Equals, "1.2.3.4:12345")
+	c.Check(gotReq["msg"], check.Equals, "request")
+
+	gotResp := make(map[string]interface{})
+	err = dec.Decode(&gotResp)
+	c.Logf("%#v", gotResp)
+	// The response entry must carry the same generated request ID.
+	c.Check(gotResp["RequestID"], check.Equals, gotReq["RequestID"])
+	c.Check(gotResp["reqForwardedFor"], check.Equals, "1.2.3.4:12345")
+	c.Check(gotResp["msg"], check.Equals, "response")
+
+	c.Assert(gotResp["time"], check.FitsTypeOf, "")
+	_, err = time.Parse(time.RFC3339Nano, gotResp["time"].(string))
+	c.Check(err, check.IsNil)
+
+	// Timing fields are JSON numbers and must be nonzero.
+	for _, key := range []string{"timeToStatus", "timeWriteBody", "timeTotal"} {
+		c.Assert(gotResp[key], check.FitsTypeOf, float64(0))
+		c.Check(gotResp[key].(float64), check.Not(check.Equals), float64(0))
+	}
+}
"net/http"
)
-// ResponseWriter wraps http.ResponseWriter and exposes the status
+// ResponseWriter is the interface returned by WrapResponseWriter: an
+// http.ResponseWriter that also reports the last status code and the
+// number of body bytes written.
+type ResponseWriter interface {
+	http.ResponseWriter
+	WroteStatus() int
+	WroteBodyBytes() int
+}
+
+// responseWriter wraps http.ResponseWriter and exposes the status
// sent, the number of bytes sent to the client, and the last write
// error.
-type ResponseWriter struct {
+type responseWriter struct {
	http.ResponseWriter
-	wroteStatus    *int    // Last status given to WriteHeader()
-	wroteBodyBytes *int    // Bytes successfully written
-	err            *error  // Last error returned from Write()
+	wroteStatus    int   // Last status given to WriteHeader()
+	wroteBodyBytes int   // Bytes successfully written
+	err            error // Last error returned from Write()
}
+// WrapResponseWriter returns orig wrapped so that status and body
+// size are recorded as they are written.
func WrapResponseWriter(orig http.ResponseWriter) ResponseWriter {
-	return ResponseWriter{orig, new(int), new(int), new(error)}
+	return &responseWriter{ResponseWriter: orig}
}
-func (w ResponseWriter) WriteHeader(s int) {
-	*w.wroteStatus = s
+func (w *responseWriter) WriteHeader(s int) {
+	w.wroteStatus = s
	w.ResponseWriter.WriteHeader(s)
}
-func (w ResponseWriter) Write(data []byte) (n int, err error) {
+func (w *responseWriter) Write(data []byte) (n int, err error) {
	n, err = w.ResponseWriter.Write(data)
-	*w.wroteBodyBytes += n
-	*w.err = err
+	w.wroteBodyBytes += n
+	w.err = err
	return
}
-func (w ResponseWriter) WroteStatus() int {
-	return *w.wroteStatus
+func (w *responseWriter) WroteStatus() int {
+	return w.wroteStatus
}
-func (w ResponseWriter) WroteBodyBytes() int {
-	return *w.wroteBodyBytes
+func (w *responseWriter) WroteBodyBytes() int {
+	return w.wroteBodyBytes
}
-func (w ResponseWriter) Err() error {
-	return *w.err
+func (w *responseWriter) Err() error {
+	return w.err
}
import (
"io"
"sort"
+ "strconv"
+ "strings"
"sync"
"time"
)
}
}
+// ReadAt returns data from the cache, first retrieving it from Keep if
+// necessary.
+//
+// NOTE(review): this does not follow the io.ReaderAt contract — when
+// off+len(p) exceeds the block size it copies fewer than len(p)
+// bytes and still returns a nil error (and off == len(buf) yields
+// (0, nil) rather than EOF). Confirm callers handle short reads.
+func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int) (int, error) {
+	buf, err := c.Get(kc, locator)
+	if err != nil {
+		return 0, err
+	}
+	if off > len(buf) {
+		return 0, io.ErrUnexpectedEOF
+	}
+	return copy(p, buf[off:]), nil
+}
+
// Get returns data from the cache, first retrieving it from Keep if
// necessary.
func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
cacheKey := locator[:32]
+ bufsize := BLOCKSIZE
+ if parts := strings.SplitN(locator, "+", 3); len(parts) >= 2 {
+ datasize, err := strconv.ParseInt(parts[1], 10, 32)
+ if err == nil && datasize >= 0 {
+ bufsize = int(datasize)
+ }
+ }
c.mtx.Lock()
if c.cache == nil {
c.cache = make(map[string]*cacheBlock)
rdr, size, _, err := kc.Get(locator)
var data []byte
if err == nil {
- data = make([]byte, size, BLOCKSIZE)
+ data = make([]byte, size, bufsize)
_, err = io.ReadFull(rdr, data)
err2 := rdr.Close()
if err == nil {
import (
"errors"
- "fmt"
- "io"
"os"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/manifest"
)
-const (
- // After reading a data block from Keep, cfReader slices it up
- // and sends the slices to a buffered channel to be consumed
- // by the caller via Read().
- //
- // dataSliceSize is the maximum size of the slices, and
- // therefore the maximum number of bytes that will be returned
- // by a single call to Read().
- dataSliceSize = 1 << 20
-)
-
// ErrNoManifest indicates the given collection has no manifest
// information (e.g., manifest_text was excluded by a "select"
// parameter when retrieving the collection record).
if !ok {
return nil, ErrNoManifest
}
- m := manifest.Manifest{Text: mText}
- return kc.ManifestFileReader(m, filename)
-}
-
-func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
- f := &file{
- kc: kc,
- }
- err := f.load(m, filename)
+ fs, err := (&arvados.Collection{ManifestText: mText}).FileSystem(nil, kc)
if err != nil {
return nil, err
}
- return f, nil
-}
-
-type file struct {
- kc *KeepClient
- segments []*manifest.FileSegment
- size int64 // total file size
- offset int64 // current read offset
-
- // current/latest segment accessed -- might or might not match pos
- seg *manifest.FileSegment
- segStart int64 // position of segment relative to file
- segData []byte
- segNext []*manifest.FileSegment
- readaheadDone bool
-}
-
-// Close implements io.Closer.
-func (f *file) Close() error {
- f.kc = nil
- f.segments = nil
- f.segData = nil
- return nil
+ return fs.OpenFile(filename, os.O_RDONLY, 0)
}
-// Read implements io.Reader.
-func (f *file) Read(buf []byte) (int, error) {
- if f.seg == nil || f.offset < f.segStart || f.offset >= f.segStart+int64(f.seg.Len) {
- // f.seg does not cover the current read offset
- // (f.pos). Iterate over f.segments to find the one
- // that does.
- f.seg = nil
- f.segStart = 0
- f.segData = nil
- f.segNext = f.segments
- for len(f.segNext) > 0 {
- seg := f.segNext[0]
- f.segNext = f.segNext[1:]
- segEnd := f.segStart + int64(seg.Len)
- if segEnd > f.offset {
- f.seg = seg
- break
- }
- f.segStart = segEnd
- }
- f.readaheadDone = false
- }
- if f.seg == nil {
- return 0, io.EOF
- }
- if f.segData == nil {
- data, err := f.kc.cache().Get(f.kc, f.seg.Locator)
- if err != nil {
- return 0, err
- }
- if len(data) < f.seg.Offset+f.seg.Len {
- return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator)
- }
- f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len]
- }
- // dataOff and dataLen denote a portion of f.segData
- // corresponding to a portion of the file at f.offset.
- dataOff := int(f.offset - f.segStart)
- dataLen := f.seg.Len - dataOff
-
- if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 {
- // If we have already read more than just the first
- // few bytes of this file, and we have already
- // consumed a noticeable portion of this segment, and
- // there's more data for this file in the next segment
- // ... then there's a good chance we are going to need
- // the data for that next segment soon. Start getting
- // it into the cache now.
- go f.kc.cache().Get(f.kc, f.segNext[0].Locator)
- f.readaheadDone = true
- }
-
- n := len(buf)
- if n > dataLen {
- n = dataLen
- }
- copy(buf[:n], f.segData[dataOff:dataOff+n])
- f.offset += int64(n)
- return n, nil
-}
-
-// Seek implements io.Seeker.
-func (f *file) Seek(offset int64, whence int) (int64, error) {
- var want int64
- switch whence {
- case io.SeekStart:
- want = offset
- case io.SeekCurrent:
- want = f.offset + offset
- case io.SeekEnd:
- want = f.size + offset
- default:
- return f.offset, fmt.Errorf("invalid whence %d", whence)
- }
- if want < 0 {
- return f.offset, fmt.Errorf("attempted seek to %d", want)
- }
- if want > f.size {
- want = f.size
- }
- f.offset = want
- return f.offset, nil
-}
-
-// Size returns the file size in bytes.
-func (f *file) Size() int64 {
- return f.size
-}
-
-func (f *file) load(m manifest.Manifest, path string) error {
- f.segments = nil
- f.size = 0
- for seg := range m.FileSegmentIterByName(path) {
- f.segments = append(f.segments, seg)
- f.size += int64(seg.Len)
- }
- if f.segments == nil {
- return os.ErrNotExist
+func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
+ fs, err := (&arvados.Collection{ManifestText: m.Text}).FileSystem(nil, kc)
+ if err != nil {
+ return nil, err
}
- return nil
+ return fs.OpenFile(filename, os.O_RDONLY, 0)
}
func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
h := md5.New()
- var testdata []byte
buf := make([]byte, 4096)
locs := make([]string, len(buf))
+ testdata := make([]byte, 0, len(buf)*len(buf))
filesize := 0
- for i := 0; i < len(locs); i++ {
+ for i := range locs {
_, err := rand.Read(buf[:i])
h.Write(buf[:i])
locs[i], _, err = s.kc.PutB(buf[:i])
c.Check(md5.Sum(buf), check.DeepEquals, md5.Sum(testdata))
c.Check(buf[:1000], check.DeepEquals, testdata[:1000])
+ expectPos := curPos + size + 12345
curPos, err = rdr.Seek(size+12345, io.SeekCurrent)
c.Check(err, check.IsNil)
- c.Check(curPos, check.Equals, size)
+ c.Check(curPos, check.Equals, expectPos)
- curPos, err = rdr.Seek(8-size, io.SeekCurrent)
+ curPos, err = rdr.Seek(8-curPos, io.SeekCurrent)
c.Check(err, check.IsNil)
c.Check(curPos, check.Equals, int64(8))
_, err = io.Copy(this.Hash, this.Reader)
if closer, ok := this.Reader.(io.Closer); ok {
- err2 := closer.Close()
- if err2 != nil && err == nil {
- return err2
+ closeErr := closer.Close()
+ if err == nil {
+ err = closeErr
}
}
if err != nil {
return err
}
-
- sum := this.Hash.Sum(nil)
- if fmt.Sprintf("%x", sum) != this.Check {
- err = BadChecksum
+ if fmt.Sprintf("%x", this.Hash.Sum(nil)) != this.Check {
+ return BadChecksum
}
-
- return err
+ return nil
}
"time"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
+ "git.curoverse.com/arvados.git/sdk/go/asyncbuf"
)
// A Keep "block" is 64MB.
bufsize = BLOCKSIZE
}
- t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
- defer t.Close()
-
- return kc.putReplicas(hash, t, dataBytes)
+ buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize))
+ go func() {
+ _, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash})
+ buf.CloseWithError(err)
+ }()
+ return kc.putReplicas(hash, buf.NewReader, dataBytes)
}
// PutHB writes a block to Keep. The hash of the bytes is given in
//
// Return values are the same as for PutHR.
func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
- t := streamer.AsyncStreamFromSlice(buf)
- defer t.Close()
- return kc.putReplicas(hash, t, int64(len(buf)))
+ newReader := func() io.Reader { return bytes.NewBuffer(buf) }
+ return kc.putReplicas(hash, newReader, int64(len(buf)))
}
// PutB writes a block to Keep. It computes the hash itself.
return kc.getOrHead("GET", locator)
}
+// ReadAt() retrieves a portion of block from the cache if it's
+// present, otherwise from the network. It delegates to the client's
+// BlockCache, which shares the io.ReaderAt caveats documented there.
+func (kc *KeepClient) ReadAt(locator string, p []byte, off int) (int, error) {
+	return kc.cache().ReadAt(kc, locator, p, off)
+}
+
// Ask() verifies that a block with the given hash is available and
// readable, according to at least one Keep service. Unlike Get, it
// does not retrieve the data or verify that the data content matches
package keepclient
import (
+ "bytes"
"crypto/md5"
"errors"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
. "gopkg.in/check.v1"
)
make(chan string)}
UploadToStubHelper(c, st,
- func(kc *KeepClient, url string, reader io.ReadCloser,
- writer io.WriteCloser, upload_status chan uploadStatus) {
-
- tr := streamer.AsyncStreamFromReader(512, reader)
- defer tr.Close()
-
- br1 := tr.MakeStreamReader()
-
- go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, 0)
-
- writer.Write([]byte("foo"))
- writer.Close()
+ func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, upload_status chan uploadStatus) {
+ go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), upload_status, 3, 0)
<-st.handled
"strings"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
)
// Function used to emit debug messages. The easiest way to enable
response string
}
-func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
+func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader,
upload_status chan<- uploadStatus, expectedLength int64, requestID int32) {
var req *http.Request
if req, err = http.NewRequest("PUT", url, nil); err != nil {
DebugPrintf("DEBUG: [%08x] Error creating request PUT %v error: %v", requestID, url, err.Error())
upload_status <- uploadStatus{err, url, 0, 0, ""}
- body.Close()
return
}
req.ContentLength = expectedLength
if expectedLength > 0 {
- // Do() will close the body ReadCloser when it is done
- // with it.
- req.Body = body
+ req.Body = ioutil.NopCloser(body)
} else {
- // "For client requests, a value of 0 means unknown if Body is
- // not nil." In this case we do want the body to be empty, so
- // don't set req.Body. However, we still need to close the
- // body ReadCloser.
- body.Close()
+ // "For client requests, a value of 0 means unknown if
+ // Body is not nil." In this case we do want the body
+ // to be empty, so don't set req.Body.
}
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
func (this *KeepClient) putReplicas(
hash string,
- tr *streamer.AsyncStream,
+ getReader func() io.Reader,
expectedLength int64) (locator string, replicas int, err error) {
// Generate an arbitrary ID to identify this specific
// Start some upload requests
if next_server < len(sv) {
DebugPrintf("DEBUG: [%08x] Begin upload %s to %s", requestID, hash, sv[next_server])
- go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestID)
+ go this.uploadToKeepServer(sv[next_server], hash, getReader(), upload_status, expectedLength, requestID)
next_server += 1
active += 1
} else {
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-/* AsyncStream pulls data in from a io.Reader source (such as a file or network
-socket) and fans out to any number of StreamReader sinks.
-
-Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
-any point in the lifetime of the AsyncStream, and each StreamReader will read
-the contents of the buffer up to the "frontier" of the buffer, at which point
-the StreamReader blocks until new data is read from the source.
-
-This is useful for minimizing readthrough latency as sinks can read and act on
-data from the source without waiting for the source to be completely buffered.
-It is also useful as a cache in situations where re-reading the original source
-potentially is costly, since the buffer retains a copy of the source data.
-
-Usage:
-
-Begin reading into a buffer with maximum size 'buffersize' from 'source':
- stream := AsyncStreamFromReader(buffersize, source)
-
-To create a new reader (this can be called multiple times, each reader starts
-at the beginning of the buffer):
- reader := tr.MakeStreamReader()
-
-Make sure to close the reader when you're done with it.
- reader.Close()
-
-When you're done with the stream:
- stream.Close()
-
-Alternately, if you already have a filled buffer and just want to read out from it:
- stream := AsyncStreamFromSlice(buf)
-
- r := tr.MakeStreamReader()
-
-*/
-
-package streamer
-
-import (
- "errors"
- "io"
-)
-
-var ErrAlreadyClosed = errors.New("cannot close a stream twice")
-
-type AsyncStream struct {
- buffer []byte
- requests chan sliceRequest
- add_reader chan bool
- subtract_reader chan bool
- wait_zero_readers chan bool
- closed bool
-}
-
-// Reads from the buffer managed by the Transfer()
-type StreamReader struct {
- offset int
- stream *AsyncStream
- responses chan sliceResult
-}
-
-func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
- t := &AsyncStream{
- buffer: make([]byte, buffersize),
- requests: make(chan sliceRequest),
- add_reader: make(chan bool),
- subtract_reader: make(chan bool),
- wait_zero_readers: make(chan bool),
- }
-
- go t.transfer(source)
- go t.readersMonitor()
-
- return t
-}
-
-func AsyncStreamFromSlice(buf []byte) *AsyncStream {
- t := &AsyncStream{
- buffer: buf,
- requests: make(chan sliceRequest),
- add_reader: make(chan bool),
- subtract_reader: make(chan bool),
- wait_zero_readers: make(chan bool),
- }
-
- go t.transfer(nil)
- go t.readersMonitor()
-
- return t
-}
-
-func (this *AsyncStream) MakeStreamReader() *StreamReader {
- this.add_reader <- true
- return &StreamReader{0, this, make(chan sliceResult)}
-}
-
-// Reads from the buffer managed by the Transfer()
-func (this *StreamReader) Read(p []byte) (n int, err error) {
- this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
- rr, valid := <-this.responses
- if valid {
- this.offset += len(rr.slice)
- return copy(p, rr.slice), rr.err
- } else {
- return 0, io.ErrUnexpectedEOF
- }
-}
-
-func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
- // Record starting offset in order to correctly report the number of bytes sent
- starting_offset := this.offset
- for {
- this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
- rr, valid := <-this.responses
- if valid {
- this.offset += len(rr.slice)
- if rr.err != nil {
- if rr.err == io.EOF {
- // EOF is not an error.
- return int64(this.offset - starting_offset), nil
- } else {
- return int64(this.offset - starting_offset), rr.err
- }
- } else {
- dest.Write(rr.slice)
- }
- } else {
- return int64(this.offset), io.ErrUnexpectedEOF
- }
- }
-}
-
-// Close the responses channel
-func (this *StreamReader) Close() error {
- if this.stream == nil {
- return ErrAlreadyClosed
- }
- this.stream.subtract_reader <- true
- close(this.responses)
- this.stream = nil
- return nil
-}
-
-func (this *AsyncStream) Close() error {
- if this.closed {
- return ErrAlreadyClosed
- }
- this.closed = true
- this.wait_zero_readers <- true
- close(this.requests)
- close(this.add_reader)
- close(this.subtract_reader)
- close(this.wait_zero_readers)
- return nil
-}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-package streamer
-
-import (
- . "gopkg.in/check.v1"
- "io"
- "testing"
- "time"
-)
-
-// Gocheck boilerplate
-func Test(t *testing.T) { TestingT(t) }
-
-var _ = Suite(&StandaloneSuite{})
-
-// Standalone tests
-type StandaloneSuite struct{}
-
-func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
- ReadIntoBufferHelper(c, 225)
- ReadIntoBufferHelper(c, 224)
-}
-
-func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
- out := make([]byte, 128)
- for i := 0; i < 128; i += 1 {
- out[i] = byte(i)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 128)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 128; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
-}
-
-func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
- out := make([]byte, 96)
- for i := 0; i < 96; i += 1 {
- out[i] = byte(i / 2)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 96)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 96; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i/2))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else if i < (128 + 96) {
- c.Check(buffer[i], Equals, byte((i-128)/2))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
-}
-
-func ReadIntoBufferHelper(c *C, bufsize int) {
- buffer := make([]byte, bufsize)
-
- reader, writer := io.Pipe()
- slices := make(chan nextSlice)
-
- go readIntoBuffer(buffer, reader, slices)
-
- HelperWrite128andCheck(c, buffer, writer, slices)
- HelperWrite96andCheck(c, buffer, writer, slices)
-
- writer.Close()
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 0)
- c.Check(s1.reader_error, Equals, io.EOF)
-}
-
-func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
- buffer := make([]byte, 223)
- reader, writer := io.Pipe()
- slices := make(chan nextSlice)
-
- go readIntoBuffer(buffer, reader, slices)
-
- HelperWrite128andCheck(c, buffer, writer, slices)
-
- out := make([]byte, 96)
- for i := 0; i < 96; i += 1 {
- out[i] = byte(i / 2)
- }
-
- // Write will deadlock because it can't write all the data, so
- // spin it off to a goroutine
- go writer.Write(out)
- s1 := <-slices
-
- c.Check(len(s1.slice), Equals, 95)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 95; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i/2))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else if i < (128 + 95) {
- c.Check(buffer[i], Equals, byte((i-128)/2))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
-
- writer.Close()
- s1 = <-slices
- c.Check(len(s1.slice), Equals, 0)
- c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransfer(c *C) {
- reader, writer := io.Pipe()
-
- tr := AsyncStreamFromReader(512, reader)
-
- br1 := tr.MakeStreamReader()
- out := make([]byte, 128)
-
- {
- // Write some data, and read into a buffer shorter than
- // available data
- for i := 0; i < 128; i += 1 {
- out[i] = byte(i)
- }
-
- writer.Write(out[:100])
-
- in := make([]byte, 64)
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 64)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 64; i += 1 {
- c.Check(in[i], Equals, out[i])
- }
- }
-
- {
- // Write some more data, and read into buffer longer than
- // available data
- in := make([]byte, 64)
- n, err := br1.Read(in)
- c.Check(n, Equals, 36)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 36; i += 1 {
- c.Check(in[i], Equals, out[64+i])
- }
-
- }
-
- {
- // Test read before write
- type Rd struct {
- n int
- err error
- }
- rd := make(chan Rd)
- in := make([]byte, 64)
-
- go func() {
- n, err := br1.Read(in)
- rd <- Rd{n, err}
- }()
-
- time.Sleep(100 * time.Millisecond)
- writer.Write(out[100:])
-
- got := <-rd
-
- c.Check(got.n, Equals, 28)
- c.Check(got.err, Equals, nil)
-
- for i := 0; i < 28; i += 1 {
- c.Check(in[i], Equals, out[100+i])
- }
- }
-
- br2 := tr.MakeStreamReader()
- {
- // Test 'catch up' reader
- in := make([]byte, 256)
- n, err := br2.Read(in)
-
- c.Check(n, Equals, 128)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 128; i += 1 {
- c.Check(in[i], Equals, out[i])
- }
- }
-
- {
- // Test closing the reader
- writer.Close()
-
- in := make([]byte, 256)
- n1, err1 := br1.Read(in)
- n2, err2 := br2.Read(in)
- c.Check(n1, Equals, 0)
- c.Check(err1, Equals, io.EOF)
- c.Check(n2, Equals, 0)
- c.Check(err2, Equals, io.EOF)
- }
-
- {
- // Test 'catch up' reader after closing
- br3 := tr.MakeStreamReader()
- in := make([]byte, 256)
- n, err := br3.Read(in)
-
- c.Check(n, Equals, 128)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 128; i += 1 {
- c.Check(in[i], Equals, out[i])
- }
-
- n, err = br3.Read(in)
-
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.EOF)
- }
-}
-
-func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
- reader, writer := io.Pipe()
-
- tr := AsyncStreamFromReader(100, reader)
- defer tr.Close()
-
- sr := tr.MakeStreamReader()
- defer sr.Close()
-
- out := make([]byte, 101)
- go writer.Write(out)
-
- n, err := sr.Read(out)
- c.Check(n, Equals, 100)
- c.Check(err, IsNil)
-
- n, err = sr.Read(out)
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
- // Buffer for reads from 'r'
- buffer := make([]byte, 100)
- for i := 0; i < 100; i += 1 {
- buffer[i] = byte(i)
- }
-
- tr := AsyncStreamFromSlice(buffer)
-
- br1 := tr.MakeStreamReader()
-
- in := make([]byte, 64)
- {
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 64)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 64; i += 1 {
- c.Check(in[i], Equals, buffer[i])
- }
- }
- {
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 36)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 36; i += 1 {
- c.Check(in[i], Equals, buffer[64+i])
- }
- }
- {
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.EOF)
- }
-}
-
-func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
- // Buffer for reads from 'r'
- buffer := make([]byte, 100)
- for i := 0; i < 100; i += 1 {
- buffer[i] = byte(i)
- }
-
- tr := AsyncStreamFromSlice(buffer)
- defer tr.Close()
-
- br1 := tr.MakeStreamReader()
- defer br1.Close()
-
- reader, writer := io.Pipe()
-
- go func() {
- p := make([]byte, 100)
- n, err := reader.Read(p)
- c.Check(n, Equals, 100)
- c.Check(err, Equals, nil)
- c.Check(p, DeepEquals, buffer)
- }()
-
- io.Copy(writer, br1)
-}
-
-func (s *StandaloneSuite) TestManyReaders(c *C) {
- reader, writer := io.Pipe()
-
- tr := AsyncStreamFromReader(512, reader)
- defer tr.Close()
-
- sr := tr.MakeStreamReader()
- go func() {
- time.Sleep(100 * time.Millisecond)
- sr.Close()
- }()
-
- for i := 0; i < 200; i += 1 {
- go func() {
- br1 := tr.MakeStreamReader()
- defer br1.Close()
-
- p := make([]byte, 3)
- n, err := br1.Read(p)
- c.Check(n, Equals, 3)
- c.Check(p[0:3], DeepEquals, []byte("foo"))
-
- n, err = br1.Read(p)
- c.Check(n, Equals, 3)
- c.Check(p[0:3], DeepEquals, []byte("bar"))
-
- n, err = br1.Read(p)
- c.Check(n, Equals, 3)
- c.Check(p[0:3], DeepEquals, []byte("baz"))
-
- n, err = br1.Read(p)
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.EOF)
- }()
- }
-
- writer.Write([]byte("foo"))
- writer.Write([]byte("bar"))
- writer.Write([]byte("baz"))
- writer.Close()
-}
-
-func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) {
- buffer := make([]byte, 100)
- tr := AsyncStreamFromSlice(buffer)
- sr := tr.MakeStreamReader()
- c.Check(sr.Close(), IsNil)
- c.Check(sr.Close(), Equals, ErrAlreadyClosed)
- c.Check(tr.Close(), IsNil)
- c.Check(tr.Close(), Equals, ErrAlreadyClosed)
-}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-/* Internal implementation of AsyncStream.
-Outline of operation:
-
-The kernel is the transfer() goroutine. It manages concurrent reads and
-appends to the "body" slice. "body" is a slice of "source_buffer" that
-represents the segment of the buffer that is already filled in and available
-for reading.
-
-To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
-from the io.Reader source directly into source_buffer. Each read goes into a
-slice of buffer which spans the section immediately following the end of the
-current "body". Each time a Read completes, a slice representing the the
-section just filled in (or any read errors/EOF) is sent over the "slices"
-channel back to the transfer() function.
-
-Meanwhile, the transfer() function selects() on two channels, the "requests"
-channel and the "slices" channel.
-
-When a message is received on the "slices" channel, this means the a new
-section of the buffer has data, or an error is signaled. Since the data has
-been read directly into the source_buffer, it is able to simply increases the
-size of the body slice to encompass the newly filled in section. Then any
-pending reads are serviced with handleReadRequest (described below).
-
-When a message is received on the "requests" channel, it means a StreamReader
-wants access to a slice of the buffer. This is passed to handleReadRequest().
-
-The handleReadRequest() function takes a sliceRequest consisting of a buffer
-offset, maximum size, and channel to send the response. If there was an error
-reported from the source reader, it is returned. If the offset is less than
-the size of the body, the request can proceed, and it sends a body slice
-spanning the segment from offset to min(offset+maxsize, end of the body). If
-source reader status is EOF (done filling the buffer) and the read request
-offset is beyond end of the body, it responds with EOF. Otherwise, the read
-request is for a slice beyond the current size of "body" but we expect the body
-to expand as more data is added, so the request gets added to a wait list.
-
-The transfer() runs until the requests channel is closed by AsyncStream.Close()
-
-To track readers, streamer uses the readersMonitor() goroutine. This goroutine
-chooses which channels to receive from based on the number of outstanding
-readers. When a new reader is created, it sends a message on the add_reader
-channel. If the number of readers is already at MAX_READERS, this blocks the
-sender until an existing reader is closed. When a reader is closed, it sends a
-message on the subtract_reader channel. Finally, when AsyncStream.Close() is
-called, it sends a message on the wait_zero_readers channel, which will block
-the sender unless there are zero readers and it is safe to shut down the
-AsyncStream.
-*/
-
-package streamer
-
-import (
- "io"
-)
-
-const MAX_READERS = 100
-
-// A slice passed from readIntoBuffer() to transfer()
-type nextSlice struct {
- slice []byte
- reader_error error
-}
-
-// A read request to the Transfer() function
-type sliceRequest struct {
- offset int
- maxsize int
- result chan<- sliceResult
-}
-
-// A read result from the Transfer() function
-type sliceResult struct {
- slice []byte
- err error
-}
-
-// Supports writing into a buffer
-type bufferWriter struct {
- buf []byte
- ptr int
-}
-
-// Copy p into this.buf, increment pointer and return number of bytes read.
-func (this *bufferWriter) Write(p []byte) (n int, err error) {
- n = copy(this.buf[this.ptr:], p)
- this.ptr += n
- return n, nil
-}
-
-// Read repeatedly from the reader and write sequentially into the specified
-// buffer, and report each read to channel 'c'. Completes when Reader 'r'
-// reports on the error channel and closes channel 'c'.
-func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
- defer close(slices)
-
- if writeto, ok := r.(io.WriterTo); ok {
- n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
- if err != nil {
- slices <- nextSlice{nil, err}
- } else {
- slices <- nextSlice{buffer[:n], nil}
- slices <- nextSlice{nil, io.EOF}
- }
- return
- } else {
- // Initially entire buffer is available
- ptr := buffer[:]
- for {
- var n int
- var err error
- if len(ptr) > 0 {
- const readblock = 64 * 1024
- // Read 64KiB into the next part of the buffer
- if len(ptr) > readblock {
- n, err = r.Read(ptr[:readblock])
- } else {
- n, err = r.Read(ptr)
- }
- } else {
- // Ran out of buffer space, try reading one more byte
- var b [1]byte
- n, err = r.Read(b[:])
-
- if n > 0 {
- // Reader has more data but we have nowhere to
- // put it, so we're stuffed
- slices <- nextSlice{nil, io.ErrShortBuffer}
- } else {
- // Return some other error (hopefully EOF)
- slices <- nextSlice{nil, err}
- }
- return
- }
-
- // End on error (includes EOF)
- if err != nil {
- slices <- nextSlice{nil, err}
- return
- }
-
- if n > 0 {
- // Make a slice with the contents of the read
- slices <- nextSlice{ptr[:n], nil}
-
- // Adjust the scratch space slice
- ptr = ptr[n:]
- }
- }
- }
-}
-
-// Handle a read request. Returns true if a response was sent, and false if
-// the request should be queued.
-func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
- if (reader_status != nil) && (reader_status != io.EOF) {
- req.result <- sliceResult{nil, reader_status}
- return true
- } else if req.offset < len(body) {
- var end int
- if req.offset+req.maxsize < len(body) {
- end = req.offset + req.maxsize
- } else {
- end = len(body)
- }
- req.result <- sliceResult{body[req.offset:end], nil}
- return true
- } else if (reader_status == io.EOF) && (req.offset >= len(body)) {
- req.result <- sliceResult{nil, io.EOF}
- return true
- } else {
- return false
- }
-}
-
-// Mediates between reads and appends.
-// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
-// in the provided buffer. Otherwise, use the contents of 'buffer' as is.
-// Accepts read requests on the buffer on the 'requests' channel. Completes
-// when 'requests' channel is closed.
-func (this *AsyncStream) transfer(source_reader io.Reader) {
- source_buffer := this.buffer
- requests := this.requests
-
- // currently buffered data
- var body []byte
-
- // for receiving slices from readIntoBuffer
- var slices chan nextSlice = nil
-
- // indicates the status of the underlying reader
- var reader_status error = nil
-
- if source_reader != nil {
- // 'body' is the buffer slice representing the body content read so far
- body = source_buffer[:0]
-
- // used to communicate slices of the buffer as they are
- // readIntoBuffer will close 'slices' when it is done with it
- slices = make(chan nextSlice)
-
- // Spin it off
- go readIntoBuffer(source_buffer, source_reader, slices)
- } else {
- // use the whole buffer
- body = source_buffer[:]
-
- // buffer is complete
- reader_status = io.EOF
- }
-
- pending_requests := make([]sliceRequest, 0)
-
- for {
- select {
- case req, valid := <-requests:
- // Handle a buffer read request
- if valid {
- if !handleReadRequest(req, body, reader_status) {
- pending_requests = append(pending_requests, req)
- }
- } else {
- // closed 'requests' channel indicates we're done
- return
- }
-
- case bk, valid := <-slices:
- // Got a new slice from the reader
- if valid {
- reader_status = bk.reader_error
-
- if bk.slice != nil {
- // adjust body bounds now that another slice has been read
- body = source_buffer[0 : len(body)+len(bk.slice)]
- }
-
- // handle pending reads
- n := 0
- for n < len(pending_requests) {
- if handleReadRequest(pending_requests[n], body, reader_status) {
- // move the element from the back of the slice to
- // position 'n', then shorten the slice by one element
- pending_requests[n] = pending_requests[len(pending_requests)-1]
- pending_requests = pending_requests[0 : len(pending_requests)-1]
- } else {
-
- // Request wasn't handled, so keep it in the request slice
- n += 1
- }
- }
- } else {
- if reader_status == nil {
- // slices channel closed without signaling EOF
- reader_status = io.ErrUnexpectedEOF
- }
- slices = nil
- }
- }
- }
-}
-
-func (this *AsyncStream) readersMonitor() {
- var readers int = 0
-
- for {
- if readers == 0 {
- select {
- case _, ok := <-this.wait_zero_readers:
- if ok {
- // nothing, just implicitly unblock the sender
- } else {
- return
- }
- case _, ok := <-this.add_reader:
- if ok {
- readers += 1
- } else {
- return
- }
- }
- } else if readers > 0 && readers < MAX_READERS {
- select {
- case _, ok := <-this.add_reader:
- if ok {
- readers += 1
- } else {
- return
- }
-
- case _, ok := <-this.subtract_reader:
- if ok {
- readers -= 1
- } else {
- return
- }
- }
- } else if readers == MAX_READERS {
- _, ok := <-this.subtract_reader
- if ok {
- readers -= 1
- } else {
- return
- }
- }
- }
-}
end
def require_auth_scope
- if @read_auths.empty?
+ unless current_user && @read_auths.any? { |auth| auth.user.andand.uuid == current_user.uuid }
if require_login != false
send_error("Forbidden", status: 403)
end
user = nil
api_client = nil
api_client_auth = nil
- supplied_token =
- params["api_token"] ||
- params["oauth_token"] ||
- env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-zA-Z0-9]+)/).andand[1]
- if supplied_token
- api_client_auth = ApiClientAuthorization.
+ if request.get? || params["_method"] == 'GET'
+ reader_tokens = params["reader_tokens"]
+ if reader_tokens.is_a? String
+ reader_tokens = SafeJSON.load(reader_tokens)
+ end
+ else
+ reader_tokens = nil
+ end
+
+ # Set current_user etc. based on the primary session token if a
+ # valid one is present. Otherwise, use the first valid token in
+ # reader_tokens.
+ [params["api_token"],
+ params["oauth_token"],
+ env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-zA-Z0-9]+)/).andand[1],
+ *reader_tokens,
+ ].each do |supplied|
+ next if !supplied
+ try_auth = ApiClientAuthorization.
includes(:api_client, :user).
- where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied_token).
+ where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied).
first
- if api_client_auth.andand.user
+ if try_auth.andand.user
+ api_client_auth = try_auth
user = api_client_auth.user
api_client = api_client_auth.api_client
- else
- # Token seems valid, but points to a non-existent (deleted?) user.
- api_client_auth = nil
+ break
end
end
Thread.current[:api_client_ip_address] = remote_ip
after_validation :assign_auth
before_save :sort_serialized_attrs
after_save :handle_completed
+ after_save :propagate_priority
has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
end
end
+ def propagate_priority
+ if self.priority_changed?
+ act_as_system_user do
+ # Update the priority of child container requests to match new priority
+ # of the parent container.
+ ContainerRequest.where(requesting_container_uuid: self.uuid,
+ state: ContainerRequest::Committed).each do |cr|
+ cr.priority = self.priority
+ cr.save
+ end
+ end
+ end
+ end
+
# Create a new container (or find an existing one) to satisfy the
# given container request.
def self.resolve(req)
assert_response 403
end
+ test "narrow + wide scoped tokens for different users" do
+ get_args = [{
+ reader_tokens: [api_client_authorizations(:anonymous).api_token]
+ }, auth(:active_userlist)]
+ get(v1_url('users'), *get_args)
+ assert_response :success
+ get(v1_url('users', ''), *get_args) # Add trailing slash.
+ assert_response :success
+ get(v1_url('users', 'current'), *get_args)
+ assert_response 403
+ get(v1_url('virtual_machines'), *get_args)
+ assert_response 403
+ end
+
test "specimens token can see exactly owned specimens" do
get_args = [{}, auth(:active_specimens)]
get(v1_url('specimens'), *get_args)
[nil, :active_noscope].each do |main_auth|
[:spectator, :spectator_specimens].each do |read_auth|
- test "#{main_auth} auth with reader token #{read_auth} can read" do
- assert_includes(get_specimen_uuids(main_auth, read_auth),
- spectator_specimen, "did not find spectator specimen")
- end
-
- test "#{main_auth} auth with JSON read token #{read_auth} can read" do
- assert_includes(get_specimen_uuids(main_auth, read_auth, :to_json),
- spectator_specimen, "did not find spectator specimen")
- end
-
- test "#{main_auth} auth with reader token #{read_auth} can't write" do
- assert_post_denied(main_auth, read_auth)
- end
+ [:to_a, :to_json].each do |formatter|
+ test "#{main_auth.inspect} auth with #{formatter} reader token #{read_auth} can#{"'t" if main_auth} read" do
+ get_specimens(main_auth, read_auth)
+ assert_response(if main_auth then 403 else 200 end)
+ end
- test "#{main_auth} auth with JSON read token #{read_auth} can't write" do
- assert_post_denied(main_auth, read_auth, :to_json)
+ test "#{main_auth.inspect} auth with #{formatter} reader token #{read_auth} can't write" do
+ assert_post_denied(main_auth, read_auth, formatter)
+ end
end
end
end
assert_equal 0, c2.priority
end
+
+ test "Container makes container request, then changes priority" do
+ set_user_from_auth :active
+ cr = create_minimal_req!(priority: 5, state: "Committed", container_count_max: 1)
+
+ c = Container.find_by_uuid cr.container_uuid
+ assert_equal 5, c.priority
+
+ cr2 = create_minimal_req!
+ cr2.update_attributes!(priority: 5, state: "Committed", requesting_container_uuid: c.uuid, command: ["echo", "foo2"], container_count_max: 1)
+ cr2.reload
+
+ c2 = Container.find_by_uuid cr2.container_uuid
+ assert_equal 5, c2.priority
+
+ act_as_system_user do
+ c.priority = 10
+ c.save!
+ end
+
+ cr.reload
+
+ cr2.reload
+ assert_equal 10, cr2.priority
+
+ c2.reload
+ assert_equal 10, c2.priority
+ end
+
[
['running_container_auth', 'zzzzz-dz642-runningcontainr'],
['active_no_prefs', nil],
}
r.URL.Path = rewrittenPath
- h.handler.ServeHTTP(&w, r)
+ h.handler.ServeHTTP(w, r)
}
}
}
+var errorBlacklist = []string{"Cannot connect to the Docker daemon"}
+var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
+
+func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
+ for _, d := range errorBlacklist {
+ if strings.Index(goterr.Error(), d) != -1 {
+ runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr)
+ if *brokenNodeHook == "" {
+ runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+ } else {
+ runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+ // run killme script
+ c := exec.Command(*brokenNodeHook)
+ c.Stdout = runner.CrunchLog
+ c.Stderr = runner.CrunchLog
+ err := c.Run()
+ if err != nil {
+ runner.CrunchLog.Printf("Error running broken node hook: %v", err)
+ }
+ }
+ return true
+ }
+ }
+ return false
+}
+
// LoadImage determines the docker image id from the container record and
// checks if it is available in the local Docker image store. If not, it loads
// the image from Keep.
// check for and/or load image
err = runner.LoadImage()
if err != nil {
- runner.finalState = "Cancelled"
+ if !runner.checkBrokenNode(err) {
+ // Failed to load image but not due to a "broken node"
+ // condition, probably user error.
+ runner.finalState = "Cancelled"
+ }
err = fmt.Errorf("While loading container image: %v", err)
return
}
return
}
- runner.StartCrunchstat()
-
if runner.IsCancelled() {
return
}
}
runner.finalState = "Cancelled"
+ runner.StartCrunchstat()
+
err = runner.StartContainer()
if err != nil {
+ runner.checkBrokenNode(err)
return
}
}
api.Retries = 8
- var kc *keepclient.KeepClient
- kc, err = keepclient.MakeKeepClient(api)
- if err != nil {
- log.Fatalf("%s: %v", containerId, err)
+ kc, kcerr := keepclient.MakeKeepClient(api)
+ if kcerr != nil {
+ log.Fatalf("%s: %v", containerId, kcerr)
}
kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
kc.Retries = 4
- var docker *dockerclient.Client
// API version 1.21 corresponds to Docker 1.9, which is currently the
// minimum version we want to support.
- docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
- if err != nil {
- log.Fatalf("%s: %v", containerId, err)
- }
-
+ docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
dockerClientProxy := ThinDockerClientProxy{Docker: docker}
cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
+
+ if dockererr != nil {
+ cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
+ cr.checkBrokenNode(dockererr)
+ cr.CrunchLog.Close()
+ os.Exit(1)
+ }
+
cr.statInterval = *statInterval
cr.cgroupRoot = *cgroupRoot
cr.expectCgroupParent = *cgroupParent
}
func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
+ if t.finish == 2 {
+ return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
+ }
+
if t.imageLoaded == image {
return dockertypes.ImageInspect{}, nil, nil
} else {
}
func (t *TestDockerClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
+ if t.finish == 2 {
+ return dockertypes.ImageLoadResponse{}, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
+ }
_, err := io.Copy(ioutil.Discard, input)
if err != nil {
return dockertypes.ImageLoadResponse{}, err
len int64
}
+func (fw FileWrapper) Readdir(n int) ([]os.FileInfo, error) {
+ return nil, errors.New("not implemented")
+}
+
+func (fw FileWrapper) Seek(int64, int) (int64, error) {
+ return 0, errors.New("not implemented")
+}
+
func (fw FileWrapper) Size() int64 {
return fw.len
}
-func (fw FileWrapper) Seek(int64, int) (int64, error) {
+func (fw FileWrapper) Stat() (os.FileInfo, error) {
+ return nil, errors.New("not implemented")
+}
+
+func (fw FileWrapper) Truncate(int64) error {
+ return errors.New("not implemented")
+}
+
+func (fw FileWrapper) Write([]byte) (int, error) {
return 0, errors.New("not implemented")
}
func (KeepReadErrorTestClient) ClearBlockCache() {
}
-type ErrorReader struct{}
+type ErrorReader struct {
+ FileWrapper
+}
func (ErrorReader) Read(p []byte) (n int, err error) {
return 0, errors.New("ErrorReader")
}
-func (ErrorReader) Close() error {
- return nil
-}
-
-func (ErrorReader) Size() int64 {
- return 0
-}
-
func (ErrorReader) Seek(int64, int) (int64, error) {
return 0, errors.New("ErrorReader")
}
if api.CalledWith("container.state", "Complete") != nil {
c.Check(err, IsNil)
}
- c.Check(api.WasSetRunning, Equals, true)
-
- c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil)
+ if exitCode != 2 {
+ c.Check(api.WasSetRunning, Equals, true)
+ c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil)
+ }
if err != nil {
for k, v := range api.Logs {
_, err = cr.UploadOutputFile(realTemp+"/"+v, info, err, []string{}, nil, "", "", 0)
c.Assert(err, NotNil)
}
+
+func (s *TestSuite) TestFullBrokenDocker1(c *C) {
+ tf, err := ioutil.TempFile("", "brokenNodeHook-")
+ c.Assert(err, IsNil)
+ defer os.Remove(tf.Name())
+
+ tf.Write([]byte(`#!/bin/sh
+exec echo killme
+`))
+ tf.Close()
+ os.Chmod(tf.Name(), 0700)
+
+ ech := tf.Name()
+ brokenNodeHook = &ech
+
+ api, _, _ := FullRunHelper(c, `{
+ "command": ["echo", "hello world"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`, nil, 2, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, "hello world\n"))
+ t.logWriter.Close()
+ })
+
+ c.Check(api.CalledWith("container.state", "Queued"), NotNil)
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
+
+}
+
+func (s *TestSuite) TestFullBrokenDocker2(c *C) {
+ ech := ""
+ brokenNodeHook = &ech
+
+ api, _, _ := FullRunHelper(c, `{
+ "command": ["echo", "hello world"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`, nil, 2, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, "hello world\n"))
+ t.logWriter.Close()
+ })
+
+ c.Check(api.CalledWith("container.state", "Queued"), NotNil)
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*No broken node hook.*")
+}
}
}
+// Update saves a modified version (fs) to an existing collection
+// (coll) and, if successful, updates the relevant cache entries so
+// subsequent calls to Get() reflect the modifications.
+func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
+ c.setupOnce.Do(c.setup)
+
+ if m, err := fs.MarshalManifest("."); err != nil || m == coll.ManifestText {
+ return err
+ } else {
+ coll.ManifestText = m
+ }
+ var updated arvados.Collection
+ defer c.pdhs.Remove(coll.UUID)
+ err := client.RequestAndDecode(&updated, "PATCH", "/arvados/v1/collections/"+coll.UUID, client.UpdateBody(coll), nil)
+ if err == nil {
+ c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{
+ expire: time.Now().Add(time.Duration(c.TTL)),
+ collection: &updated,
+ })
+ }
+ return err
+}
+
func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
c.setupOnce.Do(c.setup)
import (
"bytes"
"io"
+ "io/ioutil"
+ "net/url"
+ "os"
"os/exec"
+ "strings"
+ "time"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
check "gopkg.in/check.v1"
)
func (s *IntegrationSuite) TestWebdavWithCadaver(c *check.C) {
- basePath := "/c=" + arvadostest.FooAndBarFilesInDirUUID + "/t=" + arvadostest.ActiveToken + "/"
+ testdata := []byte("the human tragedy consists in the necessity of living with the consequences of actions performed under the pressure of compulsions we do not understand")
+
+ localfile, err := ioutil.TempFile("", "localfile")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(localfile.Name())
+ localfile.Write(testdata)
+
+ emptyfile, err := ioutil.TempFile("", "emptyfile")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(emptyfile.Name())
+
+ checkfile, err := ioutil.TempFile("", "checkfile")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(checkfile.Name())
+
+ var newCollection arvados.Collection
+ arv := arvados.NewClientFromEnv()
+ arv.AuthToken = arvadostest.ActiveToken
+ err = arv.RequestAndDecode(&newCollection, "POST", "/arvados/v1/collections", bytes.NewBufferString(url.Values{"collection": {"{}"}}.Encode()), nil)
+ c.Assert(err, check.IsNil)
+ writePath := "/c=" + newCollection.UUID + "/t=" + arv.AuthToken + "/"
+
+ pdhPath := "/c=" + strings.Replace(arvadostest.FooAndBarFilesInDirPDH, "+", "-", -1) + "/t=" + arv.AuthToken + "/"
+
+ matchToday := time.Now().Format("Jan +2")
+
+ readPath := "/c=" + arvadostest.FooAndBarFilesInDirUUID + "/t=" + arvadostest.ActiveToken + "/"
type testcase struct {
path string
cmd string
match string
+ data []byte
}
for _, trial := range []testcase{
{
- path: basePath,
+ path: readPath,
cmd: "ls\n",
match: `(?ms).*dir1 *0 .*`,
},
{
- path: basePath,
+ path: readPath,
cmd: "ls dir1\n",
match: `(?ms).*bar *3.*foo *3 .*`,
},
{
- path: basePath + "_/dir1",
+ path: readPath + "_/dir1",
cmd: "ls\n",
match: `(?ms).*bar *3.*foo *3 .*`,
},
{
- path: basePath + "dir1/",
+ path: readPath + "dir1/",
cmd: "ls\n",
- match: `(?ms).*bar *3.*foo *3 .*`,
+ match: `(?ms).*bar *3.*foo +3 +Feb +\d+ +2014.*`,
+ },
+ {
+ path: writePath,
+ cmd: "get emptyfile '" + checkfile.Name() + "'\n",
+ match: `(?ms).*Not Found.*`,
+ },
+ {
+ path: writePath,
+ cmd: "put '" + emptyfile.Name() + "' emptyfile\n",
+ match: `(?ms).*Uploading .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "get emptyfile '" + checkfile.Name() + "'\n",
+ match: `(?ms).*Downloading .* succeeded.*`,
+ data: []byte{},
+ },
+ {
+ path: writePath,
+ cmd: "put '" + localfile.Name() + "' testfile\n",
+ match: `(?ms).*Uploading .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "get testfile '" + checkfile.Name() + "'\n",
+ match: `(?ms).*succeeded.*`,
+ data: testdata,
+ },
+ {
+ path: writePath,
+ cmd: "move testfile newdir0/\n",
+ match: `(?ms).*Moving .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "move testfile newdir0/\n",
+ match: `(?ms).*Moving .* failed.*`,
+ },
+ {
+ path: writePath,
+ cmd: "ls\n",
+ match: `(?ms).*newdir0.* 0 +` + matchToday + ` \d+:\d+\n.*`,
+ },
+ {
+ path: writePath,
+ cmd: "move newdir0/testfile emptyfile/bogus/\n",
+ match: `(?ms).*Moving .* failed.*`,
+ },
+ {
+ path: writePath,
+ cmd: "mkcol newdir1\n",
+ match: `(?ms).*Creating .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "move newdir0/testfile newdir1/\n",
+ match: `(?ms).*Moving .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "put '" + localfile.Name() + "' newdir1/testfile1\n",
+ match: `(?ms).*Uploading .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "mkcol newdir2\n",
+ match: `(?ms).*Creating .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "put '" + localfile.Name() + "' newdir2/testfile2\n",
+ match: `(?ms).*Uploading .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "copy newdir2/testfile2 testfile3\n",
+ match: `(?ms).*succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "get testfile3 '" + checkfile.Name() + "'\n",
+ match: `(?ms).*succeeded.*`,
+ data: testdata,
+ },
+ {
+ path: writePath,
+ cmd: "get newdir2/testfile2 '" + checkfile.Name() + "'\n",
+ match: `(?ms).*succeeded.*`,
+ data: testdata,
+ },
+ {
+ path: writePath,
+ cmd: "rmcol newdir2\n",
+ match: `(?ms).*Deleting collection .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "get newdir2/testfile2 '" + checkfile.Name() + "'\n",
+ match: `(?ms).*Downloading .* failed.*`,
+ },
+ {
+ path: "/c=" + arvadostest.UserAgreementCollection + "/t=" + arv.AuthToken + "/",
+ cmd: "put '" + localfile.Name() + "' foo\n",
+ match: `(?ms).*Uploading .* failed:.*403 Forbidden.*`,
+ },
+ {
+ path: pdhPath,
+ cmd: "put '" + localfile.Name() + "' foo\n",
+ match: `(?ms).*Uploading .* failed:.*405 Method Not Allowed.*`,
+ },
+ {
+ path: pdhPath,
+ cmd: "move foo bar\n",
+ match: `(?ms).*Moving .* failed:.*405 Method Not Allowed.*`,
+ },
+ {
+ path: pdhPath,
+ cmd: "copy foo bar\n",
+ match: `(?ms).*Copying .* failed:.*405 Method Not Allowed.*`,
+ },
+ {
+ path: pdhPath,
+ cmd: "delete foo\n",
+ match: `(?ms).*Deleting .* failed:.*405 Method Not Allowed.*`,
},
} {
- c.Logf("%s %#v", "http://"+s.testServer.Addr, trial)
+ c.Logf("%s %+v", "http://"+s.testServer.Addr, trial)
+
+ os.Remove(checkfile.Name())
+
cmd := exec.Command("cadaver", "http://"+s.testServer.Addr+trial.path)
cmd.Stdin = bytes.NewBufferString(trial.cmd)
stdout, err := cmd.StdoutPipe()
err = cmd.Wait()
c.Check(err, check.Equals, nil)
c.Check(buf.String(), check.Matches, trial.match)
+
+ if trial.data == nil {
+ continue
+ }
+ checkfile, err = os.Open(checkfile.Name())
+ c.Assert(err, check.IsNil)
+ checkfile.Seek(0, os.SEEK_SET)
+ got, err := ioutil.ReadAll(checkfile)
+ c.Check(got, check.DeepEquals, trial.data)
+ c.Check(err, check.IsNil)
}
}
//
// SPDX-License-Identifier: AGPL-3.0
-// Keep-web provides read-only HTTP access to files stored in Keep. It
-// serves public data to anonymous and unauthenticated clients, and
-// serves private data to clients that supply Arvados API tokens. It
-// can be installed anywhere with access to Keep services, typically
-// behind a web proxy that supports TLS.
+// Keep-web provides read/write HTTP (WebDAV) access to files stored
+// in Keep. It serves public data to anonymous and unauthenticated
+// clients, and serves private data to clients that supply Arvados API
+// tokens. It can be installed anywhere with access to Keep services,
+// typically behind a web proxy that supports TLS.
//
// See http://doc.arvados.org/install/install-keep-web.html.
//
//
// Proxy configuration
//
-// Keep-web does not support SSL natively. Typically, it is installed
+// Keep-web does not support TLS natively. Typically, it is installed
// behind a proxy like nginx.
//
// Here is an example nginx configuration.
"html"
"html/template"
"io"
+ "log"
"net/http"
"net/url"
"os"
json.NewEncoder(w).Encode(status)
}
+// updateOnSuccess wraps httpserver.ResponseWriter. If the handler
+// sends an HTTP header indicating success, updateOnSuccess first
+// calls the provided update func. If the update func fails, a 500
+// response is sent, and the status code and body sent by the handler
+// are ignored (all response writes return the update error).
+type updateOnSuccess struct {
+ httpserver.ResponseWriter
+ update func() error
+ sentHeader bool
+ err error
+}
+
+func (uos *updateOnSuccess) Write(p []byte) (int, error) {
+ if uos.err != nil {
+ return 0, uos.err
+ }
+ if !uos.sentHeader {
+ uos.WriteHeader(http.StatusOK)
+ }
+ return uos.ResponseWriter.Write(p)
+}
+
+func (uos *updateOnSuccess) WriteHeader(code int) {
+ if !uos.sentHeader {
+ uos.sentHeader = true
+ if code >= 200 && code < 400 {
+ if uos.err = uos.update(); uos.err != nil {
+ code := http.StatusInternalServerError
+ if err, ok := uos.err.(*arvados.TransactionError); ok {
+ code = err.StatusCode
+ }
+ log.Printf("update() changes response to HTTP %d: %T %q", code, uos.err, uos.err)
+ http.Error(uos.ResponseWriter, uos.err.Error(), code)
+ return
+ }
+ }
+ }
+ uos.ResponseWriter.WriteHeader(code)
+}
+
var (
+ writeMethod = map[string]bool{
+ "COPY": true,
+ "DELETE": true,
+ "MKCOL": true,
+ "MOVE": true,
+ "PUT": true,
+ "RMCOL": true,
+ }
webdavMethod = map[string]bool{
+ "COPY": true,
+ "DELETE": true,
+ "MKCOL": true,
+ "MOVE": true,
"OPTIONS": true,
"PROPFIND": true,
+ "PUT": true,
+ "RMCOL": true,
}
browserMethod = map[string]bool{
"GET": true,
return
}
w.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, Range")
- w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS, PROPFIND")
+ w.Header().Set("Access-Control-Allow-Methods", "COPY, DELETE, GET, MKCOL, MOVE, OPTIONS, POST, PROPFIND, PUT, RMCOL")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Max-Age", "86400")
statusCode = http.StatusOK
}
applyContentDispositionHdr(w, r, basename, attachment)
- fs := collection.FileSystem(&arvados.Client{
+ client := &arvados.Client{
APIHost: arv.ApiServer,
AuthToken: arv.ApiToken,
Insecure: arv.ApiInsecure,
- }, kc)
+ }
+ fs, err := collection.FileSystem(client, kc)
+ if err != nil {
+ statusCode, statusText = http.StatusInternalServerError, err.Error()
+ return
+ }
+
+ targetIsPDH := arvadosclient.PDHMatch(targetID)
+ if targetIsPDH && writeMethod[r.Method] {
+ statusCode, statusText = http.StatusMethodNotAllowed, errReadOnly.Error()
+ return
+ }
+
if webdavMethod[r.Method] {
+ if writeMethod[r.Method] {
+ // Save the collection only if/when all
+ // webdav->filesystem operations succeed --
+ // and send a 500 error if the modified
+ // collection can't be saved.
+ w = &updateOnSuccess{
+ ResponseWriter: w,
+ update: func() error {
+ return h.Config.Cache.Update(client, *collection, fs)
+ }}
+ }
h := webdav.Handler{
- Prefix: "/" + strings.Join(pathParts[:stripParts], "/"),
- FileSystem: &webdavFS{collfs: fs},
+ Prefix: "/" + strings.Join(pathParts[:stripParts], "/"),
+ FileSystem: &webdavFS{
+ collfs: fs,
+ writing: writeMethod[r.Method],
+ },
LockSystem: h.webdavLS,
Logger: func(_ *http.Request, err error) {
- if os.IsNotExist(err) {
- statusCode, statusText = http.StatusNotFound, err.Error()
- } else if err != nil {
- statusCode, statusText = http.StatusInternalServerError, err.Error()
+ if err != nil {
+ log.Printf("error from webdav handler: %q", err)
}
},
}
c.Check(resp.Code, check.Equals, http.StatusOK)
c.Check(resp.Body.String(), check.Equals, "")
c.Check(resp.Header().Get("Access-Control-Allow-Origin"), check.Equals, "*")
- c.Check(resp.Header().Get("Access-Control-Allow-Methods"), check.Equals, "GET, POST, OPTIONS, PROPFIND")
+ c.Check(resp.Header().Get("Access-Control-Allow-Methods"), check.Equals, "COPY, DELETE, GET, MKCOL, MOVE, OPTIONS, POST, PROPFIND, PUT, RMCOL")
c.Check(resp.Header().Get("Access-Control-Allow-Headers"), check.Equals, "Authorization, Content-Type, Range")
// Check preflight for a disallowed request
resp = httptest.NewRecorder()
- req.Header.Set("Access-Control-Request-Method", "DELETE")
+ req.Header.Set("Access-Control-Request-Method", "MAKE-COFFEE")
h.ServeHTTP(resp, req)
c.Check(resp.Body.String(), check.Equals, "")
c.Check(resp.Code, check.Equals, http.StatusMethodNotAllowed)
"errors"
"fmt"
prand "math/rand"
- "net/http"
"os"
- "sync"
+ "path"
+ "strings"
"sync/atomic"
"time"
errReadOnly = errors.New("read-only filesystem")
)
-// webdavFS implements a read-only webdav.FileSystem by wrapping an
+// webdavFS implements a webdav.FileSystem by wrapping an
// arvados.CollectionFilesystem.
+//
+// Collections don't preserve empty directories, so Mkdir is
+// effectively a no-op, and we need to make parent dirs spring into
+// existence automatically so sequences like "mkcol foo; put foo/bar"
+// work as expected.
type webdavFS struct {
- collfs arvados.CollectionFileSystem
+ collfs arvados.CollectionFileSystem
+ writing bool
}
-var _ webdav.FileSystem = &webdavFS{}
+// makeparents creates all missing parent directories of name, so
+// that sequences like "mkcol foo; put foo/bar" work even though
+// collections don't preserve empty directories. Errors from Mkdir
+// (e.g. directory already exists) are deliberately ignored.
+func (fs *webdavFS) makeparents(name string) {
+ dir, name := path.Split(name)
+ if dir == "" || dir == "/" {
+ return
+ }
+ // Drop the trailing "/" left by path.Split before recursing.
+ dir = dir[:len(dir)-1]
+ fs.makeparents(dir)
+ fs.collfs.Mkdir(dir, 0755)
+}
func (fs *webdavFS) Mkdir(ctx context.Context, name string, perm os.FileMode) error {
- return errReadOnly
+ if !fs.writing {
+ return errReadOnly
+ }
+ name = strings.TrimRight(name, "/")
+ fs.makeparents(name)
+ return fs.collfs.Mkdir(name, 0755)
}
-func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (webdav.File, error) {
- fi, err := fs.collfs.Stat(name)
- if err != nil {
- return nil, err
+func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (f webdav.File, err error) {
+ writing := flag&(os.O_WRONLY|os.O_RDWR) != 0
+ if writing {
+ fs.makeparents(name)
+ }
+ f, err = fs.collfs.OpenFile(name, flag, perm)
+ if !fs.writing {
+ // webdav module returns 404 on all OpenFile errors,
+ // but returns 405 Method Not Allowed if OpenFile()
+ // succeeds but Write() or Close() fails. We'd rather
+ // have 405.
+ f = writeFailer{File: f, err: errReadOnly}
}
- return &webdavFile{collfs: fs.collfs, fileInfo: fi, name: name}, nil
+ return
}
func (fs *webdavFS) RemoveAll(ctx context.Context, name string) error {
- return errReadOnly
+ return fs.collfs.RemoveAll(name)
}
func (fs *webdavFS) Rename(ctx context.Context, oldName, newName string) error {
- return errReadOnly
-}
-
-func (fs *webdavFS) Stat(ctx context.Context, name string) (os.FileInfo, error) {
- return fs.collfs.Stat(name)
-}
-
-// webdavFile implements a read-only webdav.File by wrapping
-// http.File.
-//
-// The http.File is opened from an arvados.CollectionFileSystem, but
-// not until Seek, Read, or Readdir is called. This deferred-open
-// strategy makes webdav's OpenFile-Stat-Close cycle fast even though
-// the collfs's Open method is slow. This is relevant because webdav
-// does OpenFile-Stat-Close on each file when preparing directory
-// listings.
-//
-// Writes to a webdavFile always fail.
-type webdavFile struct {
- // fields populated by (*webdavFS).OpenFile()
- collfs http.FileSystem
- fileInfo os.FileInfo
- name string
-
- // internal fields
- file http.File
- loadOnce sync.Once
- err error
-}
-
-func (f *webdavFile) load() {
- f.file, f.err = f.collfs.Open(f.name)
-}
-
-func (f *webdavFile) Write([]byte) (int, error) {
- return 0, errReadOnly
-}
-
-func (f *webdavFile) Seek(offset int64, whence int) (int64, error) {
- f.loadOnce.Do(f.load)
- if f.err != nil {
- return 0, f.err
+ if !fs.writing {
+ return errReadOnly
}
- return f.file.Seek(offset, whence)
+ fs.makeparents(newName)
+ return fs.collfs.Rename(oldName, newName)
}
-func (f *webdavFile) Read(buf []byte) (int, error) {
- f.loadOnce.Do(f.load)
- if f.err != nil {
- return 0, f.err
+func (fs *webdavFS) Stat(ctx context.Context, name string) (os.FileInfo, error) {
+ if fs.writing {
+ fs.makeparents(name)
}
- return f.file.Read(buf)
+ return fs.collfs.Stat(name)
}
-func (f *webdavFile) Close() error {
- if f.file == nil {
- // We never called load(), or load() failed
- return f.err
- }
- return f.file.Close()
+// writeFailer wraps a webdav.File so that Write and Close always
+// fail with err. OpenFile uses it on a read-only filesystem so the
+// webdav module reports 405 Method Not Allowed rather than 404 (see
+// the comment in OpenFile above).
+type writeFailer struct {
+ webdav.File
+ err error
+}
-func (f *webdavFile) Readdir(n int) ([]os.FileInfo, error) {
- f.loadOnce.Do(f.load)
- if f.err != nil {
- return nil, f.err
- }
- return f.file.Readdir(n)
+// Write always returns the wrapped error.
+func (wf writeFailer) Write([]byte) (int, error) {
+ return 0, wf.err
}
-func (f *webdavFile) Stat() (os.FileInfo, error) {
- return f.fileInfo, nil
+// Close always returns the wrapped error.
+func (wf writeFailer) Close() error {
+ return wf.err
}
// noLockSystem implements webdav.LockSystem by returning success for
--- /dev/null
+package main
+
+import "golang.org/x/net/webdav"
+
+// Compile-time assertion that webdavFS satisfies webdav.FileSystem;
+// fails the build if the interface is not fully implemented.
+var _ webdav.FileSystem = &webdavFS{}
"fmt"
"io"
"io/ioutil"
- "log"
"net"
"net/http"
"os"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/health"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ log "github.com/Sirupsen/logrus"
"github.com/coreos/go-systemd/daemon"
"github.com/ghodss/yaml"
"github.com/gorilla/mux"
router http.Handler
)
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
+
func main() {
+ log.SetFormatter(&log.JSONFormatter{
+ TimestampFormat: rfc3339NanoFixed,
+ })
+
cfg := DefaultConfig()
flagset := flag.NewFlagSet("keepproxy", flag.ExitOnError)
// Start serving requests.
router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout), cfg.ManagementToken)
- http.Serve(listener, router)
+ http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router)))
log.Println("shutting down")
}
Timeout: h.timeout,
Transport: h.transport,
},
- proto: req.Proto,
+ proto: req.Proto,
+ requestID: req.Header.Get("X-Request-Id"),
}
return &kc
}
"errors"
"fmt"
"io/ioutil"
- "log"
+ "math/rand"
"net/http"
"net/http/httptest"
"os"
"strings"
+ "sync"
"testing"
"time"
time.Sleep(ms * time.Millisecond)
}
if listener == nil {
- log.Fatalf("Timed out waiting for listener to start")
+ panic("Timed out waiting for listener to start")
}
}
}
}
+// TestManyFailedPuts hammers the proxy with 128 concurrent 1 MiB
+// PutB calls while the handler timeout is set to one nanosecond, so
+// every request is expected to fail quickly. The test passes only if
+// all goroutines return (i.e. nothing deadlocks or leaks a waiter)
+// within 10 seconds.
+func (s *ServerRequiredSuite) TestManyFailedPuts(c *C) {
+ kc := runProxy(c, nil, false)
+ defer closeListener()
+ router.(*proxyHandler).timeout = time.Nanosecond
+
+ buf := make([]byte, 1<<20)
+ rand.Read(buf)
+ var wg sync.WaitGroup
+ for i := 0; i < 128; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ // Errors are expected; we only care that PutB returns.
+ kc.PutB(buf)
+ }()
+ }
+ done := make(chan bool)
+ go func() {
+ wg.Wait()
+ close(done)
+ }()
+ select {
+ case <-done:
+ case <-time.After(10 * time.Second):
+ c.Error("timeout")
+ }
+}
+
func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
kc := runProxy(c, nil, false)
defer closeListener()
{
_, _, err := kc.Ask(hash)
c.Check(err, Equals, keepclient.BlockNotFound)
- log.Print("Finished Ask (expected BlockNotFound)")
+ c.Log("Finished Ask (expected BlockNotFound)")
}
{
reader, _, _, err := kc.Get(hash)
c.Check(reader, Equals, nil)
c.Check(err, Equals, keepclient.BlockNotFound)
- log.Print("Finished Get (expected BlockNotFound)")
+ c.Log("Finished Get (expected BlockNotFound)")
}
// Note in bug #5309 among other errors keepproxy would set
c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
- log.Print("Finished PutB (expected success)")
+ c.Log("Finished PutB (expected success)")
}
{
blocklen, _, err := kc.Ask(hash2)
c.Assert(err, Equals, nil)
c.Check(blocklen, Equals, int64(3))
- log.Print("Finished Ask (expected success)")
+ c.Log("Finished Ask (expected success)")
}
{
all, err := ioutil.ReadAll(reader)
c.Check(all, DeepEquals, []byte("foo"))
c.Check(blocklen, Equals, int64(3))
- log.Print("Finished Get (expected success)")
+ c.Log("Finished Get (expected success)")
}
{
c.Check(hash2, Matches, `^d41d8cd98f00b204e9800998ecf8427e\+0(\+.+)?$`)
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
- log.Print("Finished PutB zero block")
+ c.Log("Finished PutB zero block")
}
{
all, err := ioutil.ReadAll(reader)
c.Check(all, DeepEquals, []byte(""))
c.Check(blocklen, Equals, int64(0))
- log.Print("Finished Get zero block")
+ c.Log("Finished Get zero block")
}
}
errNotFound, _ := err.(keepclient.ErrNotFound)
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
- log.Print("Ask 1")
+ c.Log("Ask 1")
}
{
c.Check(hash2, Equals, "")
c.Check(rep, Equals, 0)
c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New("")))
- log.Print("PutB")
+ c.Log("PutB")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Ask 2")
+ c.Log("Ask 2")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Get")
+ c.Log("Get")
}
}
errNotFound, _ := err.(keepclient.ErrNotFound)
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
- log.Print("Ask 1")
+ c.Log("Ask 1")
}
{
c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
- log.Print("PutB")
+ c.Log("PutB")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Ask 2")
+ c.Log("Ask 2")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Get")
+ c.Log("Get")
}
}
var viaAlias = "keepproxy"
type proxyClient struct {
- client keepclient.HTTPClient
- proto string
+ client keepclient.HTTPClient
+ proto string
+ requestID string
}
func (pc *proxyClient) Do(req *http.Request) (*http.Response, error) {
req.Header.Add("Via", pc.proto+" "+viaAlias)
+ req.Header.Add("X-Request-Id", pc.requestID)
return pc.client.Do(req)
}
ok := make(chan struct{})
go func() {
req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
- (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req)
+ MakeRESTRouter().ServeHTTP(resp, req)
ok <- struct{}{}
}()
// Start a round-robin VolumeManager with the volumes we have found.
KeepVM = MakeRRVolumeManager(theConfig.Volumes)
- // Middleware stack: logger, MaxRequests limiter, method handlers
+ // Middleware/handler stack
router := MakeRESTRouter()
limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
router.limiter = limiter
- http.Handle("/", &LoggingRESTRouter{router: limiter})
+ http.Handle("/", httpserver.AddRequestIDs(httpserver.LogRequests(limiter)))
// Set up a TCP listener.
listener, err := net.Listen("tcp", theConfig.Listen)
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-// LoggingRESTRouter
-// LoggingResponseWriter
-
-import (
- "context"
- "net/http"
- "strings"
- "time"
-
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
- "git.curoverse.com/arvados.git/sdk/go/stats"
- log "github.com/Sirupsen/logrus"
-)
-
-// LoggingResponseWriter has anonymous fields ResponseWriter and ResponseBody
-type LoggingResponseWriter struct {
- Status int
- Length int
- http.ResponseWriter
- ResponseBody string
- sentHdr time.Time
-}
-
-// CloseNotify implements http.CloseNotifier.
-func (resp *LoggingResponseWriter) CloseNotify() <-chan bool {
- wrapped, ok := resp.ResponseWriter.(http.CloseNotifier)
- if !ok {
- // If upstream doesn't implement CloseNotifier, we can
- // satisfy the interface by returning a channel that
- // never sends anything (the interface doesn't
- // guarantee that anything will ever be sent on the
- // channel even if the client disconnects).
- return nil
- }
- return wrapped.CloseNotify()
-}
-
-// WriteHeader writes header to ResponseWriter
-func (resp *LoggingResponseWriter) WriteHeader(code int) {
- if resp.sentHdr == zeroTime {
- resp.sentHdr = time.Now()
- }
- resp.Status = code
- resp.ResponseWriter.WriteHeader(code)
-}
-
-var zeroTime time.Time
-
-func (resp *LoggingResponseWriter) Write(data []byte) (int, error) {
- if resp.Length == 0 && len(data) > 0 && resp.sentHdr == zeroTime {
- resp.sentHdr = time.Now()
- }
- resp.Length += len(data)
- if resp.Status >= 400 {
- resp.ResponseBody += string(data)
- }
- return resp.ResponseWriter.Write(data)
-}
-
-// LoggingRESTRouter is used to add logging capabilities to mux.Router
-type LoggingRESTRouter struct {
- router http.Handler
- idGenerator httpserver.IDGenerator
-}
-
-func (loggingRouter *LoggingRESTRouter) ServeHTTP(wrappedResp http.ResponseWriter, req *http.Request) {
- tStart := time.Now()
-
- // Attach a requestID-aware logger to the request context.
- lgr := log.WithField("RequestID", loggingRouter.idGenerator.Next())
- ctx := context.WithValue(req.Context(), "logger", lgr)
- req = req.WithContext(ctx)
-
- lgr = lgr.WithFields(log.Fields{
- "remoteAddr": req.RemoteAddr,
- "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
- "reqMethod": req.Method,
- "reqPath": req.URL.Path[1:],
- "reqBytes": req.ContentLength,
- })
- lgr.Debug("request")
-
- resp := LoggingResponseWriter{http.StatusOK, 0, wrappedResp, "", zeroTime}
- loggingRouter.router.ServeHTTP(&resp, req)
- tDone := time.Now()
-
- statusText := http.StatusText(resp.Status)
- if resp.Status >= 400 {
- statusText = strings.Replace(resp.ResponseBody, "\n", "", -1)
- }
- if resp.sentHdr == zeroTime {
- // Nobody changed status or wrote any data, i.e., we
- // returned a 200 response with no body.
- resp.sentHdr = tDone
- }
-
- lgr.WithFields(log.Fields{
- "timeTotal": stats.Duration(tDone.Sub(tStart)),
- "timeToStatus": stats.Duration(resp.sentHdr.Sub(tStart)),
- "timeWriteBody": stats.Duration(tDone.Sub(resp.sentHdr)),
- "respStatusCode": resp.Status,
- "respStatus": statusText,
- "respBytes": resp.Length,
- }).Info("response")
-}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
- "net/http"
- "testing"
-)
-
-func TestLoggingResponseWriterImplementsCloseNotifier(t *testing.T) {
- http.ResponseWriter(&LoggingResponseWriter{}).(http.CloseNotifier).CloseNotify()
-}
s3RaceWindow time.Duration
s3ACL = s3.Private
+
+ zeroTime time.Time
)
const (
create_calls += 1
if create_calls < 2:
raise RateLimitReachedError(429, "Rate limit exceeded",
- retry_after=12)
+ headers={'retry-after': '12'})
elif create_calls < 3:
raise BaseHTTPError(429, "Rate limit exceeded",
{'retry-after': '2'})
'setuptools'
],
dependency_links=[
- "https://github.com/curoverse/libcloud/archive/apache-libcloud-2.2.2.dev2.zip"
+ "https://github.com/curoverse/libcloud/archive/apache-libcloud-2.2.2.dev3.zip"
],
test_suite='tests',
tests_require=[
'requests',
'pbr<1.7.0',
'mock>=1.0',
- 'apache-libcloud==2.2.2.dev2',
+ 'apache-libcloud==2.2.2.dev3',
],
zip_safe=False,
cmdclass={'egg_info': tagger},
SSO_ROOT="$ARVBOX_DATA/sso-devise-omniauth-provider"
fi
+if test -z "$COMPOSER_ROOT" ; then
+ COMPOSER_ROOT="$ARVBOX_DATA/composer"
+fi
+
PG_DATA="$ARVBOX_DATA/postgres"
VAR_DATA="$ARVBOX_DATA/var"
PASSENGER="$ARVBOX_DATA/passenger"
if ! test -d "$SSO_ROOT" ; then
git clone https://github.com/curoverse/sso-devise-omniauth-provider.git "$SSO_ROOT"
fi
+ if ! test -d "$COMPOSER_ROOT" ; then
+ git clone https://github.com/curoverse/composer.git "$COMPOSER_ROOT"
+ fi
if test "$CONFIG" = test ; then
--privileged \
"--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \
"--volume=$SSO_ROOT:/usr/src/sso:rw" \
+ "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \
"--volume=$PG_DATA:/var/lib/postgresql:rw" \
"--volume=$VAR_DATA:/var/lib/arvados:rw" \
"--volume=$PASSENGER:/var/lib/passenger:rw" \
--privileged \
"--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \
"--volume=$SSO_ROOT:/usr/src/sso:rw" \
+ "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \
"--volume=$PG_DATA:/var/lib/postgresql:rw" \
"--volume=$VAR_DATA:/var/lib/arvados:rw" \
"--volume=$PASSENGER:/var/lib/passenger:rw" \
libjson-perl nginx gitolite3 lsof libreadline-dev \
apt-transport-https ca-certificates slurm-wlm \
linkchecker python3-virtualenv python-virtualenv xvfb iceweasel \
- libgnutls28-dev python3-dev vim cadaver cython gnupg dirmngr && \
+ libgnutls28-dev python3-dev vim cadaver cython gnupg dirmngr \
+ libsecret-1-dev && \
apt-get clean
ENV RUBYVERSION_MINOR 2.3
RUN pip install -U setuptools
-ENV NODEVERSION v6.11.2
+ENV NODEVERSION v6.11.4
# Install nodejs binary
RUN curl -L -f https://nodejs.org/dist/${NODEVERSION}/node-${NODEVERSION}-linux-x64.tar.xz | tar -C /usr/local -xJf - && \
FROM arvados/arvbox-base
ARG arvados_version
ARG sso_version=master
+ARG composer_version=master
RUN cd /usr/src && \
git clone --no-checkout https://github.com/curoverse/arvados.git && \
git -C arvados checkout ${arvados_version} && \
git clone --no-checkout https://github.com/curoverse/sso-devise-omniauth-provider.git sso && \
- git -C sso checkout ${sso_version}
+ git -C sso checkout ${sso_version} && \
+ git clone --no-checkout https://github.com/curoverse/composer.git && \
+ git -C composer checkout ${composer_version}
ADD service/ /var/lib/arvbox/service
RUN ln -sf /var/lib/arvbox/service /etc
[workbench]=80
[api]=8000
[sso]=8900
+ [composer]=4200
[arv-git-httpd]=9001
[keep-web]=9002
[keepproxy]=25100
--uid $HOSTUID --gid $HOSTGID \
--non-unique \
--groups docker \
+ --shell /bin/bash \
arvbox
useradd --home-dir /var/lib/arvados/git --uid $HOSTUID --gid $HOSTGID --non-unique git
useradd --groups docker crunch
--- /dev/null
+/usr/local/lib/arvbox/logger
\ No newline at end of file
--- /dev/null
+#!/bin/sh
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+set -e
+
+/usr/local/lib/arvbox/runsu.sh $0-service $1
--- /dev/null
+#!/bin/bash
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+# Arvbox service script for composer: installs its JS dependencies
+# and, unless invoked with --only-deps, writes the API endpoint
+# configuration and starts the Angular dev server on port 4200.
+
+exec 2>&1
+set -ex -o pipefail
+
+. /usr/local/lib/arvbox/common.sh
+
+cd /usr/src/composer
+
+# Install yarn into node_modules, then put local binaries (yarn, ng)
+# on PATH for the steps below.
+npm install yarn
+
+PATH=$PATH:/usr/src/composer/node_modules/.bin
+
+yarn install
+
+if test "$1" != "--only-deps" ; then
+ echo "apiEndPoint: https://${localip}:${services[api]}" > /usr/src/composer/src/arvados-configuration.yml
+ exec ng serve --host 0.0.0.0 --port 4200 --env=webdev
+fi
fi
run_bundler --without=development
-bundle exec passenger start --runtime-check-only --runtime-dir=/var/lib/passenger
+bundle exec passenger-config build-native-support
+bundle exec passenger-config install-standalone-runtime
if test "$1" = "--only-deps" ; then
exit
rm -rf tmp
mkdir -p tmp/cache
+bundle exec rake assets:precompile
bundle exec rake db:migrate
set +u
fi
exec bundle exec passenger start --port=${services[sso]} \
- --runtime-dir=/var/lib/passenger \
--ssl --ssl-certificate=/var/lib/arvados/self-signed.pem \
--ssl-certificate-key=/var/lib/arvados/self-signed.key
mkdir tmp
chown arvbox:arvbox tmp
+if test -s /var/lib/arvados/workbench_rails_env ; then
+ export RAILS_ENV=$(cat /var/lib/arvados/workbench_rails_env)
+else
+ export RAILS_ENV=development
+fi
+
if test "$1" != "--only-deps" ; then
exec bundle exec passenger start --port 80 \
--user arvbox --runtime-dir=/var/lib/passenger
keep_web_download_url: http://$localip:${services[keep-web]}/c=%{uuid_or_pdh}
keep_web_url: http://$localip:${services[keep-web]}/c=%{uuid_or_pdh}
arvados_docsite: http://$localip:${services[doc]}/
+ force_ssl: false
EOF
+bundle exec rake assets:precompile
+
(cd config && /usr/local/lib/arvbox/application_yml_override.py)
include agpl-3.0.txt
include crunchstat_summary/dygraphs.js
-include crunchstat_summary/chartjs.js
+include crunchstat_summary/synchronizer.js
});
});
+ var sync = Dygraph.synchronize(Object.values(charts), {range: false});
+
if (typeof window.debug === 'undefined')
window.debug = {};
window.debug.charts = charts;
class DygraphsChart(crunchstat_summary.webchart.WebChart):
CSS = 'https://cdnjs.cloudflare.com/ajax/libs/dygraph/2.0.0/dygraph.min.css'
JSLIB = 'https://cdnjs.cloudflare.com/ajax/libs/dygraph/2.0.0/dygraph.min.js'
- JSASSET = 'dygraphs.js'
+ JSASSETS = ['synchronizer.js','dygraphs.js']
def headHTML(self):
return '<link rel="stylesheet" href="{}">\n'.format(self.CSS)
--- /dev/null
+/**
+ * Synchronize zooming and/or selections between a set of dygraphs.
+ *
+ * Usage:
+ *
+ * var g1 = new Dygraph(...),
+ * g2 = new Dygraph(...),
+ * ...;
+ * var sync = Dygraph.synchronize(g1, g2, ...);
+ * // charts are now synchronized
+ * sync.detach();
+ * // charts are no longer synchronized
+ *
+ * You can set options using the last parameter, for example:
+ *
+ * var sync = Dygraph.synchronize(g1, g2, g3, {
+ * selection: true,
+ * zoom: true
+ * });
+ *
+ * The default is to synchronize both of these.
+ *
+ * Instead of passing one Dygraph object as each parameter, you may also pass an
+ * array of dygraphs:
+ *
+ * var sync = Dygraph.synchronize([g1, g2, g3], {
+ * selection: false,
+ * zoom: true
+ * });
+ *
+ * You may also set `range: false` if you wish to only sync the x-axis.
+ * The `range` option has no effect unless `zoom` is true (the default).
+ *
+ * SPDX-License-Identifier: MIT
+ * Original source: https://github.com/danvk/dygraphs/blob/master/src/extras/synchronizer.js
+ * at commit b55a71d768d2f8de62877c32b3aec9e9975ac389
+ *
+ * Copyright (c) 2009 Dan Vanderkam
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+(function() {
+/* global Dygraph:false */
+'use strict';
+
+var Dygraph;
+if (window.Dygraph) {
+ Dygraph = window.Dygraph;
+} else if (typeof(module) !== 'undefined') {
+ Dygraph = require('../dygraph');
+}
+
+// Entry point: accepts either Dygraph objects as positional args or a
+// single array of Dygraphs, optionally followed by an options object
+// (see the usage header above). Returns a handle with detach().
+var synchronize = function(/* dygraphs..., opts */) {
+ if (arguments.length === 0) {
+ throw 'Invalid invocation of Dygraph.synchronize(). Need >= 1 argument.';
+ }
+
+ var OPTIONS = ['selection', 'zoom', 'range'];
+ var opts = {
+ selection: true,
+ zoom: true,
+ range: true
+ };
+ var dygraphs = [];
+ // Original per-graph callbacks, saved so detach() can restore them
+ // and so handlers can chain to them.
+ var prevCallbacks = [];
+
+ // Copy recognized option keys from obj into opts.
+ var parseOpts = function(obj) {
+ if (!(obj instanceof Object)) {
+ throw 'Last argument must be either Dygraph or Object.';
+ } else {
+ for (var i = 0; i < OPTIONS.length; i++) {
+ var optName = OPTIONS[i];
+ if (obj.hasOwnProperty(optName)) opts[optName] = obj[optName];
+ }
+ }
+ };
+
+ if (arguments[0] instanceof Dygraph) {
+ // Arguments are Dygraph objects.
+ for (var i = 0; i < arguments.length; i++) {
+ if (arguments[i] instanceof Dygraph) {
+ dygraphs.push(arguments[i]);
+ } else {
+ break;
+ }
+ }
+ if (i < arguments.length - 1) {
+ throw 'Invalid invocation of Dygraph.synchronize(). ' +
+ 'All but the last argument must be Dygraph objects.';
+ } else if (i == arguments.length - 1) {
+ parseOpts(arguments[arguments.length - 1]);
+ }
+ } else if (arguments[0].length) {
+ // Invoked w/ list of dygraphs, options
+ for (var i = 0; i < arguments[0].length; i++) {
+ dygraphs.push(arguments[0][i]);
+ }
+ if (arguments.length == 2) {
+ parseOpts(arguments[1]);
+ } else if (arguments.length > 2) {
+ throw 'Invalid invocation of Dygraph.synchronize(). ' +
+ 'Expected two arguments: array and optional options argument.';
+ } // otherwise arguments.length == 1, which is fine.
+ } else {
+ throw 'Invalid invocation of Dygraph.synchronize(). ' +
+ 'First parameter must be either Dygraph or list of Dygraphs.';
+ }
+
+ if (dygraphs.length < 2) {
+ throw 'Invalid invocation of Dygraph.synchronize(). ' +
+ 'Need two or more dygraphs to synchronize.';
+ }
+
+ // Wait until every graph has fired ready() before attaching
+ // handlers; the countdown ensures we attach exactly once.
+ var readycount = dygraphs.length;
+ for (var i = 0; i < dygraphs.length; i++) {
+ var g = dygraphs[i];
+ g.ready( function() {
+ if (--readycount == 0) {
+ // store original callbacks
+ var callBackTypes = ['drawCallback', 'highlightCallback', 'unhighlightCallback'];
+ for (var j = 0; j < dygraphs.length; j++) {
+ if (!prevCallbacks[j]) {
+ prevCallbacks[j] = {};
+ }
+ for (var k = callBackTypes.length - 1; k >= 0; k--) {
+ prevCallbacks[j][callBackTypes[k]] = dygraphs[j].getFunctionOption(callBackTypes[k]);
+ }
+ }
+
+ // Listen for draw, highlight, unhighlight callbacks.
+ if (opts.zoom) {
+ attachZoomHandlers(dygraphs, opts, prevCallbacks);
+ }
+
+ if (opts.selection) {
+ attachSelectionHandlers(dygraphs, prevCallbacks);
+ }
+ }
+ });
+ }
+
+ return {
+ // Restore the graphs' original callbacks and drop all references;
+ // the handle must not be reused afterwards.
+ detach: function() {
+ for (var i = 0; i < dygraphs.length; i++) {
+ var g = dygraphs[i];
+ if (opts.zoom) {
+ g.updateOptions({drawCallback: prevCallbacks[i].drawCallback});
+ }
+ if (opts.selection) {
+ g.updateOptions({
+ highlightCallback: prevCallbacks[i].highlightCallback,
+ unhighlightCallback: prevCallbacks[i].unhighlightCallback
+ });
+ }
+ }
+ // release references & make subsequent calls throw.
+ dygraphs = null;
+ opts = null;
+ prevCallbacks = null;
+ }
+ };
+};
+
+// Shallow element-wise comparison of two arrays. Returns false if
+// either argument is not an array; used to skip redundant redraws.
+function arraysAreEqual(a, b) {
+ if (!Array.isArray(a) || !Array.isArray(b)) return false;
+ var i = a.length;
+ if (i !== b.length) return false;
+ while (i--) {
+ if (a[i] !== b[i]) return false;
+ }
+ return true;
+}
+
+// Install a drawCallback on each graph in gs that propagates the
+// x-axis window (and, when syncOpts.range is set, the y-axis range)
+// of the zoomed/panned graph to all the others. The shared `block`
+// flag prevents the propagated updateOptions calls from re-entering
+// this handler recursively.
+function attachZoomHandlers(gs, syncOpts, prevCallbacks) {
+ var block = false;
+ for (var i = 0; i < gs.length; i++) {
+ var g = gs[i];
+ g.updateOptions({
+ drawCallback: function(me, initial) {
+ if (block || initial) return;
+ block = true;
+ var opts = {
+ dateWindow: me.xAxisRange()
+ };
+ if (syncOpts.range) opts.valueRange = me.yAxisRange();
+
+ for (var j = 0; j < gs.length; j++) {
+ if (gs[j] == me) {
+ // Chain to the graph's original drawCallback, if any.
+ if (prevCallbacks[j] && prevCallbacks[j].drawCallback) {
+ prevCallbacks[j].drawCallback.apply(this, arguments);
+ }
+ continue;
+ }
+
+ // Only redraw if there are new options
+ if (arraysAreEqual(opts.dateWindow, gs[j].getOption('dateWindow')) &&
+ arraysAreEqual(opts.valueRange, gs[j].getOption('valueRange'))) {
+ continue;
+ }
+
+ gs[j].updateOptions(opts);
+ }
+ block = false;
+ }
+ }, true /* no need to redraw */);
+ }
+}
+
+// Install highlight/unhighlight callbacks on each graph in gs so that
+// hovering a point on one graph selects the row with the same x value
+// on every other graph (and clears selection everywhere on mouse-out).
+// The shared `block` flag prevents recursive re-entry while siblings
+// are being updated.
+function attachSelectionHandlers(gs, prevCallbacks) {
+ var block = false;
+ for (var i = 0; i < gs.length; i++) {
+ var g = gs[i];
+
+ g.updateOptions({
+ highlightCallback: function(event, x, points, row, seriesName) {
+ if (block) return;
+ block = true;
+ var me = this;
+ for (var i = 0; i < gs.length; i++) {
+ if (me == gs[i]) {
+ // Chain to the graph's original highlightCallback, if any.
+ if (prevCallbacks[i] && prevCallbacks[i].highlightCallback) {
+ prevCallbacks[i].highlightCallback.apply(this, arguments);
+ }
+ continue;
+ }
+ var idx = gs[i].getRowForX(x);
+ if (idx !== null) {
+ gs[i].setSelection(idx, seriesName);
+ }
+ }
+ block = false;
+ },
+ unhighlightCallback: function(event) {
+ if (block) return;
+ block = true;
+ var me = this;
+ for (var i = 0; i < gs.length; i++) {
+ if (me == gs[i]) {
+ // Chain to the graph's original unhighlightCallback, if any.
+ if (prevCallbacks[i] && prevCallbacks[i].unhighlightCallback) {
+ prevCallbacks[i].unhighlightCallback.apply(this, arguments);
+ }
+ continue;
+ }
+ gs[i].clearSelection();
+ }
+ block = false;
+ }
+ }, true /* no need to redraw */);
+ }
+}
+
+Dygraph.synchronize = synchronize;
+
+})();
class WebChart(object):
"""Base class for a web chart.
- Subclasses must assign JSLIB and JSASSET, and override the
+ Subclasses must assign JSLIB and JSASSETS, and override the
chartdata() method.
"""
JSLIB = None
def js(self):
return 'var chartdata = {};\n{}'.format(
json.dumps(self.sections()),
- pkg_resources.resource_string('crunchstat_summary', self.JSASSET))
+ '\n'.join([pkg_resources.resource_string('crunchstat_summary', jsa) for jsa in self.JSASSETS]))
def sections(self):
return [