doc/sdk/python/arvados
doc/sdk/R/arvados
doc/sdk/java-v2/javadoc
-sdk/perl/MYMETA.*
-sdk/perl/Makefile
-sdk/perl/blib
-sdk/perl/pm_to_blib
*/vendor
*/*/vendor
sdk/java/target
net-ssh-gateway (2.0.0)
net-ssh (>= 4.0.0)
nio4r (2.5.8)
- nokogiri (1.13.7)
+ nokogiri (1.13.9)
mini_portile2 (~> 2.8.0)
racc (~> 1.4)
npm-rails (0.2.1)
rails (>= 3.2)
oj (3.7.12)
os (1.1.1)
- passenger (6.0.2)
+ passenger (6.0.15)
rack
rake (>= 0.8.1)
piwik_analytics (1.0.2)
# URL for browsing source code for the given version.
def version_link_target version
- "https://arvados.org/projects/arvados/repository/changes?rev=#{version.sub(/-.*/, "")}"
+ "https://dev.arvados.org/projects/arvados/repository/changes?rev=#{version.sub(/-.*/, "")}"
end
end
<%= link_to "contact us", Rails.configuration.Workbench.ActivationContactLink %>.
You should receive an email at the address you used to log in when
your account is activated. In the mean time, you can
- <%= link_to "learn more about Arvados", "https://arvados.org/projects/arvados/wiki/Introduction_to_Arvados" %>,
+ <%= link_to "learn more about Arvados", "https://arvados.org/" %>,
and <%= link_to "read the Arvados user guide", "http://doc.arvados.org/user" %>.
</p>
<p style="padding-bottom: 1em">
assert page.has_button?('Close'), 'No button - Close'
assert page.has_no_button?('Send problem report'), 'Found button - Send problem report'
history_links = all('a').select do |a|
- a[:href] =~ %r!^https://arvados.org/projects/arvados/repository/changes\?rev=[0-9a-f]+$!
+ a[:href] =~ %r!^https://dev.arvados.org/projects/arvados/repository/changes\?rev=[0-9a-f]+$!
end
assert_operator(2, :<=, history_links.count,
"Should have found two links to revision history " +
SHELL ["/bin/bash", "-c"]
# Install dependencies.
-RUN yum -q -y install make automake gcc gcc-c++ libyaml-devel patch readline-devel zlib-devel libffi-devel openssl-devel bzip2 libtool bison sqlite-devel rpm-build git perl-ExtUtils-MakeMaker libattr-devel nss-devel libcurl-devel which tar unzip scl-utils centos-release-scl postgresql-devel fuse-devel xz-libs git wget pam-devel
+RUN yum -q -y install make automake gcc gcc-c++ libyaml-devel patch readline-devel zlib-devel libffi-devel openssl-devel bzip2 libtool bison sqlite-devel rpm-build git libattr-devel nss-devel libcurl-devel which tar unzip scl-utils centos-release-scl postgresql-devel fuse-devel xz-libs git wget pam-devel
# Install RVM
ADD generated/mpapis.asc /tmp/
keep-rsync
keep-block-check
keep-web
- libarvados-perl
libpam-arvados-go
python3-cwltest
python3-arvados-fuse
# Required due to CVE-2022-24765
git config --global --add safe.directory /arvados
-# Perl packages
-debug_echo -e "\nPerl packages\n"
-
-handle_libarvados_perl
-
# Ruby gems
debug_echo -e "\nRuby gems\n"
)
}
-# Usage: handle_libarvados_perl
-handle_libarvados_perl () {
- if [[ -n "$ONLY_BUILD" ]] && [[ "$ONLY_BUILD" != "libarvados-perl" ]] ; then
- debug_echo -e "Skipping build of libarvados-perl package."
- return 0
- fi
- # The perl sdk subdirectory is so old that it has no tag in its history,
- # which causes version_at_commit.sh to fail. Just rebuild it every time.
- cd "$WORKSPACE"
- libarvados_perl_version="$(version_from_git)"
- cd "$WORKSPACE/sdk/perl"
-
- cd $WORKSPACE/packages/$TARGET
- test_package_presence libarvados-perl "$libarvados_perl_version"
-
- if [[ "$?" == "0" ]]; then
- cd "$WORKSPACE/sdk/perl"
-
- if [[ -e Makefile ]]; then
- make realclean >"$STDOUT_IF_DEBUG"
- fi
- find -maxdepth 1 \( -name 'MANIFEST*' -or -name "libarvados-perl*.$FORMAT" \) \
- -delete
- rm -rf install
-
- perl Makefile.PL INSTALL_BASE=install >"$STDOUT_IF_DEBUG" && \
- make install INSTALLDIRS=perl >"$STDOUT_IF_DEBUG" && \
- fpm_build "$WORKSPACE/sdk/perl" install/lib/=/usr/share libarvados-perl \
- dir "$libarvados_perl_version" install/man/=/usr/share/man \
- "$WORKSPACE/apache-2.0.txt=/usr/share/doc/libarvados-perl/apache-2.0.txt" && \
- mv --no-clobber libarvados-perl*.$FORMAT "$WORKSPACE/packages/$TARGET/"
- fi
-}
-
# Build python packages with a virtualenv built-in
# Usage: fpm_build_virtualenv arvados-python-client sdk/python [deb|rpm] [amd64|arm64]
fpm_build_virtualenv () {
More information and background:
-https://arvados.org/projects/arvados/wiki/Running_tests
+https://dev.arvados.org/projects/arvados/wiki/Running_tests
Available tests:
VENV3DIR=
PYTHONPATH=
GEMHOME=
-PERLINSTALLBASE=
R_LIBS=
export LANG=en_US.UTF-8
echo -n 'nginx: '
PATH="$PATH:/sbin:/usr/sbin:/usr/local/sbin" nginx -v \
|| fatal "No nginx. Try: apt-get install nginx"
- echo -n 'perl: '
- perl -v | grep version \
- || fatal "No perl. Try: apt-get install perl"
- for mod in ExtUtils::MakeMaker JSON LWP Net::SSL; do
- echo -n "perl $mod: "
- perl -e "use $mod; print \"\$$mod::VERSION\\n\"" \
- || fatal "No $mod. Try: apt-get install perl-modules libcrypt-ssleay-perl libjson-perl libwww-perl"
- done
echo -n 'gitolite: '
which gitolite \
|| fatal "No gitolite. Try: apt-get install gitolite3"
fi
# Set up temporary install dirs (unless existing dirs were supplied)
- for tmpdir in VENV3DIR GOPATH GEMHOME PERLINSTALLBASE R_LIBS
+ for tmpdir in VENV3DIR GOPATH GEMHOME R_LIBS
do
if [[ -z "${!tmpdir}" ]]; then
eval "$tmpdir"="$temp/$tmpdir"
rm -vf "${WORKSPACE}/tmp/*.log"
- export PERLINSTALLBASE
- export PERL5LIB="$PERLINSTALLBASE/lib/perl5${PERL5LIB:+:$PERL5LIB}"
-
export R_LIBS
export GOPATH
fi
}
-install_sdk/perl() {
- cd "$WORKSPACE/sdk/perl" \
- && perl Makefile.PL INSTALL_BASE="$PERLINSTALLBASE" \
- && make install INSTALLDIRS=perl
-}
-
install_sdk/cli() {
install_gem arvados-cli sdk/cli
}
do_install env
do_install cmd/arvados-server go
do_install sdk/cli
- do_install sdk/perl
do_install sdk/python pip "${VENV3DIR}/bin/"
do_install sdk/ruby
do_install services/api
do_install doc
do_install sdk/ruby
do_install sdk/R
- do_install sdk/perl
do_install sdk/cli
do_install services/login-sync
for p in "${pythonstuff[@]}"
- sdk/java-v2/index.html.textile.liquid
- sdk/java-v2/example.html.textile.liquid
- sdk/java-v2/javadoc.html.textile.liquid
- - Perl:
- - sdk/perl/index.html.textile.liquid
- - sdk/perl/example.html.textile.liquid
api:
- Concepts:
- api/index.html.textile.liquid
- api/keep-s3.html.textile.liquid
- api/keep-web-urls.html.textile.liquid
- api/projects.html.textile.liquid
+ - api/properties.html.textile.liquid
- api/methods/collections.html.textile.liquid
- api/methods/repositories.html.textile.liquid
- Container engine:
- admin/logging.html.textile.liquid
- admin/metrics.html.textile.liquid
- admin/health-checks.html.textile.liquid
+ - admin/diagnostics.html.textile.liquid
- admin/management-token.html.textile.liquid
- admin/user-activity.html.textile.liquid
- Data Management:
--- /dev/null
+---
+layout: default
+navsection: admin
+title: Diagnostics
+...
+
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+The @arvados-client diagnostics@ command exercises basic cluster functionality, and identifies some common installation and configuration problems. Especially after upgrading or reconfiguring Arvados or server/network infrastructure, it can be the quickest way to identify problems.
+
+h2. Using system privileges
+
+On a server node, it is easiest to run the diagnostics command with system privileges. The word @sudo@ here instructs the @arvados-client@ command to load @Controller.ExternalURL@ and @SystemRootToken@ from @/etc/arvados/config.yml@ and use those credentials to run tests with system privileges.
+
+When run this way, diagnostics will also include "health checks":health-checks.html.
+
+<notextile><pre>
+# <span class="userinput">arvados-client sudo diagnostics</span>
+</pre></notextile>
+
+h2. Using regular user privileges
+
+On any node (server node, shell node, or a workstation outside the system network), you can also run diagnostics by setting the usual @ARVADOS_API_HOST@ and @ARVADOS_API_TOKEN@ environment variables. Typically this is done with a regular user account.
+
+<notextile><pre>
+$ <span class="userinput">export ARVADOS_API_HOST=zzzzz.arvadosapi.com</span>
+$ <span class="userinput">export ARVADOS_API_TOKEN=xxxxxxxxxx</span>
+$ <span class="userinput">arvados-client diagnostics</span>
+</pre></notextile>
+
+h2. Internal/external client detection
+
+The diagnostics output indicates whether its client connection is categorized by the server as internal or external. If you run diagnostics automatically with cron or a monitoring tool, you can use the @-internal-client@ or @-external-client@ flag to specify how you _expect_ the client to be categorized, and the test will fail otherwise. Example:
+
+<notextile><pre>
+# <span class="userinput">arvados-client sudo diagnostics -internal-client</span>
+[...]
+
+--- cut here --- error summary ---
+
+ERROR 60: checking internal/external client detection (11 ms): expecting internal=true external=false, but found internal=false external=true
+</pre></notextile>
+
+h2. Example output
+
+<notextile><pre>
+# <span class="userinput">arvados-client sudo diagnostics</span>
+INFO 5: running health check (same as `arvados-server check`)
+INFO 10: getting discovery document from https://zzzzz.arvadosapi.com/discovery/v1/apis/arvados/v1/rest
+INFO 20: getting exported config from https://zzzzz.arvadosapi.com/arvados/v1/config
+INFO 30: getting current user record
+INFO 40: connecting to service endpoint https://keep.zzzzz.arvadosapi.com/
+INFO 41: connecting to service endpoint https://*.collections.zzzzz.arvadosapi.com/
+INFO 42: connecting to service endpoint https://download.zzzzz.arvadosapi.com/
+INFO 43: connecting to service endpoint wss://ws.zzzzz.arvadosapi.com/websocket
+INFO 44: connecting to service endpoint https://workbench.zzzzz.arvadosapi.com/
+INFO 45: connecting to service endpoint https://workbench2.zzzzz.arvadosapi.com/
+INFO 50: checking CORS headers at https://zzzzz.arvadosapi.com/
+INFO 51: checking CORS headers at https://keep.zzzzz.arvadosapi.com/d41d8cd98f00b204e9800998ecf8427e+0
+INFO 52: checking CORS headers at https://download.zzzzz.arvadosapi.com/
+INFO 60: checking internal/external client detection
+INFO 61: reading+writing via keep service at https://keep.zzzzz.arvadosapi.com:443/
+INFO 80: finding/creating "scratch area for diagnostics" project
+INFO 90: creating temporary collection
+INFO 100: uploading file via webdav
+INFO 110: checking WebDAV ExternalURL wildcard (https://*.collections.zzzzz.arvadosapi.com/)
+INFO 120: downloading from webdav (https://d41d8cd98f00b204e9800998ecf8427e-0.collections.zzzzz.arvadosapi.com/foo)
+INFO 121: downloading from webdav (https://d41d8cd98f00b204e9800998ecf8427e-0.collections.zzzzz.arvadosapi.com/sha256:feb5d9fea6a5e9606aa995e879d862b825965ba48de054caab5ef356dc6b3412.tar)
+INFO 122: downloading from webdav (https://download.zzzzz.arvadosapi.com/c=d41d8cd98f00b204e9800998ecf8427e+0/_/foo)
+INFO 123: downloading from webdav (https://download.zzzzz.arvadosapi.com/c=d41d8cd98f00b204e9800998ecf8427e+0/_/sha256:feb5d9fea6a5e9606aa995e879d862b825965ba48de054caab5ef356dc6b3412.tar)
+INFO 124: downloading from webdav (https://a15a27cbc1c7d2d4a0d9e02529aaec7e-128.collections.zzzzz.arvadosapi.com/sha256:feb5d9fea6a5e9606aa995e879d862b825965ba48de054caab5ef356dc6b3412.tar)
+INFO 125: downloading from webdav (https://download.zzzzz.arvadosapi.com/c=zzzzz-4zz18-twitqma8mbvwydy/_/sha256:feb5d9fea6a5e9606aa995e879d862b825965ba48de054caab5ef356dc6b3412.tar)
+INFO 130: getting list of virtual machines
+INFO 140: getting workbench1 webshell page
+INFO 150: connecting to webshell service
+INFO 160: running a container
+INFO ... container request submitted, waiting up to 10m for container to run
+INFO 9990: deleting temporary collection
+</pre></notextile>
clsr1:
RemoteClusters:
clsr2:
- Host: api.cluster2.com
+ Host: api.cluster2.example
Proxy: true
ActivateUsers: true
clsr3:
- Host: api.cluster3.com
+ Host: api.cluster3.example
Proxy: true
ActivateUsers: false
</pre>
clsr1:
Login:
TrustedClients:
- "https://workbench.cluster2.com": {}
- "https://workbench.cluster3.com": {}
+ "https://workbench.cluster2.example": {}
+ "https://workbench2.cluster2.example": {}
+ "https://workbench.cluster3.example": {}
+ "https://workbench2.cluster3.example": {}
</pre>
h2. Testing
Following the above example, let's suppose @clsr1@ is our "home cluster", that is to say, we use our @clsr1@ user account as our federated identity and both @clsr2@ and @clsr3@ remote clusters are set up to allow users from @clsr1@ and to auto-activate them. The first thing to do would be to log into a remote workbench using the local user token. This can be done following these steps:
1. Log into the local workbench and get the user token
-2. Visit the remote workbench specifying the local user token by URL: @https://workbench.cluster2.com?api_token=token_from_clsr1@
+2. Visit the remote workbench specifying the local user token by URL: @https://workbench.cluster2.example?api_token=token_from_clsr1@
3. You should now be logged into @clsr2@ with your account from @clsr1@
To further test the federation setup, you can create a collection on @clsr2@, uploading some files and copying its UUID. Next, logged into a shell node on your home cluster you should be able to get that collection by running:
}
</pre>
-h2. Healthcheck aggregator
+h2. Health check aggregator
The service @arvados-health@ performs health checks on all configured services and returns a single value of @OK@ or @ERROR@ for the entire cluster. It exposes the endpoint @/_health/all@ .
The healthcheck aggregator uses the @Services@ section of the cluster-wide @config.yml@ configuration file.
+
+h2. Health check command
+
+The @arvados-server check@ command is another way to perform the same health checks as the health check aggregator service. It does not depend on the aggregator service.
+
+If all checks pass, it writes @health check OK@ to stderr (unless the @-quiet@ flag is used) and exits 0. Otherwise, it writes error messages to stderr and exits with error status.
+
+@arvados-server check -yaml@ outputs a YAML document on stdout with additional details about each service endpoint that was checked.
+
+{% codeblock as yaml %}
+Checks:
+ "arvados-api-server+http://localhost:8004/_health/ping":
+ ClockTime: "2022-11-16T16:08:57Z"
+ ConfigSourceSHA256: e2c086ae3dd290cf029cb3fe79146529622279b6280cf6cd17dc8d8c30daa57f
+ ConfigSourceTimestamp: "2022-11-07T18:08:24.539545Z"
+ HTTPStatusCode: 200
+ Health: OK
+ Response:
+ health: OK
+ ResponseTime: 0.017159
+ Server: nginx/1.14.0 + Phusion Passenger(R) 6.0.15
+ Version: 2.5.0~dev20221116141533
+ "arvados-controller+http://localhost:8003/_health/ping":
+ ClockTime: "2022-11-16T16:08:57Z"
+ ConfigSourceSHA256: e2c086ae3dd290cf029cb3fe79146529622279b6280cf6cd17dc8d8c30daa57f
+ ConfigSourceTimestamp: "2022-11-07T18:08:24.539545Z"
+ HTTPStatusCode: 200
+ Health: OK
+ Response:
+ health: OK
+ ResponseTime: 0.004748
+ Server: ""
+ Version: 2.5.0~dev20221116141533 (go1.18.8)
+# ...
+{% endcodeblock %}
To check for services that have not restarted since the configuration file was updated, run the @arvados-server check@ command on each system node.
+To test functionality and check for common problems, run the @arvados-client sudo diagnostics@ command on a system node.
+
h2(#upgrading). Upgrading Arvados
Upgrading Arvados typically involves the following steps:
# Run @arvados-server config-check@ to detect configuration errors or deprecated entries.
# Verify that the Arvados services were restarted as part of the package upgrades.
# Run @arvados-server check@ to detect services that did not restart properly.
+# Run @arvados-client sudo diagnostics@ to test functionality.
</notextile>
-h2(#main). development main (as of 2022-10-14)
+h2(#main). development main (as of 2022-10-31)
-"previous: Upgrading to 2.4.3":#v2_4_3
+"previous: Upgrading to 2.4.4":#v2_4_4
+
+h3. Google or OpenID Connect login restricted to trusted clients
+
+If you use OpenID Connect or Google login, and your cluster serves as the @LoginCluster@ in a federation _or_ your users log in from a web application other than the Workbench1 and Workbench2 @ExternalURL@ addresses in your configuration file, the additional web application URLs (e.g., the other clusters' Workbench addresses) must be listed explicitly in @Login.TrustedClients@, otherwise login will fail. Previously, login would succeed with a less-privileged token.
+
+h3. New keepstore S3 driver enabled by default
+
+A more actively maintained S3 client library is now enabled by default for keepstore services. The previous driver is still available for use in case of unknown issues. To use the old driver, set @DriverParameters.UseAWSS3v2Driver@ to @false@ on the appropriate @Volumes@ config entries.
+
+h3. Old container logs are automatically deleted from PostgreSQL
+
+Cached copies of log entries from containers that finished more than 1 month ago are now deleted automatically (this only affects the "live" logs saved in the PostgreSQL database, not log collections saved in Keep). If you have an existing cron job that runs @rake db:delete_old_container_logs@, you can remove it. See configuration options @Containers.Logging.MaxAge@ and @Containers.Logging.SweepInterval@.
h3. Fixed salt installer template file to support container shell access
The config entries @Collections.WebDAVCache.UUIDTTL@, @...MaxCollectionEntries@, and @...MaxUUIDEntries@ are no longer used, and should be removed from your config file.
+h2(#v2_4_4). v2.4.4 (2022-11-18)
+
+"previous: Upgrading to 2.4.3":#v2_4_3
+
+This update only consists of improvements to @arvados-cwl-runner@. There are no changes to backend services.
+
h2(#v2_4_3). v2.4.3 (2022-09-21)
"previous: Upgrading to 2.4.2":#v2_4_2
|is_trashed|datetime|True if @trash_at@ is in the past, false if not.||
|frozen_by_uuid|string|For a frozen project, indicates the user who froze the project; null in all other cases. When a project is frozen, no further changes can be made to the project or its contents, even by admins. Attempting to add new items or modify, rename, move, trash, or delete the project or its contents, including any subprojects, will return an error.||
-h3. Frozen projects
+h3(#frozen). Frozen projects
A user with @manage@ permission can set the @frozen_by_uuid@ attribute of a @project@ group to their own user UUID. Once this is done, no further changes can be made to the project or its contents, including subprojects.
Because the home project is a virtual project, other operations via the @groups@ API are not supported.
-h2. Filter groups
+h2(#filtergroups). Filter groups
Filter groups are another type of virtual project. They are implemented as an Arvados @group@ object with @group_class@ set to the value "filter".
--- /dev/null
+---
+layout: default
+navsection: api
+title: "Metadata properties"
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+Arvados allows you to attach arbitrary properties to "collection":methods/collections.html, "container_request":methods/container_requests.html, "link":methods/links.html and "group":methods/groups.html records that have a @properties@ field. These are key-value pairs, where the value is a valid JSON type (string, number, null, boolean, array, object).
+
+Searching for records using properties is described in "Filtering on subproperties":methods.html#subpropertyfilters .
+
+h2. Reserved properties
+
+The following properties are set by Arvados components.
+
+table(table table-bordered table-condensed).
+|_. Property name|_. Appears on|_. Value type|_. Description|
+|type|collection|string|Appears on collections to indicate the contents or usage. See "Collection type values":#collectiontype below for details.|
+|container_request|collection|string|The UUID of the container request that produced an output or log collection.|
+|docker-image-repo-tag|collection|string|For collections containing a Docker image, the repo/name:tag identifier.|
+|container_uuid|collection|string|The UUID of the container that produced a collection (set on collections with type=log).|
+|cwl_input|container_request|object|On an intermediate container request, the CWL workflow-level input parameters used to generate the container request|
+|cwl_output|container_request|object|On an intermediate container request, the CWL workflow-level output parameters collected from the container request|
+|template_uuid|container_request|string|For a workflow runner container request, the workflow record that was used to launch it.|
+|username|link|string|For a "can_login":permission-model.html#links permission link, the unix username on the VM that the user will have.|
+|groups|link|array of string|For a "can_login":permission-model.html#links permission link, the unix groups on the VM that the user will be added to.|
+|image_timestamp|link|string|When resolving a Docker image name and multiple links are found with @link_class=docker_image_repo+tag@ and same @link_name@, the @image_timestamp@ is used to determine precedence (most recent wins).|
+|filters|group|array of array of string|Used to define "filter groups":projects.html#filtergroups|
+
+h3(#collectiontype). Collection "type" values
+
+Meaningful values of the @type@ property. These are recognized by Workbench when filtering on types of collections from the project content listing.
+
+table(table table-bordered table-condensed).
+|_. Type|_. Description|
+|log|The collection contains log files from a container run.|
+|output|The collection contains the output of a top-level container run (this is a container request where @requesting_container_uuid@ is null).|
+|intermediate|The collection contains the output of a child container run (this is a container request where @requesting_container_uuid@ is non-empty).|
+
+h2. Controlling user-supplied properties
+
+Arvados can be configured with a vocabulary file that lists valid properties and the range of valid values for those properties. This is described in "Metadata vocabulary":{{site.baseurl}}/admin/metadata-vocabulary.html .
+
+Arvados offers options to set properties automatically and/or prevent certain properties, once set, from being changed by non-admin users. This is described in "Configuring collection's managed properties":{{site.baseurl}}/admin/collection-managed-properties.html .
+
+The admin can require that certain properties must be non-empty before "freezing a project":methods/groups.html#frozen .
clsr1:
RemoteClusters:
clsr2:
- Host: api.cluster2.com
+ Host: api.cluster2.example
Proxy: true
clsr3:
- Host: api.cluster3.com
+ Host: api.cluster3.example
Proxy: true
</pre>
-In this example, the cluster @clsr1@ is configured to contact @api.cluster2.com@ for requests involving @clsr2@ and @api.cluster3.com@ for requests involving @clsr3@.
+In this example, the cluster @clsr1@ is configured to contact @api.cluster2.example@ for requests involving @clsr2@ and @api.cluster3.example@ for requests involving @clsr3@.
h2(#identity). Identity
AccessKeyID: <span class="userinput">""</span>
SecretAccessKey: <span class="userinput">""</span>
- # Storage provider region. For Google Cloud Storage, use ""
- # or omit.
+ # Storage provider region. If Endpoint is specified, the
+ # region determines the request signing method, and defaults
+ # to "us-east-1".
Region: <span class="userinput">us-east-1</span>
# Storage provider endpoint. For Amazon S3, use "" or
On the dispatch node, start monitoring the arvados-dispatch-cloud logs:
<notextile>
-<pre><code>~$ <span class="userinput">sudo journalctl -o cat -fu arvados-dispatch-cloud.service</span>
+<pre><code># <span class="userinput">journalctl -o cat -fu arvados-dispatch-cloud.service</span>
</code></pre>
</notextile>
-"Make sure to install the arvados/jobs image.":../install-jobs-image.html
-
-Submit a simple container request:
+In another terminal window, use the diagnostics tool to run a simple container.
<notextile>
-<pre><code>shell:~$ <span class="userinput">arv container_request create --container-request '{
- "name": "test",
- "state": "Committed",
- "priority": 1,
- "container_image": "arvados/jobs:latest",
- "command": ["echo", "Hello, Crunch!"],
- "output_path": "/out",
- "mounts": {
- "/out": {
- "kind": "tmp",
- "capacity": 1000
- }
- },
- "runtime_constraints": {
- "vcpus": 1,
- "ram": 1048576
- }
-}'</span>
+<pre><code># <span class="userinput">arvados-client sudo diagnostics</span>
+INFO 5: running health check (same as `arvados-server check`)
+INFO 10: getting discovery document from https://zzzzz.arvadosapi.com/discovery/v1/apis/arvados/v1/rest
+...
+INFO 160: running a container
+INFO ... container request submitted, waiting up to 10m for container to run
</code></pre>
</notextile>
-This command should return a record with a @container_uuid@ field. Once @arvados-dispatch-cloud@ polls the API server for new containers to run, you should see it dispatch that same container.
+After performing a number of other quick tests, this will submit a new container request and wait for it to finish.
+
+While the diagnostics tool is waiting, the @arvados-dispatch-cloud@ logs will show details about creating a cloud instance, waiting for it to be ready, and scheduling the new container on it.
-The @arvados-dispatch-cloud@ API provides a list of queued and running jobs and cloud instances. Use your @ManagementToken@ to test the dispatcher's endpoint. For example, when one container is running:
+You can also use the "arvados-dispatch-cloud API":{{site.baseurl}}/api/dispatch.html to get a list of queued and running jobs and cloud instances. Use your @ManagementToken@ to test the dispatcher's endpoint. For example, when one container is running:
<notextile>
<pre><code>~$ <span class="userinput">curl -sH "Authorization: Bearer $token" http://localhost:9006/arvados/v1/dispatch/containers</span>
A similar request can be made to the @http://localhost:9006/arvados/v1/dispatch/instances@ endpoint.
-When the container finishes, the dispatcher will log it.
-
After the container finishes, you can get the container record by UUID *from a shell server* to see its results:
<notextile>
{% include 'start_service' %}
{% include 'restart_api' %}
+
+h2(#confirm-working). Confirm working installation
+
+On the dispatch node, start monitoring the arvados-dispatch-lsf logs:
+
+<notextile>
+<pre><code># <span class="userinput">journalctl -o cat -fu arvados-dispatch-lsf.service</span>
+</code></pre>
+</notextile>
+
+In another terminal window, use the diagnostics tool to run a simple container.
+
+<notextile>
+<pre><code># <span class="userinput">arvados-client sudo diagnostics</span>
+INFO 5: running health check (same as `arvados-server check`)
+INFO 10: getting discovery document from https://zzzzz.arvadosapi.com/discovery/v1/apis/arvados/v1/rest
+...
+INFO 160: running a container
+INFO ... container request submitted, waiting up to 10m for container to run
+</code></pre>
+</notextile>
+
+After performing a number of other quick tests, this will submit a new container request and wait for it to finish.
+
+While the diagnostics tool is waiting, the @arvados-dispatch-lsf@ logs will show details about submitting an LSF job to run the container.
On the dispatch node, start monitoring the crunch-dispatch-slurm logs:
<notextile>
-<pre><code>~$ <span class="userinput">sudo journalctl -o cat -fu crunch-dispatch-slurm.service</span>
+<pre><code># <span class="userinput">journalctl -o cat -fu crunch-dispatch-slurm.service</span>
</code></pre>
</notextile>
-Submit a simple container request:
+In another terminal window, use the diagnostics tool to run a simple container.
<notextile>
-<pre><code>shell:~$ <span class="userinput">arv container_request create --container-request '{
- "name": "test",
- "state": "Committed",
- "priority": 1,
- "container_image": "arvados/jobs:latest",
- "command": ["echo", "Hello, Crunch!"],
- "output_path": "/out",
- "mounts": {
- "/out": {
- "kind": "tmp",
- "capacity": 1000
- }
- },
- "runtime_constraints": {
- "vcpus": 1,
- "ram": 8388608
- }
-}'</span>
+<pre><code># <span class="userinput">arvados-client sudo diagnostics</span>
+INFO 5: running health check (same as `arvados-server check`)
+INFO 10: getting discovery document from https://zzzzz.arvadosapi.com/discovery/v1/apis/arvados/v1/rest
+...
+INFO 160: running a container
+INFO ... container request submitted, waiting up to 10m for container to run
</code></pre>
</notextile>
-This command should return a record with a @container_uuid@ field. Once @crunch-dispatch-slurm@ polls the API server for new containers to run, you should see it dispatch that same container. It will log messages like:
+Once @crunch-dispatch-slurm@ polls the API server for new containers to run, you should see it dispatch the new container. It will log messages like:
<notextile>
<pre><code>2016/08/05 13:52:54 Monitoring container zzzzz-dz642-hdp2vpu9nq14tx0 started
* "R SDK":{{site.baseurl}}/sdk/R/index.html
* "Ruby SDK":{{site.baseurl}}/sdk/ruby/index.html
* "Java SDK v2":{{site.baseurl}}/sdk/java-v2/index.html
-* "Perl SDK":{{site.baseurl}}/sdk/perl/index.html
Many Arvados Workbench pages, under the *Advanced* tab, provide examples of API and SDK use for accessing the current resource .
+++ /dev/null
----
-layout: default
-navsection: sdk
-navmenu: Perl
-title: "Examples"
-...
-{% comment %}
-Copyright (C) The Arvados Authors. All rights reserved.
-
-SPDX-License-Identifier: CC-BY-SA-3.0
-{% endcomment %}
-
-h2. Initialize SDK
-
-Set up an API client user agent:
-
-{% codeblock as perl %}
-use Arvados;
-my $arv = Arvados->new('apiVersion' => 'v1');
-{% endcodeblock %}
-
-The SDK retrieves the list of API methods from the server at run time. Therefore, the set of available methods is determined by the server version rather than the SDK version.
-
-h2. create
-
-Create an object:
-
-{% codeblock as perl %}
-my $test_link = $arv->{'links'}->{'create'}->execute('link' => { 'link_class' => 'test', 'name' => 'test' });
-{% endcodeblock %}
-
-h2. delete
-
-{% codeblock as perl %}
-my $some_user = $arv->{'collections'}->{'get'}->execute('uuid' => $collection_uuid);
-{% endcodeblock %}
-
-h2. get
-
-Retrieve an object by ID:
-
-{% codeblock as perl %}
-my $some_user = $arv->{'users'}->{'get'}->execute('uuid' => $current_user_uuid);
-{% endcodeblock %}
-
-Get the UUID of an object that was retrieved using the SDK:
-
-{% codeblock as perl %}
-my $current_user_uuid = $current_user->{'uuid'}
-{% endcodeblock %}
-
-h2. list
-
-Get a list of objects:
-
-{% codeblock as perl %}
-my $repos = $arv->{'repositories'}->{'list'}->execute;
-print ("UUID of first repo returned is ", $repos->{'items'}->[0], "\n");
-{% endcodeblock %}
-
-h2. update
-
-Update an object:
-
-{% codeblock as perl %}
-my $test_link = $arv->{'links'}->{'update'}->execute(
- 'uuid' => $test_link->{'uuid'},
- 'link' => { 'properties' => { 'foo' => 'bar' } });
-{% endcodeblock %}
-
-h2. Get current user
-
-Get the User object for the current user:
-
-{% codeblock as perl %}
-my $current_user = $arv->{'users'}->{'current'}->execute;
-{% endcodeblock %}
+++ /dev/null
----
-layout: default
-navsection: sdk
-navmenu: Perl
-title: "Installation"
-...
-{% comment %}
-Copyright (C) The Arvados Authors. All rights reserved.
-
-SPDX-License-Identifier: CC-BY-SA-3.0
-{% endcomment %}
-
-The Perl SDK provides a generic set of wrappers so you can make API calls easily.
-
-This is a legacy SDK. It is no longer used or maintained regularly.
-
-h3. Installation
-
-h4. Option 1: Install from distribution packages
-
-First, "add the appropriate package repository for your distribution":{{ site.baseurl }}/install/install-manual-prerequisites.html#repos.
-
-On Debian-based systems:
-
-<notextile>
-<pre><code>~$ <span class="userinput">sudo apt-get install libjson-perl libio-socket-ssl-perl libwww-perl libipc-system-simple-perl libarvados-perl</code>
-</code></pre>
-</notextile>
-
-On Red Hat-based systems:
-
-<notextile>
-<pre><code>~$ <span class="userinput">sudo yum install perl-ExtUtils-MakeMaker perl-JSON perl-IO-Socket-SSL perl-Crypt-SSLeay perl-WWW-Curl libarvados-perl</code>
-</code></pre>
-</notextile>
-
-h4. Option 2: Install from source
-
-First, install dependencies from your distribution. Refer to the package lists above, but don't install @libarvados-perl@.
-
-Then run the following:
-
-<notextile>
-<pre><code>~$ <span class="userinput">git clone https://github.com/arvados/arvados.git</span>
-~$ <span class="userinput">cd arvados/sdk/perl</span>
-~$ <span class="userinput">perl Makefile.PL</span>
-~$ <span class="userinput">sudo make install</span>
-</code></pre>
-</notextile>
-
-h3. Test installation
-
-If the SDK is installed, @perl -MArvados -e ''@ should produce no errors.
-
-If your @ARVADOS_API_HOST@ and @ARVADOS_API_TOKEN@ environment variables are set up correctly (see "api-tokens":{{site.baseurl}}/user/reference/api-tokens.html for details), the following test script should work:
-
-<notextile>
-<pre>~$ <code class="userinput">perl <<'EOF'
-use Arvados;
-my $arv = Arvados->new('apiVersion' => 'v1');
-my $me = $arv->{'users'}->{'current'}->execute;
-print ("arvados.v1.users.current.full_name = '", $me->{'full_name'}, "'\n");
-EOF</code>
-arvados.v1.users.current.full_name = 'Your Name'
-</pre>
-</notextile>
# by going through login again.
IssueTrustedTokens: true
- # When the token is returned to a client, the token itself may
- # be restricted from viewing/creating other tokens based on whether
- # the client is "trusted" or not. The local Workbench1 and
- # Workbench2 are trusted by default, but if this is a
- # LoginCluster, you probably want to include the other Workbench
- # instances in the federation in this list.
+ # Origins (scheme://host[:port]) of clients trusted to receive
+ # new tokens via the login process. The ExternalURLs of the local
+ # Workbench1 and Workbench2 are trusted implicitly and do not
+ # need to be listed here. If this is a LoginCluster, you
+ # probably want to include the other Workbench instances in the
+ # federation in this list.
+ #
+ # Example:
+ #
+ # TrustedClients:
+ # "https://workbench.other-cluster.example": {}
+ # "https://workbench2.other-cluster.example": {}
TrustedClients:
- SAMPLE:
- "https://workbench.federate1.example": {}
- "https://workbench.federate2.example": {}
+ SAMPLE: {}
+
+ # Treat any origin whose host part is "localhost" or a private
+ # IP address (e.g., http://10.0.0.123:3000/) as if it were
+ # listed in TrustedClients.
+ #
+ # Intended only for test/development use. Not appropriate for
+ # production use.
+ TrustPrivateNetworks: false
Git:
# Path to git or gitolite-shell executable. Each authenticated
# Extra RAM to reserve on the node, in addition to
# the amount specified in the container's RuntimeConstraints
- ReserveExtraRAM: 256MiB
+ ReserveExtraRAM: 550MiB
# Minimum time between two attempts to run the same container
MinRetryPeriod: 0s
LocalKeepLogsToContainerLog: none
Logging:
- # When you run the db:delete_old_container_logs task, it will find
- # containers that have been finished for at least this many seconds,
+ # Periodically (see SweepInterval) Arvados will check for
+ # containers that have been finished for at least this long,
# and delete their stdout, stderr, arv-mount, crunch-run, and
# crunchstat logs from the logs table.
MaxAge: 720h
+ # How often to delete cached log entries for finished
+ # containers (see MaxAge).
+ SweepInterval: 12h
+
# These two settings control how frequently log events are flushed to the
# database. Log lines are buffered until either crunch_log_bytes_per_event
# has been reached or crunch_log_seconds_between_events has elapsed since
RaceWindow: 24h
PrefixLength: 0
# Use aws-s3-go (v2) instead of goamz
- UseAWSS3v2Driver: false
+ UseAWSS3v2Driver: true
# For S3 driver, potentially unsafe tuning parameter,
# intentionally excluded from main documentation.
"Login.Test.Users": false,
"Login.TokenLifetime": false,
"Login.TrustedClients": false,
+ "Login.TrustPrivateNetworks": false,
"Mail": true,
"Mail.EmailFrom": false,
"Mail.IssueReporterEmailFrom": false,
"Workbench.ApplicationMimetypesWithViewIcon.*": true,
"Workbench.ArvadosDocsite": true,
"Workbench.ArvadosPublicDataDocURL": true,
+ "Workbench.BannerURL": true,
"Workbench.DefaultOpenIdPrefix": false,
"Workbench.DisableSharingURLsUI": true,
"Workbench.EnableGettingStartedPopup": true,
"Workbench.UserProfileFormFields.*.*.*": true,
"Workbench.UserProfileFormMessage": true,
"Workbench.WelcomePageHTML": true,
- "Workbench.BannerURL": true,
}
func redactUnsafe(m map[string]interface{}, mPrefix, lookupPrefix string) error {
import (
"context"
"database/sql"
+ "fmt"
+ "net"
"sync"
"time"
)
var (
- TrashSweep = &DBLocker{key: 10001}
- retryDelay = 5 * time.Second
+ TrashSweep = &DBLocker{key: 10001}
+ ContainerLogSweep = &DBLocker{key: 10002}
+ KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop
+ KeepBalanceActive = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop)
+ Dispatch = &DBLocker{key: 10005} // any dispatcher running
+ retryDelay = 5 * time.Second
)
// DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
}
// Lock acquires the advisory lock, waiting/reconnecting if needed.
-func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) {
- logger := ctxlog.FromContext(ctx)
+//
+// Returns false if ctx is canceled before the lock is acquired.
+//
+// On every return/retry path, dbl.mtx is released and any
+// connection obtained during that attempt is closed, unless the
+// lock was acquired (in which case the connection is retained in
+// dbl.conn so the session-level advisory lock stays held).
+func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
+ logger := ctxlog.FromContext(ctx).WithField("ID", dbl.key)
+ var lastHeldBy string
for ; ; time.Sleep(retryDelay) {
dbl.mtx.Lock()
if dbl.conn != nil {
dbl.mtx.Unlock()
continue
}
+ if ctx.Err() != nil {
+ dbl.mtx.Unlock()
+ return false
+ }
db, err := getdb(ctx)
- if err != nil {
- logger.WithError(err).Infof("error getting database pool")
+ if err == context.Canceled {
+ dbl.mtx.Unlock()
+ return false
+ } else if err != nil {
+ logger.WithError(err).Info("error getting database pool")
dbl.mtx.Unlock()
continue
}
conn, err := db.Conn(ctx)
- if err != nil {
+ if err == context.Canceled {
+ dbl.mtx.Unlock()
+ return false
+ } else if err != nil {
logger.WithError(err).Info("error getting database connection")
dbl.mtx.Unlock()
continue
}
var locked bool
err = conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, dbl.key).Scan(&locked)
- if err != nil {
- logger.WithError(err).Infof("error getting pg_try_advisory_lock %d", dbl.key)
+ if err == context.Canceled {
+ // Release the connection and the mutex before
+ // giving up, like every other exit path: otherwise
+ // the connection leaks and a later Lock/Check/Unlock
+ // on this DBLocker blocks forever on dbl.mtx.
+ conn.Close()
+ dbl.mtx.Unlock()
+ return false
+ } else if err != nil {
+ logger.WithError(err).Info("error getting pg_try_advisory_lock")
conn.Close()
dbl.mtx.Unlock()
continue
}
if !locked {
+ var host string
+ var port int
+ err = conn.QueryRowContext(ctx, `SELECT client_addr, client_port FROM pg_stat_activity WHERE pid IN
+ (SELECT pid FROM pg_locks
+ WHERE locktype = $1 AND objid = $2)`, "advisory", dbl.key).Scan(&host, &port)
+ if err != nil {
+ logger.WithError(err).Info("error getting other client info")
+ } else {
+ heldBy := net.JoinHostPort(host, fmt.Sprintf("%d", port))
+ if lastHeldBy != heldBy {
+ logger.WithField("DBClient", heldBy).Info("waiting for other process to release lock")
+ lastHeldBy = heldBy
+ }
+ }
conn.Close()
dbl.mtx.Unlock()
continue
}
- logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
+ logger.Debug("acquired pg_advisory_lock")
dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
dbl.mtx.Unlock()
- return
+ return true
}
}
// Check confirms that the lock is still active (i.e., the session is
// still alive), and re-acquires if needed. Panics if Lock is not
// acquired first.
-func (dbl *DBLocker) Check() {
+//
+// Returns false if the context passed to Lock() is canceled before
+// the lock is confirmed or reacquired.
+func (dbl *DBLocker) Check() bool {
dbl.mtx.Lock()
err := dbl.conn.PingContext(dbl.ctx)
- if err == nil {
- ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
+ if err == context.Canceled {
+ dbl.mtx.Unlock()
+ return false
+ } else if err == nil {
+ ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("connection still alive")
dbl.mtx.Unlock()
- return
+ return true
}
ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
dbl.conn.Close()
dbl.conn = nil
ctx, getdb := dbl.ctx, dbl.getdb
dbl.mtx.Unlock()
- dbl.Lock(ctx, getdb)
+ return dbl.Lock(ctx, getdb)
}
func (dbl *DBLocker) Unlock() {
if dbl.conn != nil {
_, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
if err != nil {
- ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
+ ctxlog.FromContext(dbl.ctx).WithError(err).WithField("ID", dbl.key).Info("error releasing pg_advisory_lock")
} else {
- ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key)
+ ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("released pg_advisory_lock")
}
dbl.conn.Close()
dbl.conn = nil
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dblock
+
+import (
+ "bytes"
+ "context"
+ "sync"
+ "testing"
+ "time"
+
+ "git.arvados.org/arvados.git/lib/config"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/jmoiron/sqlx"
+ "github.com/sirupsen/logrus"
+ check "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
+
+var _ = check.Suite(&suite{})
+
+type suite struct {
+ cluster *arvados.Cluster
+ db *sqlx.DB
+ getdb func(context.Context) (*sqlx.DB, error)
+}
+
+var testLocker = &DBLocker{key: 999}
+
+func (s *suite) SetUpSuite(c *check.C) {
+ cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+ c.Assert(err, check.IsNil)
+ s.cluster, err = cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+ s.db = arvadostest.DB(c, s.cluster)
+ s.getdb = func(context.Context) (*sqlx.DB, error) { return s.db, nil }
+}
+
+func (s *suite) TestLock(c *check.C) {
+ retryDelay = 10 * time.Millisecond
+
+ var logbuf bytes.Buffer
+ logger := ctxlog.New(&logbuf, "text", "debug")
+ logger.Level = logrus.DebugLevel
+ ctx := ctxlog.Context(context.Background(), logger)
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ testLocker.Lock(ctx, s.getdb)
+ testLocker.Check()
+
+ lock2 := make(chan bool)
+ var wg sync.WaitGroup
+ defer wg.Wait()
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ testLocker2 := &DBLocker{key: 999}
+ testLocker2.Lock(ctx, s.getdb)
+ close(lock2)
+ testLocker2.Check()
+ testLocker2.Unlock()
+ }()
+
+ // Second lock should wait for first to Unlock
+ select {
+ case <-time.After(time.Second / 10):
+ c.Check(logbuf.String(), check.Matches, `(?ms).*level=info.*DBClient="[^"]+:\d+".*ID=999.*`)
+ case <-lock2:
+ c.Log("double-lock")
+ c.Fail()
+ }
+
+ testLocker.Check()
+ testLocker.Unlock()
+
+ // Now the second lock should succeed within retryDelay
+ select {
+ case <-time.After(retryDelay * 2):
+ c.Log("timed out")
+ c.Fail()
+ case <-lock2:
+ }
+ c.Logf("%s", logbuf.String())
+}
// non-nil, true, nil -- if the token is valid
func (h *Handler) validateAPItoken(req *http.Request, token string) (*CurrentUser, bool, error) {
user := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: token}}
- db, err := h.db(req.Context())
+ db, err := h.dbConnector.GetDB(req.Context())
if err != nil {
ctxlog.FromContext(req.Context()).WithError(err).Debugf("validateAPItoken(%s): database error", token)
return nil, false, err
}
func (h *Handler) createAPItoken(req *http.Request, userUUID string, scopes []string) (*arvados.APIClientAuthorization, error) {
- db, err := h.db(req.Context())
+ db, err := h.dbConnector.GetDB(req.Context())
if err != nil {
return nil, err
}
return conn.chooseBackend(options.UUID).LinkDelete(ctx, options)
}
+func (conn *Conn) LogCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Log, error) {
+ return conn.chooseBackend(options.ClusterID).LogCreate(ctx, options)
+}
+
+func (conn *Conn) LogUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Log, error) {
+ return conn.chooseBackend(options.UUID).LogUpdate(ctx, options)
+}
+
+func (conn *Conn) LogGet(ctx context.Context, options arvados.GetOptions) (arvados.Log, error) {
+ return conn.chooseBackend(options.UUID).LogGet(ctx, options)
+}
+
+func (conn *Conn) LogList(ctx context.Context, options arvados.ListOptions) (arvados.LogList, error) {
+ return conn.generated_LogList(ctx, options)
+}
+
+func (conn *Conn) LogDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Log, error) {
+ return conn.chooseBackend(options.UUID).LogDelete(ctx, options)
+}
+
func (conn *Conn) SpecimenList(ctx context.Context, options arvados.ListOptions) (arvados.SpecimenList, error) {
return conn.generated_SpecimenList(ctx, options)
}
defer out.Close()
out.Write(regexp.MustCompile(`(?ms)^.*package .*?import.*?\n\)\n`).Find(buf))
io.WriteString(out, "//\n// -- this file is auto-generated -- do not edit -- edit list.go and run \"go generate\" instead --\n//\n\n")
- for _, t := range []string{"Container", "ContainerRequest", "Group", "Specimen", "User", "Link", "APIClientAuthorization"} {
+ for _, t := range []string{"Container", "ContainerRequest", "Group", "Specimen", "User", "Link", "Log", "APIClientAuthorization"} {
_, err := out.Write(bytes.ReplaceAll(orig, []byte("Collection"), []byte(t)))
if err != nil {
panic(err)
return merged, err
}
+func (conn *Conn) generated_LogList(ctx context.Context, options arvados.ListOptions) (arvados.LogList, error) {
+ var mtx sync.Mutex
+ var merged arvados.LogList
+ var needSort atomic.Value
+ needSort.Store(false)
+ err := conn.splitListRequest(ctx, options, func(ctx context.Context, _ string, backend arvados.API, options arvados.ListOptions) ([]string, error) {
+ options.ForwardedFor = conn.cluster.ClusterID + "-" + options.ForwardedFor
+ cl, err := backend.LogList(ctx, options)
+ if err != nil {
+ return nil, err
+ }
+ mtx.Lock()
+ defer mtx.Unlock()
+ if len(merged.Items) == 0 {
+ merged = cl
+ } else if len(cl.Items) > 0 {
+ merged.Items = append(merged.Items, cl.Items...)
+ needSort.Store(true)
+ }
+ uuids := make([]string, 0, len(cl.Items))
+ for _, item := range cl.Items {
+ uuids = append(uuids, item.UUID)
+ }
+ return uuids, nil
+ })
+ if needSort.Load().(bool) {
+ // Apply the default/implied order, "modified_at desc"
+ sort.Slice(merged.Items, func(i, j int) bool {
+ mi, mj := merged.Items[i].ModifiedAt, merged.Items[j].ModifiedAt
+ return mj.Before(mi)
+ })
+ }
+ if merged.Items == nil {
+ // Return empty results as [], not null
+ // (https://github.com/golang/go/issues/27589 might be
+ // a better solution in the future)
+ merged.Items = []arvados.Log{}
+ }
+ return merged, err
+}
+
func (conn *Conn) generated_APIClientAuthorizationList(ctx context.Context, options arvados.ListOptions) (arvados.APIClientAuthorizationList, error) {
var mtx sync.Mutex
var merged arvados.APIClientAuthorizationList
// Call fn on one or more local/remote backends if opts indicates a
// federation-wide list query, i.e.:
//
-// * There is at least one filter of the form
-// ["uuid","in",[a,b,c,...]] or ["uuid","=",a]
+// - There is at least one filter of the form
+// ["uuid","in",[a,b,c,...]] or ["uuid","=",a]
//
-// * One or more of the supplied UUIDs (a,b,c,...) has a non-local
-// prefix.
+// - One or more of the supplied UUIDs (a,b,c,...) has a non-local
+// prefix.
//
-// * There are no other filters
+// - There are no other filters
//
// (If opts doesn't indicate a federation-wide list query, fn is just
// called once with the local backend.)
// fn is called more than once only if the query meets the following
// restrictions:
//
-// * Count=="none"
+// - Count=="none"
//
-// * Limit<0
+// - Limit<0
//
-// * len(Order)==0
+// - len(Order)==0
//
-// * Each filter is either "uuid = ..." or "uuid in [...]".
+// - Each filter is either "uuid = ..." or "uuid in [...]".
//
-// * The maximum possible response size (total number of objects that
-// could potentially be matched by all of the specified filters)
-// exceeds the local cluster's response page size limit.
+// - The maximum possible response size (total number of objects
+// that could potentially be matched by all of the specified
+// filters) exceeds the local cluster's response page size limit.
//
// If the query involves multiple backends but doesn't meet these
// restrictions, an error is returned without calling fn.
//
// Thus, the caller can assume that either:
//
-// * splitListRequest() returns an error, or
+// - splitListRequest() returns an error, or
//
-// * fn is called exactly once, or
+// - fn is called exactly once, or
//
-// * fn is called more than once, with options that satisfy the above
-// restrictions.
+// - fn is called more than once, with options that satisfy the above
+// restrictions.
//
// Each call to fn indicates a single (local or remote) backend and a
// corresponding options argument suitable for sending to that
}
func (s *LoginSuite) TestLogout(c *check.C) {
+ otherOrigin := arvados.URL{Scheme: "https", Host: "app.example.com", Path: "/"}
+ otherURL := "https://app.example.com/foo"
s.cluster.Services.Workbench1.ExternalURL = arvados.URL{Scheme: "https", Host: "workbench1.example.com"}
s.cluster.Services.Workbench2.ExternalURL = arvados.URL{Scheme: "https", Host: "workbench2.example.com"}
+ s.cluster.Login.TrustedClients = map[arvados.URL]struct{}{otherOrigin: {}}
s.addHTTPRemote(c, "zhome", &arvadostest.APIStub{})
s.cluster.Login.LoginCluster = "zhome"
// s.fed is already set by SetUpTest, but we need to
// reinitialize with the above config changes.
s.fed = New(s.cluster, nil)
- returnTo := "https://app.example.com/foo?bar"
for _, trial := range []struct {
token string
returnTo string
target string
}{
{token: "", returnTo: "", target: s.cluster.Services.Workbench2.ExternalURL.String()},
- {token: "", returnTo: returnTo, target: returnTo},
- {token: "zzzzzzzzzzzzzzzzzzzzz", returnTo: returnTo, target: returnTo},
- {token: "v2/zzzzz-aaaaa-aaaaaaaaaaaaaaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", returnTo: returnTo, target: returnTo},
- {token: "v2/zhome-aaaaa-aaaaaaaaaaaaaaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", returnTo: returnTo, target: "http://" + s.cluster.RemoteClusters["zhome"].Host + "/logout?" + url.Values{"return_to": {returnTo}}.Encode()},
+ {token: "", returnTo: otherURL, target: otherURL},
+ {token: "zzzzzzzzzzzzzzzzzzzzz", returnTo: otherURL, target: otherURL},
+ {token: "v2/zzzzz-aaaaa-aaaaaaaaaaaaaaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", returnTo: otherURL, target: otherURL},
+ {token: "v2/zhome-aaaaa-aaaaaaaaaaaaaaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", returnTo: otherURL, target: "http://" + s.cluster.RemoteClusters["zhome"].Host + "/logout?" + url.Values{"return_to": {otherURL}}.Encode()},
} {
c.Logf("trial %#v", trial)
ctx := s.ctx
import (
"context"
- "errors"
"fmt"
"net/http"
"net/http/httptest"
"git.arvados.org/arvados.git/lib/controller/router"
"git.arvados.org/arvados.git/lib/ctrlctx"
"git.arvados.org/arvados.git/sdk/go/arvados"
- "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/health"
"git.arvados.org/arvados.git/sdk/go/httpserver"
- "github.com/jmoiron/sqlx"
// sqlx needs lib/pq to talk to PostgreSQL
_ "github.com/lib/pq"
proxy *proxy
secureClient *http.Client
insecureClient *http.Client
- pgdb *sqlx.DB
- pgdbMtx sync.Mutex
+ dbConnector ctrlctx.DBConnector
}
func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
func (h *Handler) CheckHealth() error {
h.setupOnce.Do(h.setup)
- _, err := h.db(context.TODO())
+ _, err := h.dbConnector.GetDB(context.TODO())
if err != nil {
return err
}
mux := http.NewServeMux()
healthFuncs := make(map[string]health.Func)
- oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.db)
+ h.dbConnector = ctrlctx.DBConnector{PostgreSQL: h.Cluster.PostgreSQL}
+ oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.dbConnector.GetDB)
h.federation = federation.New(h.Cluster, &healthFuncs)
rtr := router.New(h.federation, router.Config{
MaxRequestSize: h.Cluster.API.MaxRequestSize,
WrapCalls: api.ComposeWrappers(
- ctrlctx.WrapCallsInTransactions(h.db),
+ ctrlctx.WrapCallsInTransactions(h.dbConnector.GetDB),
oidcAuthorizer.WrapCalls,
ctrlctx.WrapCallsWithAuth(h.Cluster)),
})
- healthRoutes := health.Routes{"ping": func() error { _, err := h.db(context.TODO()); return err }}
+ healthRoutes := health.Routes{"ping": func() error { _, err := h.dbConnector.GetDB(context.TODO()); return err }}
for name, f := range healthFuncs {
healthRoutes[name] = f
}
}
go h.trashSweepWorker()
-}
-
-var errDBConnection = errors.New("database connection error")
-
-func (h *Handler) db(ctx context.Context) (*sqlx.DB, error) {
- h.pgdbMtx.Lock()
- defer h.pgdbMtx.Unlock()
- if h.pgdb != nil {
- return h.pgdb, nil
- }
-
- db, err := sqlx.Open("postgres", h.Cluster.PostgreSQL.Connection.String())
- if err != nil {
- ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect failed")
- return nil, errDBConnection
- }
- if p := h.Cluster.PostgreSQL.ConnectionPool; p > 0 {
- db.SetMaxOpenConns(p)
- }
- if err := db.Ping(); err != nil {
- ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect succeeded but ping failed")
- return nil, errDBConnection
- }
- h.pgdb = db
- return db, nil
+ go h.containerLogSweepWorker()
}
type middlewareFunc func(http.ResponseWriter, *http.Request, http.Handler)
}
func (s *HandlerSuite) TestLogoutGoogle(c *check.C) {
+ s.cluster.Services.Workbench2.ExternalURL = arvados.URL{Scheme: "https", Host: "wb2.example", Path: "/"}
s.cluster.Login.Google.Enable = true
s.cluster.Login.Google.ClientID = "test"
- req := httptest.NewRequest("GET", "https://0.0.0.0:1/logout?return_to=https://example.com/foo", nil)
+ req := httptest.NewRequest("GET", "https://0.0.0.0:1/logout?return_to=https://wb2.example/", nil)
resp := httptest.NewRecorder()
s.handler.ServeHTTP(resp, req)
if !c.Check(resp.Code, check.Equals, http.StatusFound) {
c.Log(resp.Body.String())
}
- c.Check(resp.Header().Get("Location"), check.Equals, "https://example.com/foo")
+ c.Check(resp.Header().Get("Location"), check.Equals, "https://wb2.example/")
}
func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) {
+ c.Assert(s.handler.CheckHealth(), check.IsNil)
req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveToken)
c.Assert(err, check.IsNil)
}
func (s *HandlerSuite) TestValidateV2APIToken(c *check.C) {
+ c.Assert(s.handler.CheckHealth(), check.IsNil)
req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveTokenV2)
c.Assert(err, check.IsNil)
}
func (s *HandlerSuite) TestCreateAPIToken(c *check.C) {
+ c.Assert(s.handler.CheckHealth(), check.IsNil)
req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
auth, err := s.handler.createAPItoken(req, arvadostest.ActiveUserUUID, nil)
c.Assert(err, check.IsNil)
coll, err := s.handler.federation.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{"name": "test trash sweep"}, EnsureUniqueName: true})
c.Assert(err, check.IsNil)
defer s.handler.federation.CollectionDelete(ctx, arvados.DeleteOptions{UUID: coll.UUID})
- db, err := s.handler.db(s.ctx)
+ db, err := s.handler.dbConnector.GetDB(s.ctx)
c.Assert(err, check.IsNil)
_, err = db.ExecContext(s.ctx, `update collections set trash_at = $1, delete_at = $2 where uuid = $3`, time.Now().UTC().Add(time.Second/10), time.Now().UTC().Add(time.Hour), coll.UUID)
c.Assert(err, check.IsNil)
}
}
+func (s *HandlerSuite) TestContainerLogSweep(c *check.C) {
+ s.cluster.SystemRootToken = arvadostest.SystemRootToken
+ s.cluster.Containers.Logging.SweepInterval = arvados.Duration(time.Second / 10)
+ s.handler.CheckHealth()
+ ctx := auth.NewContext(s.ctx, &auth.Credentials{Tokens: []string{arvadostest.ActiveTokenV2}})
+ logentry, err := s.handler.federation.LogCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{
+ "object_uuid": arvadostest.CompletedContainerUUID,
+ "event_type": "stderr",
+ "properties": map[string]interface{}{
+ "text": "test trash sweep\n",
+ },
+ }})
+ c.Assert(err, check.IsNil)
+ defer s.handler.federation.LogDelete(ctx, arvados.DeleteOptions{UUID: logentry.UUID})
+ deadline := time.Now().Add(5 * time.Second)
+ for {
+ if time.Now().After(deadline) {
+ c.Log("timed out")
+ c.FailNow()
+ }
+ logentries, err := s.handler.federation.LogList(ctx, arvados.ListOptions{Filters: []arvados.Filter{{"uuid", "=", logentry.UUID}}, Limit: -1})
+ c.Assert(err, check.IsNil)
+ if len(logentries.Items) == 0 {
+ break
+ }
+ time.Sleep(time.Second / 10)
+ }
+}
+
func (s *HandlerSuite) TestLogActivity(c *check.C) {
s.cluster.SystemRootToken = arvadostest.SystemRootToken
s.cluster.Users.ActivityLoggingPeriod = arvados.Duration(24 * time.Hour)
c.Assert(err, check.IsNil)
}
}
- db, err := s.handler.db(s.ctx)
+ db, err := s.handler.dbConnector.GetDB(s.ctx)
c.Assert(err, check.IsNil)
for _, userUUID := range []string{arvadostest.ActiveUserUUID, arvadostest.SpectatorUserUUID} {
var rows int
"context"
"encoding/json"
"fmt"
+ "net"
"net/http"
"os"
"sync"
return conn.loginController.UserAuthenticate(ctx, opts)
}
+// privateNetworks lists the IP networks that are treated as
+// "private" when validating login/logout redirect targets: IPv4
+// loopback (127.0.0.0/8), the 10.0.0.0/8, 172.16.0.0/12 and
+// 192.168.0.0/16 ranges, IPv4 link-local (169.254.0.0/16), IPv6
+// loopback (::1), IPv6 link-local (fe80::/10), and IPv6 unique
+// local addresses (fc00::/7).
+//
+// The list is built by parsing the CIDR strings below; a malformed
+// entry causes a panic.
+var privateNetworks = func() (nets []*net.IPNet) {
+ for _, s := range []string{
+ "127.0.0.0/8",
+ "10.0.0.0/8",
+ "172.16.0.0/12",
+ "192.168.0.0/16",
+ "169.254.0.0/16",
+ "::1/128",
+ "fe80::/10",
+ "fc00::/7",
+ } {
+ _, n, err := net.ParseCIDR(s)
+ if err != nil {
+ panic(fmt.Sprintf("privateNetworks: %q: %s", s, err))
+ }
+ nets = append(nets, n)
+ }
+ return
+}()
+
func httpErrorf(code int, format string, args ...interface{}) error {
return httpserver.ErrorWithStatus(fmt.Errorf(format, args...), code)
}
"encoding/json"
"errors"
"fmt"
+ "net"
"net/http"
"net/url"
"strings"
}
return
}
+
+// validateLoginRedirectTarget returns nil if returnTo is an
+// acceptable redirect target for the given cluster, i.e., its
+// origin (scheme, host, and non-default port):
+//
+//   - is listed in cluster.Login.TrustedClients, or
+//   - equals the Workbench1 or Workbench2 ExternalURL, or
+//   - (only if cluster.Login.TrustPrivateNetworks is true) has the
+//     hostname "localhost" or an IP address contained in one of
+//     the privateNetworks ranges.
+//
+// Otherwise, or if returnTo cannot be parsed as a URL, it returns
+// a non-nil error.
+func validateLoginRedirectTarget(cluster *arvados.Cluster, returnTo string) error {
+ u, err := url.Parse(returnTo)
+ if err != nil {
+ return err
+ }
+ // Resolve "/" relative to returnTo so only the origin (with
+ // path "/") is compared, not the requested path or query.
+ u, err = u.Parse("/")
+ if err != nil {
+ return err
+ }
+ // Drop an explicit default port so that, e.g.,
+ // "https://host:443/" is compared as "https://host/".
+ if u.Port() == "80" && u.Scheme == "http" {
+ u.Host = u.Hostname()
+ } else if u.Port() == "443" && u.Scheme == "https" {
+ u.Host = u.Hostname()
+ }
+ if _, ok := cluster.Login.TrustedClients[arvados.URL(*u)]; ok {
+ return nil
+ }
+ if u.String() == cluster.Services.Workbench1.ExternalURL.String() ||
+ u.String() == cluster.Services.Workbench2.ExternalURL.String() {
+ return nil
+ }
+ if cluster.Login.TrustPrivateNetworks {
+ if u.Hostname() == "localhost" {
+ return nil
+ }
+ // net.ParseIP returns nil for anything that is not a
+ // literal IP address, so other hostnames are never
+ // matched against privateNetworks.
+ if ip := net.ParseIP(u.Hostname()); len(ip) > 0 {
+ for _, n := range privateNetworks {
+ if n.Contains(ip) {
+ return nil
+ }
+ }
+ }
+ }
+ return fmt.Errorf("requesting site is not listed in TrustedClients config")
+}
if opts.ReturnTo == "" {
return loginError(errors.New("missing return_to parameter"))
}
+ if err := validateLoginRedirectTarget(ctrl.Parent.cluster, opts.ReturnTo); err != nil {
+ return loginError(fmt.Errorf("invalid return_to parameter: %s", err))
+ }
state := ctrl.newOAuth2State([]byte(ctrl.Cluster.SystemRootToken), opts.Remote, opts.ReturnTo)
var authparams []oauth2.AuthCodeOption
for k, v := range ctrl.AuthParams {
cluster *arvados.Cluster
localdb *Conn
railsSpy *arvadostest.Proxy
+ trustedURL *arvados.URL
fakeProvider *arvadostest.OIDCProvider
}
}
func (s *OIDCLoginSuite) SetUpTest(c *check.C) {
+ s.trustedURL = &arvados.URL{Scheme: "https", Host: "app.example.com", Path: "/"}
+
s.fakeProvider = arvadostest.NewOIDCProvider(c)
s.fakeProvider.AuthEmail = "active-user@arvados.local"
s.fakeProvider.AuthEmailVerified = true
s.cluster.Login.Google.Enable = true
s.cluster.Login.Google.ClientID = "test%client$id"
s.cluster.Login.Google.ClientSecret = "test#client/secret"
+ s.cluster.Login.TrustedClients = map[arvados.URL]struct{}{*s.trustedURL: {}}
s.cluster.Users.PreferDomainForUsername = "PreferDomainForUsername.example.com"
s.fakeProvider.ValidClientID = "test%client$id"
s.fakeProvider.ValidClientSecret = "test#client/secret"
}
func (s *OIDCLoginSuite) TestGoogleLogout(c *check.C) {
+ s.cluster.Login.TrustedClients[arvados.URL{Scheme: "https", Host: "foo.example", Path: "/"}] = struct{}{}
+ s.cluster.Login.TrustPrivateNetworks = false
+
resp, err := s.localdb.Logout(context.Background(), arvados.LogoutOptions{ReturnTo: "https://foo.example.com/bar"})
+ c.Check(err, check.NotNil)
+ c.Check(resp.RedirectLocation, check.Equals, "")
+
+ resp, err = s.localdb.Logout(context.Background(), arvados.LogoutOptions{ReturnTo: "https://127.0.0.1/bar"})
+ c.Check(err, check.NotNil)
+ c.Check(resp.RedirectLocation, check.Equals, "")
+
+ resp, err = s.localdb.Logout(context.Background(), arvados.LogoutOptions{ReturnTo: "https://foo.example/bar"})
+ c.Check(err, check.IsNil)
+ c.Check(resp.RedirectLocation, check.Equals, "https://foo.example/bar")
+
+ s.cluster.Login.TrustPrivateNetworks = true
+
+ resp, err = s.localdb.Logout(context.Background(), arvados.LogoutOptions{ReturnTo: "https://192.168.1.1/bar"})
c.Check(err, check.IsNil)
- c.Check(resp.RedirectLocation, check.Equals, "https://foo.example.com/bar")
+ c.Check(resp.RedirectLocation, check.Equals, "https://192.168.1.1/bar")
}
func (s *OIDCLoginSuite) TestGoogleLogin_Start_Bogus(c *check.C) {
}
}
+func (s *OIDCLoginSuite) TestGoogleLogin_UnknownClient(c *check.C) {
+ resp, err := s.localdb.Login(context.Background(), arvados.LoginOptions{ReturnTo: "https://bad-app.example.com/foo?bar"})
+ c.Check(err, check.IsNil)
+ c.Check(resp.RedirectLocation, check.Equals, "")
+ c.Check(resp.HTML.String(), check.Matches, `(?ms).*requesting site is not listed in TrustedClients.*`)
+}
+
func (s *OIDCLoginSuite) TestGoogleLogin_InvalidCode(c *check.C) {
state := s.startLogin(c)
resp, err := s.localdb.Login(context.Background(), arvados.LoginOptions{
// the provider, just grab state from the redirect URL.
resp, err := s.localdb.Login(context.Background(), arvados.LoginOptions{ReturnTo: "https://app.example.com/foo?bar"})
c.Check(err, check.IsNil)
+ c.Check(resp.HTML.String(), check.Not(check.Matches), `(?ms).*error:.*`)
target, err := url.Parse(resp.RedirectLocation)
c.Check(err, check.IsNil)
state = target.Query().Get("state")
- c.Check(state, check.Not(check.Equals), "")
+ if !c.Check(state, check.Not(check.Equals), "") {
+ c.Logf("Redirect target: %q", target)
+ c.Logf("HTML: %q", resp.HTML)
+ }
for _, fn := range checks {
fn(target.Query())
}
return
}
+// TestValidateLoginRedirectTarget runs validateLoginRedirectTarget
+// against a table of URLs, with TrustPrivateNetworks set per trial,
+// and checks whether each URL is permitted (nil error) or denied.
+func (s *OIDCLoginSuite) TestValidateLoginRedirectTarget(c *check.C) {
+ for _, trial := range []struct {
+ permit bool
+ trustPrivate bool
+ url string
+ }{
+ // wb1, wb2 => accept
+ {true, false, s.cluster.Services.Workbench1.ExternalURL.String()},
+ {true, false, s.cluster.Services.Workbench2.ExternalURL.String()},
+ // explicitly listed host => accept
+ {true, false, "https://app.example.com/"},
+ {true, false, "https://app.example.com:443/foo?bar=baz"},
+ // non-listed hostname => deny (regardless of TrustPrivateNetworks)
+ {false, false, "https://bad.example/"},
+ {false, true, "https://bad.example/"},
+ // non-listed non-private IP addr => deny (regardless of TrustPrivateNetworks)
+ {false, false, "https://1.2.3.4/"},
+ {false, true, "https://1.2.3.4/"},
+ {false, true, "https://[ab::cd]:1234/"},
+ // localhost or non-listed private IP addr => accept only if TrustPrivateNetworks is set
+ {false, false, "https://localhost/"},
+ {true, true, "https://localhost/"},
+ {false, false, "https://[10.9.8.7]:80/foo"},
+ {true, true, "https://[10.9.8.7]:80/foo"},
+ {false, false, "https://[::1]:80/foo"},
+ {true, true, "https://[::1]:80/foo"},
+ {true, true, "http://192.168.1.1/"},
+ {true, true, "http://172.17.2.0/"},
+ // bad url => deny
+ {false, true, "https://10.1.1.1:blorp/foo"}, // non-numeric port
+ {false, true, "https://app.example.com:blorp/foo"}, // non-numeric port
+ {false, true, "https://]:443"},
+ {false, true, "https://"},
+ {false, true, "https:"},
+ {false, true, ""},
+ // explicitly listed host but different port, protocol, or user/pass => deny
+ {false, true, "http://app.example.com/"},
+ {false, true, "http://app.example.com:443/"},
+ {false, true, "https://app.example.com:80/"},
+ {false, true, "https://app.example.com:4433/"},
+ {false, true, "https://u:p@app.example.com:443/foo?bar=baz"},
+ } {
+ c.Logf("trial %+v", trial)
+ s.cluster.Login.TrustPrivateNetworks = trial.trustPrivate
+ err := validateLoginRedirectTarget(s.cluster, trial.url)
+ c.Check(err == nil, check.Equals, trial.permit)
+ }
+
+}
+
func getCallbackAuthInfo(c *check.C, railsSpy *arvadostest.Proxy) (authinfo rpc.UserSessionAuthInfo) {
for _, dump := range railsSpy.RequestDumps {
c.Logf("spied request: %q", dump)
}
func (s *TestUserSuite) TestExpireTokenOnLogout(c *check.C) {
- returnTo := "https://localhost:12345/logout"
+ s.cluster.Login.TrustPrivateNetworks = true
+ returnTo := "https://[::1]:12345/logout"
for _, trial := range []struct {
requestToken string
expiringTokenUUID string
} else {
target = cluster.Services.Workbench1.ExternalURL.String()
}
+ } else if err := validateLoginRedirectTarget(cluster, target); err != nil {
+ return arvados.LogoutResponse{}, httpserver.ErrorWithStatus(fmt.Errorf("invalid return_to parameter: %s", err), http.StatusBadRequest)
}
return arvados.LogoutResponse{RedirectLocation: target}, nil
}
return rtr.backend.LinkDelete(ctx, *opts.(*arvados.DeleteOptions))
},
},
+ {
+ arvados.EndpointLogCreate,
+ func() interface{} { return &arvados.CreateOptions{} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return rtr.backend.LogCreate(ctx, *opts.(*arvados.CreateOptions))
+ },
+ },
+ {
+ arvados.EndpointLogUpdate,
+ func() interface{} { return &arvados.UpdateOptions{} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return rtr.backend.LogUpdate(ctx, *opts.(*arvados.UpdateOptions))
+ },
+ },
+ {
+ arvados.EndpointLogList,
+ func() interface{} { return &arvados.ListOptions{Limit: -1} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return rtr.backend.LogList(ctx, *opts.(*arvados.ListOptions))
+ },
+ },
+ {
+ arvados.EndpointLogGet,
+ func() interface{} { return &arvados.GetOptions{} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return rtr.backend.LogGet(ctx, *opts.(*arvados.GetOptions))
+ },
+ },
+ {
+ arvados.EndpointLogDelete,
+ func() interface{} { return &arvados.DeleteOptions{} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return rtr.backend.LogDelete(ctx, *opts.(*arvados.DeleteOptions))
+ },
+ },
{
arvados.EndpointSpecimenCreate,
func() interface{} { return &arvados.CreateOptions{} },
return resp, err
}
+// LogCreate sends options to the EndpointLogCreate endpoint and
+// decodes the response into an arvados.Log.
+func (conn *Conn) LogCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Log, error) {
+	var created arvados.Log
+	err := conn.requestAndDecode(ctx, &created, arvados.EndpointLogCreate, nil, options)
+	return created, err
+}
+
+// LogUpdate sends options to the EndpointLogUpdate endpoint and
+// decodes the response into an arvados.Log.
+func (conn *Conn) LogUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Log, error) {
+	var updated arvados.Log
+	err := conn.requestAndDecode(ctx, &updated, arvados.EndpointLogUpdate, nil, options)
+	return updated, err
+}
+
+// LogGet sends options to the EndpointLogGet endpoint and decodes
+// the response into an arvados.Log.
+func (conn *Conn) LogGet(ctx context.Context, options arvados.GetOptions) (arvados.Log, error) {
+	var found arvados.Log
+	err := conn.requestAndDecode(ctx, &found, arvados.EndpointLogGet, nil, options)
+	return found, err
+}
+
+// LogList sends options to the EndpointLogList endpoint and decodes
+// the response into an arvados.LogList.
+func (conn *Conn) LogList(ctx context.Context, options arvados.ListOptions) (arvados.LogList, error) {
+	var page arvados.LogList
+	err := conn.requestAndDecode(ctx, &page, arvados.EndpointLogList, nil, options)
+	return page, err
+}
+
+// LogDelete sends options to the EndpointLogDelete endpoint and
+// decodes the response into an arvados.Log.
+func (conn *Conn) LogDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Log, error) {
+	var deleted arvados.Log
+	err := conn.requestAndDecode(ctx, &deleted, arvados.EndpointLogDelete, nil, options)
+	return deleted, err
+}
+
func (conn *Conn) SpecimenCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Specimen, error) {
ep := arvados.EndpointSpecimenCreate
var resp arvados.Specimen
package controller
import (
+ "context"
"time"
"git.arvados.org/arvados.git/lib/controller/dblock"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
)
-func (h *Handler) trashSweepWorker() {
- sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
- logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
+// periodicWorker acquires locker, then calls run once per interval
+// (sleeping before the first call) until the background context
+// reports an error. It returns right away, after a debug log, when
+// interval is zero or negative, and it returns whenever locker.Lock
+// or locker.Check reports false. An error returned by run is logged
+// at info level and does not stop the loop.
+func (h *Handler) periodicWorker(workerName string, interval time.Duration, locker *dblock.DBLocker, run func(context.Context) error) {
+ logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", workerName)
ctx := ctxlog.Context(h.BackgroundContext, logger)
- if sleep <= 0 {
- logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
+ if interval <= 0 {
+ logger.Debugf("interval is %v, not running worker", interval)
return
}
- dblock.TrashSweep.Lock(ctx, h.db)
- defer dblock.TrashSweep.Unlock()
- for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) {
- dblock.TrashSweep.Check()
- ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
- _, err := h.federation.SysTrashSweep(ctx, struct{}{})
+ if !locker.Lock(ctx, h.dbConnector.GetDB) {
+ // context canceled
+ return
+ }
+ defer locker.Unlock()
+ for time.Sleep(interval); ctx.Err() == nil; time.Sleep(interval) {
+ if !locker.Check() {
+ // context canceled
+ return
+ }
+ err := run(ctx)
if err != nil {
- logger.WithError(err).Info("trash sweep failed")
+ logger.WithError(err).Infof("%s failed", workerName)
}
}
}
+
+// trashSweepWorker hands periodicWorker a callback that calls
+// SysTrashSweep with the cluster's SystemRootToken as credentials.
+// The callback is scheduled at Collections.TrashSweepInterval under
+// the dblock.TrashSweep lock.
+func (h *Handler) trashSweepWorker() {
+	interval := h.Cluster.Collections.TrashSweepInterval.Duration()
+	sweep := func(ctx context.Context) error {
+		creds := &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}}
+		_, err := h.federation.SysTrashSweep(auth.NewContext(ctx, creds), struct{}{})
+		return err
+	}
+	h.periodicWorker("trash sweep", interval, dblock.TrashSweep, sweep)
+}
+
+// containerLogSweepWorker hands periodicWorker a callback that
+// deletes old container log rows from the logs table. The callback
+// is scheduled at Containers.Logging.SweepInterval under the
+// dblock.ContainerLogSweep lock.
+func (h *Handler) containerLogSweepWorker() {
+ h.periodicWorker("container log sweep", h.Cluster.Containers.Logging.SweepInterval.Duration(), dblock.ContainerLogSweep, func(ctx context.Context) error {
+ db, err := h.dbConnector.GetDB(ctx)
+ if err != nil {
+ return err
+ }
+ // Delete rows of the listed event types that belong to a
+ // container whose log column is non-NULL and whose
+ // finished_at is more than Containers.Logging.MaxAge ago.
+ // MaxAge is passed as a string and cast to a SQL interval.
+ res, err := db.ExecContext(ctx, `
+DELETE FROM logs
+ USING containers
+ WHERE logs.object_uuid=containers.uuid
+ AND logs.event_type in ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat', 'hoststat', 'node', 'container', 'keepstore')
+ AND containers.log IS NOT NULL
+ AND now() - containers.finished_at > $1::interval`,
+ h.Cluster.Containers.Logging.MaxAge.String())
+ if err != nil {
+ return err
+ }
+ logger := ctxlog.FromContext(ctx)
+ // A RowsAffected error is only logged as a warning; the
+ // sweep itself still returns nil.
+ rows, err := res.RowsAffected()
+ if err != nil {
+ logger.WithError(err).Warn("unexpected error from RowsAffected()")
+ } else {
+ logger.WithField("rows", rows).Info("deleted rows from logs table")
+ }
+ return nil
+ })
+}
parentTemp string
costStartTime time.Time
+ keepstore *exec.Cmd
keepstoreLogger io.WriteCloser
keepstoreLogbuf *bufThenWrite
statLogger io.WriteCloser
if err != nil {
return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
}
+ if runner.hoststatReporter != nil && runner.ArvMount != nil {
+ runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid)
+ }
for _, p := range collectionPaths {
_, err = os.Stat(p)
PollPeriod: runner.statInterval,
}
runner.hoststatReporter.Start()
+ runner.hoststatReporter.ReportPID("crunch-run", os.Getpid())
return nil
}
if err != nil {
return
}
+ if runner.keepstore != nil {
+ runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid)
+ }
// set up FUSE mount and binds
bindmounts, err = runner.SetupMounts()
return 1
}
+ cr.keepstore = keepstore
if keepstore == nil {
// Log explanation (if any) for why we're not running
// a local keepstore.
"fmt"
"io"
"io/ioutil"
- "log"
"os"
+ "regexp"
+ "sort"
"strconv"
"strings"
+ "sync"
"syscall"
"time"
)
TempDir string
// Where to write statistics. Must not be nil.
- Logger *log.Logger
+ Logger interface {
+ Printf(fmt string, args ...interface{})
+ }
+ kernelPageSize int64
reportedStatFile map[string]string
lastNetSample map[string]ioSample
lastDiskIOSample map[string]ioSample
lastCPUSample cpuSample
lastDiskSpaceSample diskSpaceSample
+ reportPIDs map[string]int
+ reportPIDsMu sync.Mutex
+
done chan struct{} // closed when we should stop reporting
flushed chan struct{} // closed when we have made our last report
}
go r.run()
}
+// ReportPID registers pid under name in the reporter's set of
+// processes to report on. The map is created on first use, and a
+// later call with the same name replaces the earlier pid. The update
+// is made while holding reportPIDsMu.
+func (r *Reporter) ReportPID(name string, pid int) {
+	r.reportPIDsMu.Lock()
+	defer r.reportPIDsMu.Unlock()
+	if r.reportPIDs == nil {
+		r.reportPIDs = make(map[string]int)
+	}
+	r.reportPIDs[name] = pid
+}
+
// Stop reporting. Do not call more than once, or before calling
// Start.
//
}
}
r.Logger.Printf("mem%s\n", outstat.String())
+
+ if r.kernelPageSize == 0 {
+ // assign "don't try again" value in case we give up
+ // and return without assigning the real value
+ r.kernelPageSize = -1
+ buf, err := os.ReadFile("/proc/self/smaps")
+ if err != nil {
+ r.Logger.Printf("error reading /proc/self/smaps: %s", err)
+ return
+ }
+ m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
+ if len(m) != 2 {
+ r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize not found")
+ return
+ }
+ size, err := strconv.ParseInt(string(m[1]), 10, 64)
+ if err != nil {
+ r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize %q: %s", m[1], err)
+ return
+ }
+ r.kernelPageSize = size * 1024
+ } else if r.kernelPageSize < 0 {
+ // already failed to determine page size, don't keep
+ // trying/logging
+ return
+ }
+
+ r.reportPIDsMu.Lock()
+ defer r.reportPIDsMu.Unlock()
+ procnames := make([]string, 0, len(r.reportPIDs))
+ for name := range r.reportPIDs {
+ procnames = append(procnames, name)
+ }
+ sort.Strings(procnames)
+ procmem := ""
+ for _, procname := range procnames {
+ pid := r.reportPIDs[procname]
+ buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid))
+ if err != nil {
+ continue
+ }
+ // If the executable name contains a ')' char,
+ // /proc/$pid/stat will look like '1234 (exec name)) S
+ // 123 ...' -- the last ')' is the end of the 2nd
+ // field.
+ paren := bytes.LastIndexByte(buf, ')')
+ if paren < 0 {
+ continue
+ }
+ fields := bytes.SplitN(buf[paren:], []byte{' '}, 24)
+ if len(fields) < 24 {
+ continue
+ }
+ // rss is the 24th field in .../stat, and fields[0]
+ // here is the last char ')' of the 2nd field, so
+ // rss is fields[22]
+ rss, err := strconv.ParseInt(string(fields[22]), 10, 64)
+ if err != nil {
+ continue
+ }
+ procmem += fmt.Sprintf(" %d %s", rss*r.kernelPageSize, procname)
+ }
+ if procmem != "" {
+ r.Logger.Printf("procmem%s\n", procmem)
+ }
}
func (r *Reporter) doNetworkStats() {
package crunchstat
import (
- "bufio"
- "io"
+ "bytes"
"log"
"os"
"regexp"
+ "strconv"
"testing"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ . "gopkg.in/check.v1"
)
-func bufLogger() (*log.Logger, *bufio.Reader) {
- r, w := io.Pipe()
- logger := log.New(w, "", 0)
- return logger, bufio.NewReader(r)
+func Test(t *testing.T) {
+ TestingT(t)
}
-func TestReadAllOrWarnFail(t *testing.T) {
- logger, rcv := bufLogger()
- rep := Reporter{Logger: logger}
+var _ = Suite(&suite{})
- done := make(chan bool)
- var msg []byte
- var err error
- go func() {
- msg, err = rcv.ReadBytes('\n')
- close(done)
- }()
- {
- // The special file /proc/self/mem can be opened for
- // reading, but reading from byte 0 returns an error.
- f, err := os.Open("/proc/self/mem")
- if err != nil {
- t.Fatalf("Opening /proc/self/mem: %s", err)
- }
- if x, err := rep.readAllOrWarn(f); err == nil {
- t.Fatalf("Expected error, got %v", x)
- }
- }
- <-done
- if err != nil {
- t.Fatal(err)
- } else if matched, err := regexp.MatchString("^warning: read /proc/self/mem: .*", string(msg)); err != nil || !matched {
- t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
- }
+type suite struct{}
+
+// TestReadAllOrWarnFail checks that readAllOrWarn returns an error
+// and writes a warning to the Reporter's logger when the read fails.
+func (s *suite) TestReadAllOrWarnFail(c *C) {
+	var logbuf bytes.Buffer
+	rep := Reporter{Logger: log.New(&logbuf, "", 0)}
+
+	// /proc/self/mem is a special file: opening it for reading
+	// succeeds, but reading from byte 0 returns an error.
+	memfile, openErr := os.Open("/proc/self/mem")
+	c.Assert(openErr, IsNil)
+	defer memfile.Close()
+
+	_, readErr := rep.readAllOrWarn(memfile)
+	c.Check(readErr, NotNil)
+	c.Check(logbuf.String(), Matches, "^warning: read /proc/self/mem: .*\n")
+}
-func TestReadAllOrWarnSuccess(t *testing.T) {
- rep := Reporter{Logger: log.New(os.Stderr, "", 0)}
+func (s *suite) TestReadAllOrWarnSuccess(c *C) {
+ var logbuf bytes.Buffer
+ rep := Reporter{Logger: log.New(&logbuf, "", 0)}
f, err := os.Open("./crunchstat_test.go")
- if err != nil {
- t.Fatalf("Opening ./crunchstat_test.go: %s", err)
- }
+ c.Assert(err, IsNil)
+ defer f.Close()
data, err := rep.readAllOrWarn(f)
- if err != nil {
- t.Fatalf("got error %s", err)
+ c.Check(err, IsNil)
+ c.Check(string(data), Matches, "(?ms).*\npackage crunchstat\n.*")
+ c.Check(logbuf.String(), Equals, "")
+}
+
+// TestReportPIDs starts a Reporter, registers three PIDs (pid 1, the
+// test process itself, and a presumably nonexistent pid), then polls
+// the log output for up to 10 seconds for a "procmem" line that
+// lists init followed by test_process. It checks that the size
+// reported for test_process falls in a plausible range.
+func (s *suite) TestReportPIDs(c *C) {
+ var logbuf bytes.Buffer
+ logger := logrus.New()
+ logger.Out = &logbuf
+ r := Reporter{
+ Logger: logger,
+ CgroupRoot: "/sys/fs/cgroup",
+ PollPeriod: time.Second,
}
- if matched, err := regexp.MatchString("\npackage crunchstat\n", string(data)); err != nil || !matched {
- t.Fatalf("data failed regexp: err %v, matched %v", err, matched)
+ r.Start()
+ r.ReportPID("init", 1)
+ r.ReportPID("test_process", os.Getpid())
+ r.ReportPID("nonexistent", 12345) // should be silently ignored/omitted
+ // Compile the pattern once, rather than on every iteration of
+ // the 1ms polling loop below.
+ procmemRe := regexp.MustCompile(`(?ms).*procmem \d+ init (\d+) test_process.*`)
+ for deadline := time.Now().Add(10 * time.Second); ; time.Sleep(time.Millisecond) {
+ if time.Now().After(deadline) {
+ c.Error("timed out")
+ break
+ }
+ if m := procmemRe.FindSubmatch(logbuf.Bytes()); len(m) > 0 {
+ size, err := strconv.ParseInt(string(m[1]), 10, 64)
+ c.Check(err, IsNil)
+ // Expect >1 MiB and <100 MiB -- otherwise we
+ // are probably misinterpreting /proc/N/stat
+ // or multiplying by the wrong page size.
+ c.Check(size > 1000000, Equals, true)
+ c.Check(size < 100000000, Equals, true)
+ break
+ }
}
+ c.Logf("%s", logbuf.String())
}
"sync"
"git.arvados.org/arvados.git/lib/controller/api"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/jmoiron/sqlx"
})
return txn.tx, txn.err
}
+
+// errDBConnection is the generic error GetDB returns when opening or
+// pinging the database fails. The underlying error is logged rather
+// than returned.
+var errDBConnection = errors.New("database connection error")
+
+// DBConnector opens a database handle on first use and reuses it
+// for later GetDB calls.
+type DBConnector struct {
+ // PostgreSQL supplies the connection string and pool size.
+ PostgreSQL arvados.PostgreSQL
+ // pgdb is the cached handle; nil until a GetDB call succeeds.
+ pgdb *sqlx.DB
+ // mtx is held for the duration of each GetDB call.
+ mtx sync.Mutex
+}
+
+// GetDB returns the cached database handle, opening and pinging a
+// new one on first use. If PostgreSQL.ConnectionPool is positive it
+// is applied as the maximum number of open connections.
+//
+// On failure the underlying error is logged via the context's logger
+// and errDBConnection is returned. Nothing is cached in that case,
+// so a later call tries again.
+func (dbc *DBConnector) GetDB(ctx context.Context) (*sqlx.DB, error) {
+	dbc.mtx.Lock()
+	defer dbc.mtx.Unlock()
+	if dbc.pgdb != nil {
+		return dbc.pgdb, nil
+	}
+	db, err := sqlx.Open("postgres", dbc.PostgreSQL.Connection.String())
+	if err != nil {
+		ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect failed")
+		return nil, errDBConnection
+	}
+	if p := dbc.PostgreSQL.ConnectionPool; p > 0 {
+		db.SetMaxOpenConns(p)
+	}
+	if err := db.Ping(); err != nil {
+		ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect succeeded but ping failed")
+		// This handle is not cached, and callers may retry
+		// indefinitely, so release it instead of leaking one
+		// handle per failed attempt.
+		db.Close()
+		return nil, errDBConnection
+	}
+	dbc.pgdb = db
+	return db, nil
+}
f.StringVar(&diag.dockerImage, "docker-image", "", "image to use when running a test container (default: use embedded hello-world image)")
f.BoolVar(&diag.checkInternal, "internal-client", false, "check that this host is considered an \"internal\" client")
f.BoolVar(&diag.checkExternal, "external-client", false, "check that this host is considered an \"external\" client")
+ f.BoolVar(&diag.verbose, "v", false, "verbose: include more information in report")
f.IntVar(&diag.priority, "priority", 500, "priority for test container (1..1000, or 0 to skip)")
f.DurationVar(&diag.timeout, "timeout", 10*time.Second, "timeout for http requests")
if ok, code := cmd.ParseFlags(f, prog, args, "", stderr); !ok {
}
// docker save hello-world > hello-world.tar
+//
//go:embed hello-world.tar
var HelloWorldDockerImage []byte
dockerImage string
checkInternal bool
checkExternal bool
+ verbose bool
timeout time.Duration
logger *logrus.Logger
errors []string
diag.logger.Infof(" ... "+f, args...)
}
+// verbosef logs a formatted message at info level, with the same
+// " ... " prefix, but only when diag.verbose is set. Otherwise it
+// does nothing.
+func (diag *diagnoser) verbosef(f string, args ...interface{}) {
+	if !diag.verbose {
+		return
+	}
+	diag.logger.Infof(" ... "+f, args...)
+}
+
func (diag *diagnoser) warnf(f string, args ...interface{}) {
diag.logger.Warnf(" ... "+f, args...)
}
return
}
+	// Report the local hostname in verbose mode. A lookup failure
+	// is only a warning; the remaining checks still run.
+	hostname, err := os.Hostname()
+	if err != nil {
+		// Pass err as the %s operand; without it the message
+		// would print "%!s(MISSING)" instead of the cause.
+		diag.warnf("error getting hostname: %s", err)
+	} else {
+		diag.verbosef("hostname = %s", hostname)
+	}
+
diag.dotest(5, "running health check (same as `arvados-server check`)", func() error {
ldr := config.NewLoader(&bytes.Buffer{}, ctxlog.New(&bytes.Buffer{}, "text", "info"))
ldr.SetupFlags(flag.NewFlagSet("diagnostics", flag.ContinueOnError))
return err
}
if cluster.SystemRootToken != os.Getenv("ARVADOS_API_TOKEN") {
- diag.infof("skipping because provided token is not SystemRootToken")
+ return fmt.Errorf("diagnostics usage error: %s is readable but SystemRootToken does not match $ARVADOS_API_TOKEN (to fix, either run 'arvados-client sudo diagnostics' to load everything from config file, or set ARVADOS_CONFIG=- to load nothing from config file)", ldr.Path)
}
agg := &health.Aggregator{Cluster: cluster}
resp := agg.ClusterHealth()
for _, e := range resp.Errors {
diag.errorf("health check: %s", e)
}
- diag.infof("health check: reported clock skew %v", resp.ClockSkew)
+ if len(resp.Errors) > 0 {
+ diag.infof("consider running `arvados-server check -yaml` for a comprehensive report")
+ }
+ diag.verbosef("reported clock skew = %v", resp.ClockSkew)
+ reported := map[string]bool{}
+ for _, result := range resp.Checks {
+ version := strings.SplitN(result.Metrics.Version, " (go", 2)[0]
+ if version != "" && !reported[version] {
+ diag.verbosef("arvados version = %s", version)
+ reported[version] = true
+ }
+ }
+ reported = map[string]bool{}
+ for _, result := range resp.Checks {
+ if result.Server != "" && !reported[result.Server] {
+ diag.verbosef("http frontend version = %s", result.Server)
+ reported[result.Server] = true
+ }
+ }
+ reported = map[string]bool{}
+ for _, result := range resp.Checks {
+ if sha := result.ConfigSourceSHA256; sha != "" && !reported[sha] {
+ diag.verbosef("config file sha256 = %s", sha)
+ reported[sha] = true
+ }
+ }
return nil
})
if err != nil {
return err
}
- diag.debugf("BlobSignatureTTL = %d", dd.BlobSignatureTTL)
+ diag.verbosef("BlobSignatureTTL = %d", dd.BlobSignatureTTL)
return nil
})
if err != nil {
return err
}
- diag.debugf("Collections.BlobSigning = %v", cluster.Collections.BlobSigning)
+ diag.verbosef("Collections.BlobSigning = %v", cluster.Collections.BlobSigning)
cfgOK = true
return nil
})
if err != nil {
return err
}
- diag.debugf("user uuid = %s", user.UUID)
+ diag.verbosef("user uuid = %s", user.UUID)
return nil
})
isInternal := found["proxy"] == 0 && len(keeplist.Items) > 0
isExternal := found["proxy"] > 0 && found["proxy"] == len(keeplist.Items)
if isExternal {
- diag.debugf("controller returned only proxy services, this host is treated as \"external\"")
+ diag.infof("controller returned only proxy services, this host is treated as \"external\"")
} else if isInternal {
- diag.debugf("controller returned only non-proxy services, this host is treated as \"internal\"")
+ diag.infof("controller returned only non-proxy services, this host is treated as \"internal\"")
}
if (diag.checkInternal && !isInternal) || (diag.checkExternal && !isExternal) {
return fmt.Errorf("expecting internal=%v external=%v, but found internal=%v external=%v", diag.checkInternal, diag.checkExternal, isInternal, isExternal)
}
if len(grplist.Items) > 0 {
project = grplist.Items[0]
- diag.debugf("using existing project, uuid = %s", project.UUID)
+ diag.verbosef("using existing project, uuid = %s", project.UUID)
return nil
}
diag.debugf("list groups: ok, no results")
if err != nil {
return fmt.Errorf("create project: %s", err)
}
- diag.debugf("created project, uuid = %s", project.UUID)
+ diag.verbosef("created project, uuid = %s", project.UUID)
return nil
})
if err != nil {
return err
}
- diag.debugf("ok, uuid = %s", collection.UUID)
+ diag.verbosef("ok, uuid = %s", collection.UUID)
return nil
})
if err != nil {
return err
}
- diag.debugf("container request uuid = %s", cr.UUID)
- diag.debugf("container uuid = %s", cr.ContainerUUID)
+ diag.verbosef("container request uuid = %s", cr.UUID)
+ diag.verbosef("container uuid = %s", cr.ContainerUUID)
timeout := 10 * time.Minute
diag.infof("container request submitted, waiting up to %v for container to run", arvados.Duration(timeout))
- ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(timeout))
- defer cancel()
+ deadline := time.Now().Add(timeout)
var c arvados.Container
- for ; cr.State != arvados.ContainerRequestStateFinal; time.Sleep(2 * time.Second) {
- ctx, cancel := context.WithDeadline(ctx, time.Now().Add(diag.timeout))
+ for ; cr.State != arvados.ContainerRequestStateFinal && time.Now().Before(deadline); time.Sleep(2 * time.Second) {
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(diag.timeout))
defer cancel()
crStateWas := cr.State
if c.State != cStateWas {
diag.debugf("container state = %s", c.State)
}
+
+ cancel()
}
+ if cr.State != arvados.ContainerRequestStateFinal {
+ err := client.RequestAndDecodeContext(context.Background(), &cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{
+ "container_request": map[string]interface{}{
+ "priority": 0,
+ }})
+ if err != nil {
+ diag.infof("error canceling container request %s: %s", cr.UUID, err)
+ } else {
+ diag.debugf("canceled container request %s", cr.UUID)
+ }
+ return fmt.Errorf("timed out waiting for container to finish; container request %s state was %q, container %s state was %q", cr.UUID, cr.State, c.UUID, c.State)
+ }
if c.State != arvados.ContainerStateComplete {
return fmt.Errorf("container request %s is final but container %s did not complete: container state = %q", cr.UUID, cr.ContainerUUID, c.State)
- } else if c.ExitCode != 0 {
+ }
+ if c.ExitCode != 0 {
return fmt.Errorf("container exited %d", c.ExitCode)
}
return nil
"time"
"git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
+ "git.arvados.org/arvados.git/lib/ctrlctx"
"git.arvados.org/arvados.git/lib/dispatchcloud/container"
"git.arvados.org/arvados.git/lib/dispatchcloud/scheduler"
"git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
Registry *prometheus.Registry
InstanceSetID cloud.InstanceSetID
+ dbConnector ctrlctx.DBConnector
logger logrus.FieldLogger
instanceSet cloud.InstanceSet
pool pool
func (disp *dispatcher) initialize() {
disp.logger = ctxlog.FromContext(disp.Context)
+ disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
disp.ArvClient.AuthToken = disp.AuthToken
if err != nil {
disp.logger.Fatalf("error initializing driver: %s", err)
}
+ dblock.Dispatch.Lock(disp.Context, disp.dbConnector.GetDB)
disp.instanceSet = instanceSet
disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
}
func (disp *dispatcher) run() {
+ defer dblock.Dispatch.Unlock()
defer close(disp.stopped)
defer disp.instanceSet.Stop()
defer disp.pool.Stop()
"sync"
"time"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/dispatchcloud/test"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
MinTimeBetweenCreateCalls: time.Millisecond,
}
+ // We need the postgresql connection info from the integration
+ // test config.
+ cfg, err := config.NewLoader(nil, ctxlog.FromContext(s.ctx)).Load()
+ c.Assert(err, check.IsNil)
+ testcluster, err := cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+
s.cluster = &arvados.Cluster{
ManagementToken: "test-management-token",
+ PostgreSQL: testcluster.PostgreSQL,
Containers: arvados.ContainersConfig{
CrunchRunCommand: "crunch-run",
CrunchRunArgumentsList: []string{"--foo", "--extra='args'"},
err := s.disp.CheckHealth()
c.Check(err, check.IsNil)
- select {
- case <-done:
- c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
- case <-time.After(10 * time.Second):
- c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
+ for len(waiting) > 0 {
+ waswaiting := len(waiting)
+ select {
+ case <-done:
+ // loop will end because len(waiting)==0
+ case <-time.After(3 * time.Second):
+ if len(waiting) >= waswaiting {
+ c.Fatalf("timed out; no progress in 3s while waiting for %d containers: %q", len(waiting), waiting)
+ }
+ }
}
+ c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
deadline := time.Now().Add(5 * time.Second)
for range time.NewTicker(10 * time.Millisecond).C {
needRAM := ctr.RuntimeConstraints.RAM + ctr.RuntimeConstraints.KeepCacheRAM
needRAM += int64(cc.Containers.ReserveExtraRAM)
- needRAM += int64(cc.Containers.LocalKeepBlobBuffersPerVCPU * needVCPUs * (1 << 26))
+ if cc.Containers.LocalKeepBlobBuffersPerVCPU > 0 {
+ // + 200 MiB for keepstore process + 10% for GOGC=10
+ needRAM += 220 << 20
+ // + 64 MiB for each blob buffer + 10% for GOGC=10
+ needRAM += int64(cc.Containers.LocalKeepBlobBuffersPerVCPU * needVCPUs * (1 << 26) * 11 / 10)
+ }
needRAM = (needRAM * 100) / int64(100-discountConfiguredRAMPercent)
ok := false
"costly": {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Name: "costly"},
},
} {
- best, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: menu, Containers: arvados.ContainersConfig{ReserveExtraRAM: 268435456}}, &arvados.Container{
+ best, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: menu, Containers: arvados.ContainersConfig{
+ LocalKeepBlobBuffersPerVCPU: 1,
+ ReserveExtraRAM: 268435456,
+ }}, &arvados.Container{
Mounts: map[string]arvados.Mount{
"/tmp": {Kind: "tmp", Capacity: 2 * int64(GiB)},
},
}
}
-func (*NodeSizeSuite) TestChoosePreemptable(c *check.C) {
+// TestChooseWithBlobBuffersOverhead calls ChooseInstanceType with
+// LocalKeepBlobBuffersPerVCPU set to 16 and a 2-VCPU container, and
+// expects the instance type named "best" to be chosen from the menu.
+func (*NodeSizeSuite) TestChooseWithBlobBuffersOverhead(c *check.C) {
+	menu := map[string]arvados.InstanceType{
+		// Name matches the map key, so a failure message identifies
+		// the entry that was actually chosen.
+		"nearly": {Price: 2.2, RAM: 4000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "nearly"},
+		"best":   {Price: 3.3, RAM: 8000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"},
+		"costly": {Price: 4.4, RAM: 12000000000, VCPUs: 8, Scratch: 2 * GiB, Name: "costly"},
+	}
+	best, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: menu, Containers: arvados.ContainersConfig{
+		LocalKeepBlobBuffersPerVCPU: 16, // 1 GiB per vcpu => 2 GiB
+		ReserveExtraRAM:             268435456,
+	}}, &arvados.Container{
+		Mounts: map[string]arvados.Mount{
+			"/tmp": {Kind: "tmp", Capacity: 2 * int64(GiB)},
+		},
+		RuntimeConstraints: arvados.RuntimeConstraints{
+			VCPUs:        2,
+			RAM:          987654321,
+			KeepCacheRAM: 123456789,
+		},
+	})
+	c.Check(err, check.IsNil)
+	c.Check(best.Name, check.Equals, "best")
+}
+
+func (*NodeSizeSuite) TestChoosePreemptible(c *check.C) {
menu := map[string]arvados.InstanceType{
"costly": {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Preemptible: true, Name: "costly"},
"almost best": {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "almost best"},
var Command cmd.Handler = &installCommand{}
-const goversion = "1.17.7"
+const goversion = "1.18.8"
const (
- rubyversion = "2.7.5"
+ rubyversion = "2.7.6"
bundlerversion = "2.2.19"
singularityversion = "3.9.9"
pjsversion = "1.9.8"
geckoversion = "0.24.0"
gradleversion = "5.3.1"
- nodejsversion = "v12.22.11"
+ nodejsversion = "v12.22.12"
devtestDatabasePassword = "insecure_arvados_test"
- workbench2version = "2454ac35292a79594c32a80430740317ed5005cf"
+ workbench2version = "e30e54d674c95ee15e296c71e471c1555bdc5a38" // 2.4.3
)
//go:embed arvados.service
"default-jre-headless",
"gettext",
"libattr1-dev",
- "libcrypt-ssleay-perl",
"libfuse-dev",
"libgbm1", // cypress / workbench2 tests
"libgnutls28-dev",
- "libjson-perl",
"libpam-dev",
"libpcre3-dev",
"libpq-dev",
"libreadline-dev",
"libssl-dev",
- "libwww-perl",
"libxml2-dev",
"libxslt1-dev",
"linkchecker",
}
switch {
case osv.Debian && osv.Major >= 11:
- pkgs = append(pkgs, "g++", "libcurl4", "libcurl4-openssl-dev", "perl-modules-5.32")
+ pkgs = append(pkgs, "g++", "libcurl4", "libcurl4-openssl-dev")
case osv.Debian && osv.Major >= 10:
- pkgs = append(pkgs, "g++", "libcurl4", "libcurl4-openssl-dev", "perl-modules")
+ pkgs = append(pkgs, "g++", "libcurl4", "libcurl4-openssl-dev")
case osv.Debian || osv.Ubuntu:
- pkgs = append(pkgs, "g++", "libcurl3", "libcurl3-openssl-dev", "perl-modules")
+ pkgs = append(pkgs, "g++", "libcurl3", "libcurl3-openssl-dev")
case osv.Centos:
pkgs = append(pkgs, "gcc", "gcc-c++", "libcurl-devel", "postgresql-devel")
}
} else {
err = inst.runBash(`
NJS=`+nodejsversion+`
+rm -rf /var/lib/arvados/node-*-linux-x64
wget --progress=dot:giga -O- https://nodejs.org/dist/${NJS}/node-${NJS}-linux-x64.tar.xz | sudo tar -C /var/lib/arvados -xJf -
ln -sfv /var/lib/arvados/node-${NJS}-linux-x64/bin/{node,npm} /usr/local/bin/
`, stdout, stderr)
"time"
"git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
+ "git.arvados.org/arvados.git/lib/ctrlctx"
"git.arvados.org/arvados.git/lib/dispatchcloud"
"git.arvados.org/arvados.git/lib/service"
"git.arvados.org/arvados.git/sdk/go/arvados"
Registry *prometheus.Registry
logger logrus.FieldLogger
+ dbConnector ctrlctx.DBConnector
lsfcli lsfcli
lsfqueue lsfqueue
arvDispatcher *dispatch.Dispatcher
func (disp *dispatcher) Start() {
disp.initOnce.Do(func() {
disp.init()
+ dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
go func() {
+ defer dblock.Dispatch.Unlock()
disp.checkLsfQueueForOrphans()
err := disp.arvDispatcher.Run(disp.Context)
if err != nil {
lsfcli: &disp.lsfcli,
}
disp.ArvClient.AuthToken = disp.AuthToken
+ disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
disp.stop = make(chan struct{}, 1)
disp.stopped = make(chan struct{})
c.Assert(err, check.IsNil)
cluster, err := cfg.GetCluster("")
c.Assert(err, check.IsNil)
+ cluster.Containers.ReserveExtraRAM = 256 << 20
cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second / 4)
cluster.Containers.MinRetryPeriod = arvados.Duration(time.Second / 4)
cluster.InstanceTypes = arvados.InstanceTypeMap{
}
err = tx.Authenticate(pam.DisallowNullAuthtok)
if err != nil {
- err = fmt.Errorf("PAM: %s (message = %q)", err, errorMessage)
+ err = fmt.Errorf("PAM: %s (message = %q, sentPassword = %v)", err, errorMessage, sentPassword)
logrus.WithError(err).Print("authentication failed")
os.Exit(1)
}
from .perf import Perf
from ._version import __version__
from .executor import ArvCwlExecutor
+from .fsaccess import workflow_uuid_pattern
# These aren't used directly in this file but
# other code expects to import them from here
action="store_false", default=True,
help=argparse.SUPPRESS)
+ parser.add_argument("--disable-git", dest="git_info",
+ action="store_false", default=True,
+ help=argparse.SUPPRESS)
+
parser.add_argument("--disable-color", dest="enable_color",
action="store_false", default=True,
help=argparse.SUPPRESS)
parser.add_argument("--http-timeout", type=int,
default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
+ parser.add_argument("--defer-downloads", action="store_true", default=False,
+ help="When submitting a workflow, defer downloading HTTP URLs to workflow launch instead of downloading to Keep before submit.")
+
+ parser.add_argument("--varying-url-params", type=str, default="",
+ help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
+
+ parser.add_argument("--prefer-cached-downloads", action="store_true", default=False,
+ help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
+
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--enable-preemptible", dest="enable_preemptible", default=None, action="store_true", help="Use preemptible instances. Control individual steps with arv:UsePreemptible hint.")
exgroup.add_argument("--disable-preemptible", dest="enable_preemptible", default=None, action="store_false", help="Don't use preemptible instances.")
# unit tests.
stdout = None
+ if arvargs.submit and (arvargs.workflow.startswith("arvwf:") or workflow_uuid_pattern.match(arvargs.workflow)):
+ executor.loadingContext.do_validate = False
+ executor.fast_submit = True
+
return cwltool.main.main(args=arvargs,
stdout=stdout,
stderr=stderr,
container_request["state"] = "Committed"
container_request.setdefault("properties", {})
+ container_request["properties"]["cwl_input"] = self.joborder
+
runtime_constraints = {}
if runtimeContext.project_uuid:
if container["output"]:
outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
+
+ properties = record["properties"].copy()
+ properties["cwl_output"] = outputs
+ self.arvrunner.api.container_requests().update(
+ uuid=self.uuid,
+ body={"container_request": {"properties": properties}}
+ ).execute(num_retries=self.arvrunner.num_retries)
except WorkflowException as e:
# Only include a stack trace if in debug mode.
# A stack trace may obfuscate more useful output about the workflow.
"kind": "collection",
"portable_data_hash": "%s" % workflowcollection
}
+ elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
+ workflowpath = "/var/lib/cwl/workflow.json#main"
+ record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
+ packed = yaml.safe_load(record["definition"])
+ container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+ "kind": "json",
+ "content": packed
+ }
+ container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
else:
packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
workflowpath = "/var/lib/cwl/workflow.json#main"
"kind": "json",
"content": packed
}
- if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
- container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
if runtimeContext.enable_preemptible is False:
command.append("--disable-preemptible")
+ if runtimeContext.varying_url_params:
+ command.append("--varying-url-params="+runtimeContext.varying_url_params)
+
+ if runtimeContext.prefer_cached_downloads:
+ command.append("--prefer-cached-downloads")
+
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command
from schema_salad.sourceline import SourceLine, cmap
import schema_salad.ref_resolver
+import arvados.collection
+
from cwltool.pack import pack
from cwltool.load_tool import fetch_document, resolve_and_validate_document
from cwltool.process import shortname
max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
sum_res_pars = ("outdirMin", "outdirMax")
def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool):
    """Store the packed workflow in Keep and return a wrapper workflow as JSON text.

    The packed document is written to ``workflow.json`` in a new collection.
    The collection is saved under ``project_uuid`` only when no collection
    with the same portable data hash is already owned by that project.  The
    returned document is a single-step CWL v1.2 workflow whose step runs
    ``keep:<pdh>/workflow.json#main`` and which re-exports the inputs and
    outputs of ``main``.

    arvRunner    -- runner object providing ``api``, ``keep_client`` and ``num_retries``
    main         -- the "#main" process object of the packed document
    packed       -- the packed workflow document to store in Keep
    project_uuid -- owner of the stored collection
    name         -- label given to the wrapper's single step
    git_info     -- optional mapping of git provenance fields, copied to the top
                    level of the wrapper document
    tool         -- loaded tool object, used only to derive the collection/step name

    Returns the wrapper document serialized with sorted keys and 4-space indent.
    """
    json_opts = dict(sort_keys=True, indent=4, separators=(',',': '))

    packed_col = arvados.collection.Collection(api_client=arvRunner.api,
                                               keep_client=arvRunner.keep_client)
    with packed_col.open("workflow.json", "wt") as out:
        json.dump(packed, out, **json_opts)
    pdh = packed_col.portable_data_hash()

    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
    describe = git_info.get("http://arvados.org/cwl#gitDescribe") if git_info else None
    if describe:
        toolname = "%s (%s)" % (toolname, describe)

    # Only save when this project does not already hold identical content.
    same_content = arvRunner.api.collections().list(
        filters=[["portable_data_hash", "=", pdh], ["owner_uuid", "=", project_uuid]]
    ).execute(num_retries=arvRunner.num_retries)
    if len(same_content["items"]) == 0:
        packed_col.save_new(name=toolname, owner_uuid=project_uuid, ensure_unique_name=True)

    # Now assemble the wrapper.

    # Copy only the fields that are meaningful on a workflow-level input.
    # When wrapping a CommandLineTool this keeps inputBinding out of the
    # wrapper.  Extension fields are dropped as well.
    workflow_level_fields = ("type", "label", "secondaryFiles", "streamable",
                             "doc", "id", "format", "loadContents",
                             "loadListing", "default")
    wrapper_inputs = [
        {field: param[field] for field in workflow_level_fields if field in param}
        for param in main["inputs"]
    ]

    step_inputs = [
        {"id": "#main/step/%s" % shortname(param["id"]), "source": param["id"]}
        for param in main["inputs"]
    ]

    step_outputs = []
    wrapper_outputs = []
    for param in main["outputs"]:
        step_port = "#main/step/%s" % shortname(param["id"])
        step_outputs.append({"id": step_port})
        wrapper_outputs.append({"outputSource": step_port,
                                "type": param["type"],
                                "id": param["id"]})

    requirements = [{"class": "SubworkflowFeatureRequirement"}]
    if main.get("requirements"):
        requirements.extend(main["requirements"])

    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": wrapper_inputs,
        "outputs": wrapper_outputs,
        "steps": [{
            "id": "#main/" + toolname,
            "in": step_inputs,
            "out": step_outputs,
            "run": "keep:%s/workflow.json#main" % pdh,
            "label": name
        }],
        "requirements": requirements
    }
    if main.get("hints"):
        wrapper["hints"] = main["hints"]

    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}

    # Git provenance fields live at the top level of the document.
    if git_info:
        for key in git_info:
            doc[key] = git_info[key]

    return json.dumps(doc, **json_opts)
+
def upload_workflow(arvRunner, tool, job_order, project_uuid,
runtimeContext, uuid=None,
submit_runner_ram=0, name=None, merged_map=None,
main["hints"] = hints
+ wrapper = make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool)
+
body = {
"workflow": {
"name": name,
"description": tool.tool.get("doc", ""),
- "definition":json.dumps(packed, sort_keys=True, indent=4, separators=(',',': '))
+ "definition": wrapper
}}
if project_uuid:
body["workflow"]["owner_uuid"] = project_uuid
**argv
): # type: (...) -> None
- super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
- self.tool["class"] = "WorkflowStep"
+ if arvrunner.fast_submit:
+ self.tool = toolpath_object
+ self.tool["inputs"] = []
+ self.tool["outputs"] = []
+ else:
+ super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
+ self.tool["class"] = "WorkflowStep"
self.arvrunner = arvrunner
def job(self, joborder, output_callback, runtimeContext):
self.match_local_docker = False
self.enable_preemptible = None
self.copy_deps = None
+ self.defer_downloads = False
+ self.varying_url_params = ""
+ self.prefer_cached_downloads = False
super(ArvRuntimeContext, self).__init__(kwargs)
kind = 'error'
elif record.levelno >= logging.WARNING:
kind = 'warning'
+ if kind == 'warning' and record.name == "salad":
+ # Don't send validation warnings to runtime status,
+ # they're noisy and unhelpful.
+ return
if kind is not None and self.updatingRuntimeStatus is not True:
self.updatingRuntimeStatus = True
try:
arvargs.output_tags = None
arvargs.thread_count = 1
arvargs.collection_cache_size = None
+ arvargs.git_info = True
+ arvargs.submit = False
+ arvargs.defer_downloads = False
self.api = api_client
self.processes = {}
self.fs_access = None
self.secret_store = None
self.stdout = stdout
+ self.fast_submit = False
+ self.git_info = arvargs.git_info
if keep_client is not None:
self.keep_client = keep_client
self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
collection_cache=self.collection_cache)
+ self.defer_downloads = arvargs.submit and arvargs.defer_downloads
+
validate_cluster_target(self, self.toplevel_runtimeContext)
page = keys[:pageSize]
try:
proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
- except Exception:
- logger.exception("Error checking states on API server: %s")
+ except Exception as e:
+ logger.exception("Error checking states on API server: %s", e)
remain_wait = self.poll_interval
continue
def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
self.debug = runtimeContext.debug
- git_info = self.get_git_info(updated_tool)
+ git_info = self.get_git_info(updated_tool) if self.git_info else {}
if git_info:
logger.info("Git provenance")
for g in git_info:
controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)
- updated_tool.visit(self.check_features)
+ if not self.fast_submit:
+ updated_tool.visit(self.check_features)
self.pipeline = None
self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
loadingContext = self.loadingContext.copy()
loadingContext.do_validate = False
loadingContext.disable_js_validation = True
- if submitting:
+ if submitting and not self.fast_submit:
loadingContext.do_update = False
# Document may have been auto-updated. Reload the original
# document with updating disabled because we want to
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
- logger.info("Uploading workflow dependencies")
- with Perf(metrics, "upload_workflow_deps"):
- merged_map = upload_workflow_deps(self, tool, runtimeContext)
+ if not self.fast_submit:
+ logger.info("Uploading workflow dependencies")
+ with Perf(metrics, "upload_workflow_deps"):
+ merged_map = upload_workflow_deps(self, tool, runtimeContext)
+ else:
+ merged_map = {}
# Recreate process object (ArvadosWorkflow or
# ArvadosCommandTool) because tool document may have been
try:
if url.startswith("http://arvados.org/cwl"):
return True
- if url.startswith("keep:"):
- return self.fsaccess.exists(url)
- if url.startswith("arvwf:"):
- if self.fetch_text(url):
+ urld, _ = urllib.parse.urldefrag(url)
+ if urld.startswith("keep:"):
+ return self.fsaccess.exists(urld)
+ if urld.startswith("arvwf:"):
+ if self.fetch_text(urld):
return True
except arvados.errors.NotFoundError:
return False
properties[url]["Date"] = my_formatdate(now)
-def changed(url, properties, now):
+def changed(url, clean_url, properties, now):
req = requests.head(url, allow_redirects=True)
- remember_headers(url, properties, req.headers, now)
if req.status_code != 200:
- raise Exception("Got status %s" % req.status_code)
+ # Sometimes endpoints are misconfigured and will deny HEAD but
+ # allow GET, so instead of failing here we report "changed" and let
+ # the caller try a conditional GET (If-None-Match).
+ return True
- pr = properties[url]
- if "ETag" in pr and "ETag" in req.headers:
- if pr["ETag"] == req.headers["ETag"]:
- return False
+ etag = properties[url].get("ETag")
+
+ if url in properties:
+ del properties[url]
+ remember_headers(clean_url, properties, req.headers, now)
+
+ if "ETag" in req.headers and etag == req.headers["ETag"]:
+ # Didn't change
+ return False
return True
-def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
- r = api.collections().list(filters=[["properties", "exists", url]]).execute()
def etag_quote(etag):
    """Return ``etag`` in the quoted form used in an ``If-None-Match`` header.

    etag -- an entity tag string, with or without surrounding double quotes.

    A value that is already a quoted strong validator (``"abc"``) or a weak
    validator (``W/"abc"``) is returned unchanged.  Anything else, including
    the empty string, is wrapped in double quotes.
    """
    # A weak validator is already in wire form.  Wrapping it in another pair
    # of quotes would produce an invalid header value.
    if len(etag) >= 4 and etag.startswith('W/"') and etag.endswith('"'):
        return etag
    # The length check keeps "" from raising IndexError and stops a lone
    # quote character from counting as both the opening and closing quote.
    if len(etag) >= 2 and etag[0] == '"' and etag[-1] == '"':
        return etag
    # Bare token: add quotes.
    return '"' + etag + '"'
+
+
+def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow, varying_url_params="", prefer_cached_downloads=False):
+ varying_params = [s.strip() for s in varying_url_params.split(",")]
+
+ parsed = urllib.parse.urlparse(url)
+ query = [q for q in urllib.parse.parse_qsl(parsed.query)
+ if q[0] not in varying_params]
+
+ clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
+ urllib.parse.urlencode(query, safe="/"), parsed.fragment))
+
+ r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()
+
+ if clean_url == url:
+ items = r1["items"]
+ else:
+ r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
+ items = r1["items"] + r2["items"]
now = utcnow()
- for item in r["items"]:
+ etags = {}
+
+ for item in items:
properties = item["properties"]
- if fresh_cache(url, properties, now):
- # Do nothing
+
+ if clean_url in properties:
+ cache_url = clean_url
+ elif url in properties:
+ cache_url = url
+ else:
+ return False
+
+ if prefer_cached_downloads or fresh_cache(cache_url, properties, now):
+ # HTTP caching rules say we should use the cache
cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
- if not changed(url, properties, now):
+ if not changed(cache_url, clean_url, properties, now):
# ETag didn't change, same content, just update headers
api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
+ if "ETag" in properties[cache_url] and len(properties[cache_url]["ETag"]) > 2:
+ etags[properties[cache_url]["ETag"]] = item
+
+ logger.debug("Found ETags %s", etags)
+
properties = {}
- req = requests.get(url, stream=True, allow_redirects=True)
+ headers = {}
+ if etags:
+ headers['If-None-Match'] = ', '.join([etag_quote(k) for k,v in etags.items()])
+ logger.debug("Sending GET request with headers %s", headers)
+ req = requests.get(url, stream=True, allow_redirects=True, headers=headers)
- if req.status_code != 200:
+ if req.status_code not in (200, 304):
raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))
- remember_headers(url, properties, req.headers, now)
+ remember_headers(clean_url, properties, req.headers, now)
+
+ if req.status_code == 304 and "ETag" in req.headers and req.headers["ETag"] in etags:
+ item = etags[req.headers["ETag"]]
+ item["properties"].update(properties)
+ api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
+ cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
+ return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
- if "Content-Length" in properties[url]:
- cl = int(properties[url]["Content-Length"])
+ if "Content-Length" in properties[clean_url]:
+ cl = int(properties[clean_url]["Content-Length"])
logger.info("Downloading %s (%s bytes)", url, cl)
else:
cl = None
else:
name = grp.group(4)
else:
- name = urllib.parse.urlparse(url).path.split("/")[-1]
+ name = parsed.path.split("/")[-1]
count = 0
start = time.time()
logger.info("%d downloaded, %3.2f MiB/s", count, (bps / (1024*1024)))
checkpoint = loopnow
+ logger.info("Download complete")
+
+ collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')
+
+ # max length - space to add a timestamp used by ensure_unique_name
+ max_name_len = 254 - 28
+
+ if len(collectionname) > max_name_len:
+ over = len(collectionname) - max_name_len
+ split = int(max_name_len/2)
+ collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
- collectionname = "Downloaded from %s" % urllib.parse.quote(url, safe='')
c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)
api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
elif src.startswith("http:") or src.startswith("https:"):
try:
- keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
- logger.info("%s is %s", src, keepref)
- self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
+ if self.arvrunner.defer_downloads:
+ # passthrough, we'll download it later.
+ self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
+ else:
+ keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
+ varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
+ prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
+ logger.info("%s is %s", src, keepref)
+ self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
except Exception as e:
logger.warning(str(e))
else:
if loc.startswith("_:"):
return True
+ if self.arvrunner.defer_downloads and (loc.startswith("http:") or loc.startswith("https:")):
+ return False
+
i = loc.rfind("/")
if i > -1:
loc_prefix = loc[:i+1]
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
shortname, Process, fill_in_defaults)
-from cwltool.load_tool import fetch_document
+from cwltool.load_tool import fetch_document, jobloaderctx
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate
+import schema_salad.ref_resolver
import arvados.collection
import arvados.util
tool.tool["inputs"],
job_order)
+ _jobloaderctx = jobloaderctx.copy()
+ jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
+
jobmapper = upload_dependencies(arvrunner,
name,
- tool.doc_loader,
+ jobloader,
job_order,
job_order.get("id", "#"),
False,
merged_map = {}
tool_dep_cache = {}
+
+ todo = []
+
+ # Standard traversal is top down, we want to go bottom up, so use
+ # the visitor to accumulate a list of nodes to visit, then
+ # visit them in reverse order.
def upload_tool_deps(deptool):
if "id" in deptool:
- discovered_secondaryfiles = {}
- with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
- pm = upload_dependencies(arvrunner,
- "%s dependencies" % (shortname(deptool["id"])),
- document_loader,
- deptool,
- deptool["id"],
- False,
- runtimeContext,
- include_primary=False,
- discovered_secondaryfiles=discovered_secondaryfiles,
- cache=tool_dep_cache)
- document_loader.idx[deptool["id"]] = deptool
- toolmap = {}
- for k,v in pm.items():
- toolmap[k] = v.resolved
- merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+ todo.append(deptool)
tool.visit(upload_tool_deps)
+ for deptool in reversed(todo):
+ discovered_secondaryfiles = {}
+ with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
+ pm = upload_dependencies(arvrunner,
+ "%s dependencies" % (shortname(deptool["id"])),
+ document_loader,
+ deptool,
+ deptool["id"],
+ False,
+ runtimeContext,
+ include_primary=False,
+ discovered_secondaryfiles=discovered_secondaryfiles,
+ cache=tool_dep_cache)
+ document_loader.idx[deptool["id"]] = deptool
+ toolmap = {}
+ for k,v in pm.items():
+ toolmap[k] = v.resolved
+ merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+
return merged_map
def arvados_jobs_image(arvrunner, img, runtimeContext):
'cwltool==3.1.20220907141119',
'schema-salad==8.3.20220913105718',
'arvados-python-client{}'.format(pysdk_dep),
- 'setuptools',
'ciso8601 >= 2.0.0',
'networkx < 2.6',
- 'msgpack==1.0.3'
+ 'msgpack==1.0.3',
+ 'importlib-metadata<5',
+ 'setuptools>=40.3.0'
],
data_files=[
('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+class: Workflow
+cwlVersion: v1.1
+inputs:
+ - type:
+ fields:
+ - name: first
+ type: string
+ - name: last
+ type: string
+ type: record
+ id: name
+outputs:
+ - type:
+ fields:
+ - name: first
+ type: string
+ - name: last
+ type: string
+ type: record
+ id: processed_name
+ outputSource: name
+steps: []
}
tool: 19109-upload-secondary.cwl
doc: "Test issue 19109 - correctly discover & upload secondary files"
+
+- job: 19678-name-id.yml
+ output: {
+ "processed_name": {
+ "first": "foo",
+ "last": "bar"
+ }
+ }
+ tool: 19678-name-id.cwl
+ doc: "Test issue 19678 - non-string type input parameter called 'name'"
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+{
+ "$graph": [
+ {
+ "class": "Workflow",
+ "hints": [
+ {
+ "acrContainerImage": "999999999999999999999999999999d3+99",
+ "class": "http://arvados.org/cwl#WorkflowRunnerResources"
+ }
+ ],
+ "id": "#main",
+ "inputs": [],
+ "outputs": [],
+ "requirements": [
+ {
+ "class": "SubworkflowFeatureRequirement"
+ }
+ ],
+ "steps": [
+ {
+ "id": "#main/collection_per_tool.cwl",
+ "in": [],
+ "label": "collection_per_tool.cwl",
+ "out": [],
+ "run": "keep:92045991f69a417f2f26660db67911ef+61/workflow.json#main"
+ }
+ ]
+ }
+ ],
+ "cwlVersion": "v1.2"
+}
'command': ['ls', '/var/spool/cwl'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {},
- 'properties': {},
+ 'properties': {'cwl_input': {}},
'secret_mounts': {},
'output_storage_classes': ["default"]
}))
'scheduling_parameters': {
'partitions': ['blurb']
},
- 'properties': {},
+ 'properties': {'cwl_input': {}},
'secret_mounts': {},
'output_storage_classes': ["default"]
}
'cwd': '/var/spool/cwl',
'scheduling_parameters': {
},
- 'properties': {},
+ 'properties': {'cwl_input': {}},
'secret_mounts': {},
'output_storage_classes': ["default"]
}
'command': ['ls', '/var/spool/cwl'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {},
- 'properties': {},
+ 'properties': {'cwl_input': {}},
'secret_mounts': {},
'output_storage_classes': ["default"]
}))
arvjob.successCodes = [0]
arvjob.outdir = "/var/spool/cwl"
arvjob.output_ttl = 3600
+ arvjob.uuid = "zzzzz-xvhdp-zzzzzzzzzzzzzz1"
arvjob.collect_outputs.return_value = {"out": "stuff"}
"output_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
"uuid": "zzzzz-xvhdp-zzzzzzzzzzzzzzz",
"container_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
- "modified_at": "2017-05-26T12:01:22Z"
+ "modified_at": "2017-05-26T12:01:22Z",
+ "properties": {}
})
self.assertFalse(api.collections().create.called)
arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
runner.add_intermediate_output.assert_called_with("zzzzz-4zz18-zzzzzzzzzzzzzz2")
+ runner.api.container_requests().update.assert_called_with(uuid="zzzzz-xvhdp-zzzzzzzzzzzzzz1",
+ body={'container_request': {'properties': {'cwl_output': {'out': 'stuff'}}}})
+
+
# Test to make sure we don't call runtime_status_update if we already did
# somewhere higher up in the call stack
@mock.patch("arvados_cwl.util.get_current_container")
"output_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
"uuid": "zzzzz-xvhdp-zzzzzzzzzzzzzzz",
"container_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
- "modified_at": "2017-05-26T12:01:22Z"
+ "modified_at": "2017-05-26T12:01:22Z",
+ "properties": {}
})
rts_mock.assert_called_with(
'command': ['ls', '/var/spool/cwl'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {},
- 'properties': {},
+ 'properties': {'cwl_input': {
+ "p1": {
+ "basename": "99999999999999999999999999999994+44",
+ "class": "Directory",
+ "dirname": "/keep",
+ "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+ "listing": [
+ {
+ "basename": "file1",
+ "class": "File",
+ "dirname": "/keep/99999999999999999999999999999994+44",
+ "location": "keep:99999999999999999999999999999994+44/file1",
+ "nameext": "",
+ "nameroot": "file1",
+ "path": "/keep/99999999999999999999999999999994+44/file1",
+ "size": 0
+ },
+ {
+ "basename": "file2",
+ "class": "File",
+ "dirname": "/keep/99999999999999999999999999999994+44",
+ "location": "keep:99999999999999999999999999999994+44/file2",
+ "nameext": "",
+ "nameroot": "file2",
+ "path": "/keep/99999999999999999999999999999994+44/file2",
+ "size": 0
+ }
+ ],
+ "location": "keep:99999999999999999999999999999994+44",
+ "path": "/keep/99999999999999999999999999999994+44"
+ }
+ }},
'secret_mounts': {},
'output_storage_classes': ["default"]
}))
'command': ['md5sum', 'example.conf'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {},
- 'properties': {},
+ 'properties': {'cwl_input': job_order},
"secret_mounts": {
"/var/spool/cwl/example.conf": {
"content": "username: user\npassword: blorp\n",
'command': ['ls', '/var/spool/cwl'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {},
- 'properties': {},
+ 'properties': {'cwl_input': {}},
'secret_mounts': {},
'output_storage_classes': ["foo_sc", "bar_sc"]
}))
'scheduling_parameters': {},
'properties': {
"baz": "blorp",
+ "cwl_input": {"x": "blorp"},
"foo": "bar",
"quux": {
"q1": 1,
'command': ['nvidia-smi'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {},
- 'properties': {},
+ 'properties': {'cwl_input': {}},
'secret_mounts': {},
'output_storage_classes': ["default"]
}))
'command': ['echo'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {},
- 'properties': {},
+ 'properties': {'cwl_input': {}},
'secret_mounts': {},
'output_storage_classes': ["default"]
}
'command': ['ls', '/var/spool/cwl'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': sched,
- 'properties': {},
+ 'properties': {'cwl_input': {}},
'secret_mounts': {},
'output_storage_classes': ["default"]
}))
"output_path": "/var/spool/cwl",
"output_ttl": 0,
"priority": 500,
- "properties": {},
+ "properties": {'cwl_input': {
+ "fileblub": {
+ "basename": "token.txt",
+ "class": "File",
+ "dirname": "/keep/99999999999999999999999999999999+118",
+ "location": "keep:99999999999999999999999999999999+118/token.txt",
+ "nameext": ".txt",
+ "nameroot": "token",
+ "path": "/keep/99999999999999999999999999999999+118/token.txt",
+ "size": 0
+ },
+ "sleeptime": 5
+ }},
"runtime_constraints": {
"ram": 1073741824,
"vcpus": 1
'name': u'echo-subwf',
'secret_mounts': {},
'runtime_constraints': {'API': True, 'vcpus': 3, 'ram': 1073741824},
- 'properties': {},
+ 'properties': {'cwl_input': {}},
'priority': 500,
'mounts': {
'/var/spool/cwl/cwl.input.yml': {
r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
- getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True)
+ getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True, headers={})
cm.open.assert_called_with("file1.txt", "wb")
cm.save_new.assert_called_with(name="Downloaded from http%3A%2F%2Fexample.com%2Ffile1.txt",
r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
self.assertEqual(r, "keep:99999999999999999999999999999997+99/file1.txt")
- getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True)
+ getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True, headers={})
cm.open.assert_called_with("file1.txt", "wb")
cm.save_new.assert_called_with(name="Downloaded from http%3A%2F%2Fexample.com%2Ffile1.txt",
'http://example.com/file1.txt': {
'Date': 'Tue, 15 May 2018 00:00:00 GMT',
'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
- 'ETag': '123456'
+ 'ETag': '"123456"'
}
}
}]
req.headers = {
'Date': 'Tue, 17 May 2018 00:00:00 GMT',
'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
- 'ETag': '123456'
+ 'ETag': '"123456"'
}
headmock.return_value = req
body={"collection":{"properties": {'http://example.com/file1.txt': {
'Date': 'Tue, 17 May 2018 00:00:00 GMT',
'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
- 'ETag': '123456'
+ 'ETag': '"123456"'
}}}})
])
r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/download?fn=/file1.txt", utcnow=utcnow)
self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
- getmock.assert_called_with("http://example.com/download?fn=/file1.txt", stream=True, allow_redirects=True)
+ getmock.assert_called_with("http://example.com/download?fn=/file1.txt", stream=True, allow_redirects=True, headers={})
cm.open.assert_called_with("file1.txt", "wb")
cm.save_new.assert_called_with(name="Downloaded from http%3A%2F%2Fexample.com%2Fdownload%3Ffn%3D%2Ffile1.txt",
mock.call(uuid=cm.manifest_locator(),
body={"collection":{"properties": {"http://example.com/download?fn=/file1.txt": {'Date': 'Tue, 15 May 2018 00:00:00 GMT'}}}})
])
+
@mock.patch("requests.get")
@mock.patch("requests.head")
@mock.patch("arvados.collection.CollectionReader")
def test_http_etag_if_none_match(self, collectionmock, headmock, getmock):
    # Scenario: a cached collection exists with a stale Expires header, the
    # HEAD request is refused (403), and the follow-up GET carrying
    # If-None-Match answers 304.  The cached collection must be returned and
    # its stored headers refreshed, with nothing downloaded.
    source_url = "http://example.com/file1.txt"
    cached_uuid = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
    cached_pdh = "99999999999999999999999999999998+99"
    refreshed_headers = {
        'Date': 'Tue, 17 May 2018 00:00:00 GMT',
        'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
        'ETag': '"123456"'
    }

    api = mock.MagicMock()
    api.collections().list().execute.return_value = {
        "items": [{
            "uuid": cached_uuid,
            "portable_data_hash": cached_pdh,
            "properties": {
                source_url: {
                    'Date': 'Tue, 15 May 2018 00:00:00 GMT',
                    'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
                    'ETag': '"123456"'
                }
            }
        }]
    }

    reader = mock.MagicMock()
    reader.manifest_locator.return_value = cached_uuid
    reader.portable_data_hash.return_value = cached_pdh
    reader.keys.return_value = ["file1.txt"]
    collectionmock.return_value = reader

    # Head request fails, will try a conditional GET instead
    headmock.return_value = mock.MagicMock(status_code=403, headers={})
    getmock.return_value = mock.MagicMock(status_code=304, headers=dict(refreshed_headers))

    utcnow = mock.MagicMock(return_value=datetime.datetime(2018, 5, 17))

    result = arvados_cwl.http.http_to_keep(api, None, source_url, utcnow=utcnow)
    self.assertEqual(result, "keep:99999999999999999999999999999998+99/file1.txt")

    getmock.assert_called_with(source_url, stream=True, allow_redirects=True,
                               headers={"If-None-Match": '"123456"'})
    reader.open.assert_not_called()

    api.collections().update.assert_has_calls([
        mock.call(uuid=reader.manifest_locator(),
                  body={"collection":{"properties": {source_url: dict(refreshed_headers)}}})
    ])
+
+
@mock.patch("requests.get")
@mock.patch("requests.head")
@mock.patch("arvados.collection.CollectionReader")
def test_http_prefer_cached_downloads(self, collectionmock, headmock, getmock):
    # Scenario: the cached entry's Expires header is in the past, but
    # prefer_cached_downloads=True is passed.  The cached collection must be
    # returned without any HEAD/GET request and without a collection update.
    source_url = "http://example.com/file1.txt"
    cached_uuid = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
    cached_pdh = "99999999999999999999999999999998+99"

    api = mock.MagicMock()
    api.collections().list().execute.return_value = {
        "items": [{
            "uuid": cached_uuid,
            "portable_data_hash": cached_pdh,
            "properties": {
                source_url: {
                    'Date': 'Tue, 15 May 2018 00:00:00 GMT',
                    'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
                    'ETag': '"123456"'
                }
            }
        }]
    }

    reader = mock.MagicMock()
    reader.manifest_locator.return_value = cached_uuid
    reader.portable_data_hash.return_value = cached_pdh
    reader.keys.return_value = ["file1.txt"]
    collectionmock.return_value = reader

    utcnow = mock.MagicMock(return_value=datetime.datetime(2018, 5, 17))

    result = arvados_cwl.http.http_to_keep(api, None, source_url,
                                           utcnow=utcnow, prefer_cached_downloads=True)
    self.assertEqual(result, "keep:99999999999999999999999999999998+99/file1.txt")

    # No network traffic, no download and no metadata write may happen.
    headmock.assert_not_called()
    getmock.assert_not_called()
    reader.open.assert_not_called()
    api.collections().update.assert_not_called()
+
@mock.patch("requests.get")
@mock.patch("requests.head")
@mock.patch("arvados.collection.CollectionReader")
def test_http_varying_url_params(self, collectionmock, headmock, getmock):
    # Scenario: the requested URL carries KeyId/Signature/Expires query
    # parameters that are declared as varying.  The cache entry is found
    # whether it was stored under the clean URL or under the signed URL, and
    # the refreshed headers are always written back under the clean URL.
    clean_url = "http://example.com/file1.txt"
    signed_url = "http://example.com/file1.txt?KeyId=123&Signature=456&Expires=789"
    cached_uuid = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
    cached_pdh = "99999999999999999999999999999998+99"
    refreshed_headers = {
        'Date': 'Tue, 17 May 2018 00:00:00 GMT',
        'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
        'ETag': '"123456"'
    }

    for prurl in (clean_url, signed_url):
        api = mock.MagicMock()
        api.collections().list().execute.return_value = {
            "items": [{
                "uuid": cached_uuid,
                "portable_data_hash": cached_pdh,
                "properties": {
                    prurl: {
                        'Date': 'Tue, 15 May 2018 00:00:00 GMT',
                        'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
                        'ETag': '"123456"'
                    }
                }
            }]
        }

        reader = mock.MagicMock()
        reader.manifest_locator.return_value = cached_uuid
        reader.portable_data_hash.return_value = cached_pdh
        reader.keys.return_value = ["file1.txt"]
        collectionmock.return_value = reader

        # HEAD succeeds and reports the same ETag as the cached entry.
        headmock.return_value = mock.MagicMock(status_code=200,
                                               headers=dict(refreshed_headers))

        utcnow = mock.MagicMock(return_value=datetime.datetime(2018, 5, 17))

        result = arvados_cwl.http.http_to_keep(api, None, signed_url,
                                               utcnow=utcnow,
                                               varying_url_params="KeyId,Signature,Expires")
        self.assertEqual(result, "keep:99999999999999999999999999999998+99/file1.txt")

        getmock.assert_not_called()
        reader.open.assert_not_called()

        api.collections().update.assert_has_calls([
            mock.call(uuid=reader.manifest_locator(),
                      body={"collection":{"properties": {clean_url: dict(refreshed_headers)}}})
        ])
gitinfo_workflow["$graph"][0]["id"] = "file://%s/tests/wf/%s" % (cwd, wfpath)
mocktool = mock.NonCallableMock(tool=gitinfo_workflow["$graph"][0], metadata=gitinfo_workflow)
- git_info = arvados_cwl.executor.ArvCwlExecutor.get_git_info(mocktool)
- expect_packed_workflow.update(git_info)
+ stubs.git_info = arvados_cwl.executor.ArvCwlExecutor.get_git_info(mocktool)
+ expect_packed_workflow.update(stubs.git_info)
- git_props = {"arv:"+k.split("#", 1)[1]: v for k,v in git_info.items()}
+ stubs.git_props = {"arv:"+k.split("#", 1)[1]: v for k,v in stubs.git_info.items()}
if wfname == wfpath:
- container_name = "%s (%s)" % (wfpath, git_props["arv:gitDescribe"])
+ container_name = "%s (%s)" % (wfpath, stubs.git_props["arv:gitDescribe"])
else:
container_name = wfname
'ram': (1024+256)*1024*1024
},
'use_existing': False,
- 'properties': git_props,
+ 'properties': stubs.git_props,
'secret_mounts': {}
}
root_logger.handlers = handlers
@mock.patch("time.sleep")
- @stubs
+ @stubs()
def test_submit_invalid_runner_ram(self, stubs, tm):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--debug", "--submit-runner-ram=-2048",
self.assertEqual(exited, 1)
- @stubs
+ @stubs()
def test_submit_container(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'replication_desired': None,
- 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
+ 'name': 'submit_wf.cwl ('+ stubs.git_props["arv:gitDescribe"] +') input (169f39d466a5438ac4a90e779bf750c7+53)',
}), ensure_unique_name=False),
mock.call(body=JsonDiffMatcher({
'manifest_text':
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_container_tool(self, stubs):
# test for issue #16139
exited = arvados_cwl.main(
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_container_no_reuse(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--disable-reuse",
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
'--disable-reuse', "--collection-cache-size=256",
- "--output-name=Output from workflow submit_wf.cwl",
+ '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"],
'--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["use_existing"] = False
stubs.expect_container_request_uuid + '\n')
- @stubs
+ @stubs()
def test_submit_container_on_error(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--on-error=stop",
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
'--enable-reuse', "--collection-cache-size=256",
- "--output-name=Output from workflow submit_wf.cwl",
+ '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"],
'--debug', '--on-error=stop',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_container_output_name(self, stubs):
output_name = "test_output_name"
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_storage_classes(self, stubs):
exited = arvados_cwl.main(
["--debug", "--submit", "--no-wait", "--api=containers", "--storage-classes=foo",
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
'--enable-reuse', "--collection-cache-size=256",
- '--output-name=Output from workflow submit_wf.cwl',
+ '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"],
"--debug",
"--storage-classes=foo", '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_multiple_storage_classes(self, stubs):
exited = arvados_cwl.main(
["--debug", "--submit", "--no-wait", "--api=containers", "--storage-classes=foo,bar", "--intermediate-storage-classes=baz",
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
'--enable-reuse', "--collection-cache-size=256",
- "--output-name=Output from workflow submit_wf.cwl",
+ '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"],
"--debug",
"--storage-classes=foo,bar", "--intermediate-storage-classes=baz", '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
@mock.patch("cwltool.task_queue.TaskQueue")
@mock.patch("arvados_cwl.arvworkflow.ArvadosWorkflow.job")
@mock.patch("arvados_cwl.executor.ArvCwlExecutor.make_output_collection")
- @stubs
+ @stubs()
def test_storage_classes_correctly_propagate_to_make_output_collection(self, stubs, make_output, job, tq):
final_output_c = arvados.collection.Collection()
make_output.return_value = ({},final_output_c)
job.side_effect = set_final_output
exited = arvados_cwl.main(
- ["--debug", "--local", "--storage-classes=foo",
+ ["--debug", "--local", "--storage-classes=foo", "--disable-git",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
- make_output.assert_called_with(u'Output of submit_wf.cwl', ['foo'], '', {}, {"out": "zzzzz"})
+ make_output.assert_called_with(u'Output from workflow submit_wf.cwl', ['foo'], '', {}, {"out": "zzzzz"})
self.assertEqual(exited, 0)
@mock.patch("cwltool.task_queue.TaskQueue")
@mock.patch("arvados_cwl.arvworkflow.ArvadosWorkflow.job")
@mock.patch("arvados_cwl.executor.ArvCwlExecutor.make_output_collection")
- @stubs
+ @stubs()
def test_default_storage_classes_correctly_propagate_to_make_output_collection(self, stubs, make_output, job, tq):
final_output_c = arvados.collection.Collection()
make_output.return_value = ({},final_output_c)
job.side_effect = set_final_output
exited = arvados_cwl.main(
- ["--debug", "--local",
+ ["--debug", "--local", "--disable-git",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
- make_output.assert_called_with(u'Output of submit_wf.cwl', ['default'], '', {}, {"out": "zzzzz"})
+ make_output.assert_called_with(u'Output from workflow submit_wf.cwl', ['default'], '', {}, {"out": "zzzzz"})
self.assertEqual(exited, 0)
@mock.patch("cwltool.task_queue.TaskQueue")
@mock.patch("arvados_cwl.arvworkflow.ArvadosWorkflow.job")
@mock.patch("arvados_cwl.executor.ArvCwlExecutor.make_output_collection")
- @stubs
+ @stubs()
def test_storage_class_hint_to_make_output_collection(self, stubs, make_output, job, tq):
final_output_c = arvados.collection.Collection()
make_output.return_value = ({},final_output_c)
job.side_effect = set_final_output
exited = arvados_cwl.main(
- ["--debug", "--local",
+ ["--debug", "--local", "--disable-git",
"tests/wf/submit_storage_class_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
- make_output.assert_called_with(u'Output of submit_storage_class_wf.cwl', ['foo', 'bar'], '', {}, {"out": "zzzzz"})
+ make_output.assert_called_with(u'Output from workflow submit_storage_class_wf.cwl', ['foo', 'bar'], '', {}, {"out": "zzzzz"})
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_container_output_ttl(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--intermediate-output-ttl", "3600",
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
'--enable-reuse', "--collection-cache-size=256",
- "--output-name=Output from workflow submit_wf.cwl", '--debug',
+ '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"],
+ '--debug',
'--on-error=continue',
"--intermediate-output-ttl=3600",
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_container_trash_intermediate(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--trash-intermediate",
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
'--enable-reuse', "--collection-cache-size=256",
+ '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"],
'--debug', '--on-error=continue',
"--trash-intermediate",
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_container_output_tags(self, stubs):
output_tags = "tag0,tag1,tag2"
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
'--enable-reuse', "--collection-cache-size=256",
- "--output-name=Output from workflow submit_wf.cwl",
+ '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"],
"--output-tags="+output_tags, '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_container_runner_ram(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--submit-runner-ram=2048",
@mock.patch("arvados.collection.CollectionReader")
@mock.patch("time.sleep")
- @stubs
+ @stubs()
def test_submit_file_keepref(self, stubs, tm, collectionReader):
collectionReader().exists.return_value = True
collectionReader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "blorp.txt")
@mock.patch("arvados.collection.CollectionReader")
@mock.patch("time.sleep")
- @stubs
+ @stubs()
def test_submit_keepref(self, stubs, tm, reader):
with open("tests/wf/expect_arvworkflow.cwl") as f:
reader().open().__enter__().read.return_value = f.read()
self.assertEqual(exited, 0)
@mock.patch("time.sleep")
- @stubs
+ @stubs()
def test_submit_arvworkflow(self, stubs, tm):
with open("tests/wf/expect_arvworkflow.cwl") as f:
stubs.api.workflows().get().execute.return_value = {"definition": f.read(), "name": "a test workflow"}
exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--api=containers", "--debug",
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--disable-git",
"962eh-7fd4e-gkbzl62qqtfig37", "-x", "XxX"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
'requirements': [
{
'dockerPull': 'debian:buster-slim',
- 'class': 'DockerRequirement',
- "http://arvados.org/cwl#dockerCollectionPDH": "999999999999999999999999999999d4+99"
+ 'class': 'DockerRequirement'
}
],
'id': '#submit_tool.cwl',
'command': ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
- '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
+ '--enable-reuse', "--collection-cache-size=256",
+ "--output-name=Output from workflow a test workflow",
+ '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
+ 'output_name': 'Output from workflow a test workflow',
'cwd': '/var/spool/cwl',
'runtime_constraints': {
'API': True,
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_missing_input(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
self.assertEqual(exited, 1)
- @stubs
+ @stubs()
def test_submit_container_project(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
stubs.api.groups().get().execute.return_value = {"group_class": "project"}
'--no-log-timestamps', '--disable-validate', '--disable-color',
"--eval-timeout=20", "--thread-count=0",
'--enable-reuse', "--collection-cache-size=256",
- "--output-name=Output from workflow submit_wf.cwl", '--debug',
+ '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"],
+ '--debug',
'--on-error=continue',
'--project-uuid='+project_uuid,
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_container_eval_timeout(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--eval-timeout=60",
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=60.0', '--thread-count=0',
'--enable-reuse', "--collection-cache-size=256",
+ '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"],
'--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_container_collection_cache(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--collection-cache-size=500",
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
'--enable-reuse', "--collection-cache-size=500",
+ '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"],
'--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["runtime_constraints"]["ram"] = (1024+500)*1024*1024
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_container_thread_count(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--thread-count=20",
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=20',
'--enable-reuse', "--collection-cache-size=256",
+ '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"],
'--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_container_runner_image(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--submit-runner-image=arvados/jobs:123",
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_priority(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--priority=669",
arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__, arvrunner.runtimeContext))
- @stubs
+ @stubs()
def test_submit_secrets(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
"tests/wf/secret_wf.cwl", "tests/secret_test_job.yml"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ stubs.git_props["arv:gitPath"] = "sdk/cwl/tests/wf/secret_wf.cwl"
+ stubs.git_info["http://arvados.org/cwl#gitPath"] = "sdk/cwl/tests/wf/secret_wf.cwl"
+
expect_container = {
"command": [
"arvados-cwl-runner",
'--thread-count=0',
"--enable-reuse",
"--collection-cache-size=256",
- '--output-name=Output from workflow secret_wf.cwl'
- '--debug',
+ '--output-name=Output from workflow secret_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"],
+ "--debug",
"--on-error=continue",
"/var/lib/cwl/workflow.json#main",
"/var/lib/cwl/cwl.input.json"
"path": "/var/spool/cwl/cwl.output.json"
}
},
- "name": "secret_wf.cwl",
- "output_name": "Output from workflow secret_wf.cwl",
+ "name": "secret_wf.cwl (%s)" % stubs.git_props["arv:gitDescribe"],
+ "output_name": "Output from workflow secret_wf.cwl (%s)" % stubs.git_props["arv:gitDescribe"],
"output_path": "/var/spool/cwl",
"priority": 500,
- "properties": {},
+ "properties": stubs.git_props,
"runtime_constraints": {
"API": True,
"ram": 1342177280,
"use_existing": False
}
+ expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"].update(stubs.git_info)
+
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(stubs.capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_request_uuid(self, stubs):
stubs.api._rootDesc["remoteHosts"]["zzzzz"] = "123"
stubs.expect_container_request_uuid = "zzzzz-xvhdp-yyyyyyyyyyyyyyy"
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_container_cluster_id(self, stubs):
stubs.api._rootDesc["remoteHosts"]["zbbbb"] = "123"
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_validate_cluster_id(self, stubs):
stubs.api._rootDesc["remoteHosts"]["zbbbb"] = "123"
exited = arvados_cwl.main(
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
self.assertEqual(exited, 1)
- @stubs
+ @stubs()
def test_submit_validate_project_uuid(self, stubs):
# Fails with bad cluster prefix
exited = arvados_cwl.main(
@mock.patch("arvados.collection.CollectionReader")
- @stubs
+ @stubs()
def test_submit_uuid_inputs(self, stubs, collectionReader):
collectionReader().exists.return_value = True
collectionReader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "file1.txt")
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_mismatched_uuid_inputs(self, stubs):
def list_side_effect(**kwargs):
m = mock.MagicMock()
cwltool_logger.removeHandler(stderr_logger)
@mock.patch("arvados.collection.CollectionReader")
- @stubs
+ @stubs()
def test_submit_unknown_uuid_inputs(self, stubs, collectionReader):
collectionReader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "file1.txt")
capture_stderr = StringIO()
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_enable_preemptible(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--enable-preemptible",
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container['command'] = ['arvados-cwl-runner', '--local', '--api=containers',
- '--no-log-timestamps', '--disable-validate', '--disable-color',
- '--eval-timeout=20', '--thread-count=0',
- '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
+ '--no-log-timestamps', '--disable-validate', '--disable-color',
+ '--eval-timeout=20', '--thread-count=0',
+ '--enable-reuse', "--collection-cache-size=256",
+ '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"],
+ '--debug', '--on-error=continue',
'--enable-preemptible',
- '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_submit_disable_preemptible(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--disable-preemptible",
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container['command'] = ['arvados-cwl-runner', '--local', '--api=containers',
- '--no-log-timestamps', '--disable-validate', '--disable-color',
- '--eval-timeout=20', '--thread-count=0',
- '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
+ '--no-log-timestamps', '--disable-validate', '--disable-color',
+ '--eval-timeout=20', '--thread-count=0',
+ '--enable-reuse', "--collection-cache-size=256",
+ '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"],
+ '--debug', '--on-error=continue',
'--disable-preemptible',
- '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+ stubs.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher(expect_container))
+ self.assertEqual(stubs.capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+ self.assertEqual(exited, 0)
+
+ @stubs()
+ def test_submit_container_prefer_cached_downloads(self, stubs):
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--prefer-cached-downloads",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate', '--disable-color',
+ '--eval-timeout=20', '--thread-count=0',
+ '--enable-reuse', "--collection-cache-size=256",
+ '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"],
+ '--debug', "--on-error=continue", '--prefer-cached-downloads',
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+ stubs.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher(expect_container))
+ self.assertEqual(stubs.capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+ self.assertEqual(exited, 0)
+
+ @stubs()
+ def test_submit_container_varying_url_params(self, stubs):
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--varying-url-params", "KeyId,Signature",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate', '--disable-color',
+ '--eval-timeout=20', '--thread-count=0',
+ '--enable-reuse', "--collection-cache-size=256",
+ '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"],
+ '--debug', "--on-error=continue", "--varying-url-params=KeyId,Signature",
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
class TestCreateWorkflow(unittest.TestCase):
existing_workflow_uuid = "zzzzz-7fd4e-validworkfloyml"
expect_workflow = StripYAMLComments(
- open("tests/wf/expect_upload_packed.cwl").read().rstrip())
+ open("tests/wf/expect_upload_wrapper.cwl").read().rstrip())
+ expect_workflow_altname = StripYAMLComments(
+ open("tests/wf/expect_upload_wrapper_altname.cwl").read().rstrip())
def setUp(self):
cwltool.process._names = set()
handlers = [h for h in root_logger.handlers if not isinstance(h, arvados_cwl.executor.RuntimeStatusLoggingHandler)]
root_logger.handlers = handlers
- @stubs
+ @stubs()
def test_create(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
stubs.api.groups().get().execute.return_value = {"group_class": "project"}
["--create-workflow", "--debug",
"--api=containers",
"--project-uuid", project_uuid,
+ "--disable-git",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
stubs.expect_workflow_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_create_name(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
stubs.api.groups().get().execute.return_value = {"group_class": "project"}
"--api=containers",
"--project-uuid", project_uuid,
"--name", "testing 123",
+ "--disable-git",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
"owner_uuid": project_uuid,
"name": "testing 123",
"description": "",
- "definition": self.expect_workflow,
+ "definition": self.expect_workflow_altname,
}
}
stubs.api.workflows().create.assert_called_with(
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_update(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
stubs.api.workflows().get().execute.return_value = {"owner_uuid": project_uuid}
exited = arvados_cwl.main(
["--update-workflow", self.existing_workflow_uuid,
"--debug",
+ "--disable-git",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_update_name(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
stubs.api.workflows().get().execute.return_value = {"owner_uuid": project_uuid}
exited = arvados_cwl.main(
["--update-workflow", self.existing_workflow_uuid,
"--debug", "--name", "testing 123",
+ "--disable-git",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
"workflow": {
"name": "testing 123",
"description": "",
- "definition": self.expect_workflow,
+ "definition": self.expect_workflow_altname,
"owner_uuid": project_uuid
}
}
self.existing_workflow_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_create_collection_per_tool(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
stubs.api.groups().get().execute.return_value = {"group_class": "project"}
["--create-workflow", "--debug",
"--api=containers",
"--project-uuid", project_uuid,
+ "--disable-git",
"tests/collection_per_tool/collection_per_tool.cwl"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
- toolfile = "tests/collection_per_tool/collection_per_tool_packed.cwl"
+ toolfile = "tests/collection_per_tool/collection_per_tool_wrapper.cwl"
expect_workflow = StripYAMLComments(open(toolfile).read().rstrip())
body = {
stubs.expect_workflow_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_create_with_imports(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
stubs.api.groups().get().execute.return_value = {"group_class": "project"}
stubs.expect_workflow_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs()
def test_create_with_no_input(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
stubs.api.groups().get().execute.return_value = {"group_class": "project"}
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+{
+ "$graph": [
+ {
+ "class": "Workflow",
+ "hints": [
+ {
+ "acrContainerImage": "999999999999999999999999999999d3+99",
+ "class": "http://arvados.org/cwl#WorkflowRunnerResources"
+ }
+ ],
+ "id": "#main",
+ "inputs": [
+ {
+ "default": {
+ "basename": "blorp.txt",
+ "class": "File",
+ "location": "keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt",
+ "nameext": ".txt",
+ "nameroot": "blorp",
+ "size": 16
+ },
+ "id": "#main/x",
+ "type": "File"
+ },
+ {
+ "default": {
+ "basename": "99999999999999999999999999999998+99",
+ "class": "Directory",
+ "location": "keep:99999999999999999999999999999998+99"
+ },
+ "id": "#main/y",
+ "type": "Directory"
+ },
+ {
+ "default": {
+ "basename": "anonymous",
+ "class": "Directory",
+ "listing": [
+ {
+ "basename": "renamed.txt",
+ "class": "File",
+ "location": "keep:99999999999999999999999999999998+99/file1.txt",
+ "nameext": ".txt",
+ "nameroot": "renamed",
+ "size": 0
+ }
+ ],
+ "location": "_:df80736f-f14d-4b10-b2e3-03aa27f034b2"
+ },
+ "id": "#main/z",
+ "type": "Directory"
+ }
+ ],
+ "outputs": [],
+ "requirements": [
+ {
+ "class": "SubworkflowFeatureRequirement"
+ }
+ ],
+ "steps": [
+ {
+ "id": "#main/submit_wf.cwl",
+ "in": [
+ {
+ "id": "#main/step/x",
+ "source": "#main/x"
+ },
+ {
+ "id": "#main/step/y",
+ "source": "#main/y"
+ },
+ {
+ "id": "#main/step/z",
+ "source": "#main/z"
+ }
+ ],
+ "label": "submit_wf.cwl",
+ "out": [],
+ "run": "keep:f1c2b0c514a5fb9b2a8b5b38a31bab66+61/workflow.json#main"
+ }
+ ]
+ }
+ ],
+ "cwlVersion": "v1.2"
+}
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+{
+ "$graph": [
+ {
+ "class": "Workflow",
+ "hints": [
+ {
+ "acrContainerImage": "999999999999999999999999999999d3+99",
+ "class": "http://arvados.org/cwl#WorkflowRunnerResources"
+ }
+ ],
+ "id": "#main",
+ "inputs": [
+ {
+ "default": {
+ "basename": "blorp.txt",
+ "class": "File",
+ "location": "keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt",
+ "nameext": ".txt",
+ "nameroot": "blorp",
+ "size": 16
+ },
+ "id": "#main/x",
+ "type": "File"
+ },
+ {
+ "default": {
+ "basename": "99999999999999999999999999999998+99",
+ "class": "Directory",
+ "location": "keep:99999999999999999999999999999998+99"
+ },
+ "id": "#main/y",
+ "type": "Directory"
+ },
+ {
+ "default": {
+ "basename": "anonymous",
+ "class": "Directory",
+ "listing": [
+ {
+ "basename": "renamed.txt",
+ "class": "File",
+ "location": "keep:99999999999999999999999999999998+99/file1.txt",
+ "nameext": ".txt",
+ "nameroot": "renamed",
+ "size": 0
+ }
+ ],
+ "location": "_:df80736f-f14d-4b10-b2e3-03aa27f034b2"
+ },
+ "id": "#main/z",
+ "type": "Directory"
+ }
+ ],
+ "outputs": [],
+ "requirements": [
+ {
+ "class": "SubworkflowFeatureRequirement"
+ }
+ ],
+ "steps": [
+ {
+ "id": "#main/submit_wf.cwl",
+ "in": [
+ {
+ "id": "#main/step/x",
+ "source": "#main/x"
+ },
+ {
+ "id": "#main/step/y",
+ "source": "#main/y"
+ },
+ {
+ "id": "#main/step/z",
+ "source": "#main/z"
+ }
+ ],
+ "label": "testing 123",
+ "out": [],
+ "run": "keep:f1c2b0c514a5fb9b2a8b5b38a31bab66+61/workflow.json#main"
+ }
+ ]
+ }
+ ],
+ "cwlVersion": "v1.2"
+}
ADD cwl/cwltool_dist/$cwltool /tmp/
ADD cwl/dist/$runner /tmp/
+RUN $pipcmd install wheel
RUN cd /tmp/arvados-python-client-* && $pipcmd install .
RUN if test -d /tmp/schema-salad-* ; then cd /tmp/schema-salad-* && $pipcmd install . ; fi
RUN if test -d /tmp/cwltool-* ; then cd /tmp/cwltool-* && $pipcmd install . ; fi
RUN cd /tmp/arvados-cwl-runner-* && $pipcmd install .
+# Sometimes Python dependencies install successfully but don't
+# actually work. So run arvados-cwl-runner here to catch fun
+# dependency errors like pkg_resources.DistributionNotFound.
+RUN arvados-cwl-runner --version
+
# Install dependencies and set up system.
RUN /usr/sbin/adduser --disabled-password \
--gecos 'Crunch execution user' crunch && \
EndpointLinkGet = APIEndpoint{"GET", "arvados/v1/links/{uuid}", ""}
EndpointLinkList = APIEndpoint{"GET", "arvados/v1/links", ""}
EndpointLinkDelete = APIEndpoint{"DELETE", "arvados/v1/links/{uuid}", ""}
+ EndpointLogCreate = APIEndpoint{"POST", "arvados/v1/logs", "log"}
+ EndpointLogUpdate = APIEndpoint{"PATCH", "arvados/v1/logs/{uuid}", "log"}
+ EndpointLogGet = APIEndpoint{"GET", "arvados/v1/logs/{uuid}", ""}
+ EndpointLogList = APIEndpoint{"GET", "arvados/v1/logs", ""}
+ EndpointLogDelete = APIEndpoint{"DELETE", "arvados/v1/logs/{uuid}", ""}
EndpointSysTrashSweep = APIEndpoint{"POST", "sys/trash_sweep", ""}
EndpointUserActivate = APIEndpoint{"POST", "arvados/v1/users/{uuid}/activate", ""}
EndpointUserCreate = APIEndpoint{"POST", "arvados/v1/users", "user"}
LinkGet(ctx context.Context, options GetOptions) (Link, error)
LinkList(ctx context.Context, options ListOptions) (LinkList, error)
LinkDelete(ctx context.Context, options DeleteOptions) (Link, error)
+ LogCreate(ctx context.Context, options CreateOptions) (Log, error)
+ LogUpdate(ctx context.Context, options UpdateOptions) (Log, error)
+ LogGet(ctx context.Context, options GetOptions) (Log, error)
+ LogList(ctx context.Context, options ListOptions) (LogList, error)
+ LogDelete(ctx context.Context, options DeleteOptions) (Log, error)
SpecimenCreate(ctx context.Context, options CreateOptions) (Specimen, error)
SpecimenUpdate(ctx context.Context, options UpdateOptions) (Specimen, error)
SpecimenGet(ctx context.Context, options GetOptions) (Specimen, error)
// Space characters are trimmed when reading the settings file, so
// these are equivalent:
//
-// ARVADOS_API_HOST=localhost\n
-// ARVADOS_API_HOST=localhost\r\n
-// ARVADOS_API_HOST = localhost \n
-// \tARVADOS_API_HOST = localhost\n
+// ARVADOS_API_HOST=localhost\n
+// ARVADOS_API_HOST=localhost\r\n
+// ARVADOS_API_HOST = localhost \n
+// \tARVADOS_API_HOST = localhost\n
func NewClientFromEnv() *Client {
vars := map[string]string{}
home := os.Getenv("HOME")
// Convert an arbitrary struct to url.Values. For example,
//
-// Foo{Bar: []int{1,2,3}, Baz: "waz"}
+// Foo{Bar: []int{1,2,3}, Baz: "waz"}
//
// becomes
//
-// url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
+// url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
//
// params itself is returned if it is already an url.Values.
func anythingToValues(params interface{}) (url.Values, error) {
Enable bool
Users map[string]TestUser
}
- LoginCluster string
- RemoteTokenRefresh Duration
- TokenLifetime Duration
- TrustedClients map[string]struct{}
- IssueTrustedTokens bool
+ LoginCluster string
+ RemoteTokenRefresh Duration
+ TokenLifetime Duration
+ TrustedClients map[URL]struct{}
+ TrustPrivateNetworks bool
+ IssueTrustedTokens bool
}
Mail struct {
MailchimpAPIKey string
}
func (su URL) MarshalText() ([]byte, error) {
- return []byte(fmt.Sprintf("%s", (*url.URL)(&su).String())), nil
+ return []byte(su.String()), nil
}
func (su URL) String() string {
}
Logging struct {
MaxAge Duration
+ SweepInterval Duration
LogBytesPerEvent int
LogSecondsBetweenEvents Duration
LogThrottlePeriod Duration
var errDuplicateInstanceTypeName = errors.New("duplicate instance type name")
// UnmarshalJSON does special handling of InstanceTypes:
-// * populate computed fields (Name and Scratch)
-// * error out if InstancesTypes are populated as an array, which was
-// deprecated in Arvados 1.2.0
+//
+// - populate computed fields (Name and Scratch)
+//
+// - error out if InstanceTypes are populated as an array, which was
+// deprecated in Arvados 1.2.0
func (it *InstanceTypeMap) UnmarshalJSON(data []byte) error {
fixup := func(t InstanceType) (InstanceType, error) {
if t.ProviderType == "" {
package arvados
import (
+ "bytes"
"encoding/json"
"fmt"
"strings"
// UnmarshalJSON implements json.Unmarshaler.
func (d *Duration) UnmarshalJSON(data []byte) error {
+ if bytes.Equal(data, []byte(`"0"`)) || bytes.Equal(data, []byte(`0`)) {
+ // Unitless 0 is not accepted by ParseDuration, but we
+ // accept it as a reasonable spelling of 0
+ // nanoseconds.
+ *d = 0
+ return nil
+ }
if data[0] == '"' {
return d.Set(string(data[1 : len(data)-1]))
}
err = json.Unmarshal([]byte(`{"D":"60s"}`), &d)
c.Check(err, check.IsNil)
c.Check(d.D.Duration(), check.Equals, time.Minute)
+
+ d.D = Duration(time.Second)
+ err = json.Unmarshal([]byte(`{"D":"0"}`), &d)
+ c.Check(err, check.IsNil)
+ c.Check(d.D.Duration(), check.Equals, time.Duration(0))
+
+ d.D = Duration(time.Second)
+ err = json.Unmarshal([]byte(`{"D":0}`), &d)
+ c.Check(err, check.IsNil)
+ c.Check(d.D.Duration(), check.Equals, time.Duration(0))
}
//
// After seeking:
//
-// ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
-// ||
-// filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
+// ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
+// ||
+// filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
ptr = startPtr
if ptr.off < 0 {
type Log struct {
ID uint64 `json:"id"`
UUID string `json:"uuid"`
+ OwnerUUID string `json:"owner_uuid"`
ObjectUUID string `json:"object_uuid"`
ObjectOwnerUUID string `json:"object_owner_uuid"`
EventType string `json:"event_type"`
- EventAt *time.Time `json:"event"`
+ EventAt time.Time `json:"event"`
+ Summary string `json:"summary"`
Properties map[string]interface{} `json:"properties"`
- CreatedAt *time.Time `json:"created_at"`
+ CreatedAt time.Time `json:"created_at"`
+ ModifiedAt time.Time `json:"modified_at"`
}
// LogList is an arvados#logList resource.
"docker-image-repo-tag": true,
"filters": true,
"container_request": true,
+ "cwl_input": true,
+ "cwl_output": true,
}
}
"docker-image-repo-tag": true,
"filters": true,
"container_request": true,
+ "cwl_input": true,
+ "cwl_output": true,
},
StrictTags: false,
Tags: map[string]VocabularyTag{
as.appendCall(ctx, as.LinkDelete, options)
return arvados.Link{}, as.Error
}
+func (as *APIStub) LogCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Log, error) {
+ as.appendCall(ctx, as.LogCreate, options)
+ return arvados.Log{}, as.Error
+}
+func (as *APIStub) LogUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Log, error) {
+ as.appendCall(ctx, as.LogUpdate, options)
+ return arvados.Log{}, as.Error
+}
+func (as *APIStub) LogGet(ctx context.Context, options arvados.GetOptions) (arvados.Log, error) {
+ as.appendCall(ctx, as.LogGet, options)
+ return arvados.Log{}, as.Error
+}
+func (as *APIStub) LogList(ctx context.Context, options arvados.ListOptions) (arvados.LogList, error) {
+ as.appendCall(ctx, as.LogList, options)
+ return arvados.LogList{}, as.Error
+}
+func (as *APIStub) LogDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Log, error) {
+ as.appendCall(ctx, as.LogDelete, options)
+ return arvados.Log{}, as.Error
+}
func (as *APIStub) SpecimenCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Specimen, error) {
as.appendCall(ctx, as.SpecimenCreate, options)
return arvados.Specimen{}, as.Error
Response map[string]interface{} `json:",omitempty"`
ResponseTime json.Number
ClockTime time.Time
+ Server string // "Server" header in http response
Metrics
respTime time.Duration
}
}
result.Health = "OK"
result.ClockTime, _ = time.Parse(time.RFC1123, resp.Header.Get("Date"))
+ result.Server = resp.Header.Get("Server")
return
}
versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
timeout := flags.Duration("timeout", defaultTimeout.Duration(), "Maximum time to wait for health responses")
quiet := flags.Bool("quiet", false, "Silent on success (suppress 'health check OK' message on stderr)")
- outputYAML := flags.Bool("yaml", false, "Output full health report in YAML format (default mode shows errors as plain text, is silent on success)")
+ outputYAML := flags.Bool("yaml", false, "Output full health report in YAML format (default mode prints 'health check OK' or plain text errors)")
if ok, _ := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
// cmd.ParseFlags already reported the error
return errSilent
+++ /dev/null
-#! /usr/bin/perl
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-use strict;
-
-use ExtUtils::MakeMaker;
-
-WriteMakefile(
- NAME => 'Arvados',
- VERSION_FROM => 'lib/Arvados.pm',
- PREREQ_PM => {
- 'JSON' => 0,
- 'LWP' => 0,
- 'Net::SSL' => 0,
- },
-);
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-=head1 NAME
-
-Arvados -- client library for Arvados services
-
-=head1 SYNOPSIS
-
- use Arvados;
- $arv = Arvados->new(apiHost => 'arvados.local');
-
- my $instances = $arv->{'pipeline_instances'}->{'list'}->execute();
- print "UUID is ", $instances->{'items'}->[0]->{'uuid'}, "\n";
-
- $uuid = 'eiv0u-arx5y-2c5ovx43zw90gvh';
- $instance = $arv->{'pipeline_instances'}->{'get'}->execute('uuid' => $uuid);
- print "ETag is ", $instance->{'etag'}, "\n";
-
- $instance->{'active'} = 1;
- $instance->{'name'} = '';
- $instance->save();
- print "ETag is ", $instance->{'etag'}, "\n";
-
-=head1 METHODS
-
-=head2 new()
-
- my $whc = Arvados->new( %OPTIONS );
-
-Set up a client and retrieve the schema from the server.
-
-=head3 Options
-
-=over
-
-=item apiHost
-
-Hostname of API discovery service. Default: C<ARVADOS_API_HOST>
-environment variable, or C<arvados>
-
-=item apiProtocolScheme
-
-Protocol scheme. Default: C<ARVADOS_API_PROTOCOL_SCHEME> environment
-variable, or C<https>
-
-=item authToken
-
-Authorization token. Default: C<ARVADOS_API_TOKEN> environment variable
-
-=item apiService
-
-Default C<arvados>
-
-=item apiVersion
-
-Default C<v1>
-
-=back
-
-=cut
-
-package Arvados;
-
-use Net::SSL (); # From Crypt-SSLeay
-BEGIN {
- $Net::HTTPS::SSL_SOCKET_CLASS = "Net::SSL"; # Force use of Net::SSL
-}
-
-use JSON;
-use Carp;
-use Arvados::ResourceAccessor;
-use Arvados::ResourceMethod;
-use Arvados::ResourceProxy;
-use Arvados::ResourceProxyList;
-use Arvados::Request;
-use Data::Dumper;
-
-$Arvados::VERSION = 0.1;
-
-sub new
-{
- my $class = shift;
- my %self = @_;
- my $self = \%self;
- bless ($self, $class);
- return $self->build(@_);
-}
-
-sub build
-{
- my $self = shift;
-
- $config = load_config_file("$ENV{HOME}/.config/arvados/settings.conf");
-
- $self->{'authToken'} ||=
- $ENV{ARVADOS_API_TOKEN} || $config->{ARVADOS_API_TOKEN};
-
- $self->{'apiHost'} ||=
- $ENV{ARVADOS_API_HOST} || $config->{ARVADOS_API_HOST};
-
- $self->{'noVerifyHostname'} ||=
- $ENV{ARVADOS_API_HOST_INSECURE};
-
- $self->{'apiProtocolScheme'} ||=
- $ENV{ARVADOS_API_PROTOCOL_SCHEME} ||
- $config->{ARVADOS_API_PROTOCOL_SCHEME};
-
- $self->{'ua'} = new Arvados::Request;
-
- my $host = $self->{'apiHost'} || 'arvados';
- my $service = $self->{'apiService'} || 'arvados';
- my $version = $self->{'apiVersion'} || 'v1';
- my $scheme = $self->{'apiProtocolScheme'} || 'https';
- my $uri = "$scheme://$host/discovery/v1/apis/$service/$version/rest";
- my $r = $self->new_request;
- $r->set_uri($uri);
- $r->set_method("GET");
- $r->process_request();
- my $data, $headers;
- my ($status_number, $status_phrase) = $r->get_status();
- $data = $r->get_body() if $status_number == 200;
- $headers = $r->get_headers();
- if ($data) {
- my $doc = $self->{'discoveryDocument'} = JSON::decode_json($data);
- print STDERR Dumper $doc if $ENV{'DEBUG_ARVADOS_API_DISCOVERY'};
- my $k, $v;
- while (($k, $v) = each %{$doc->{'resources'}}) {
- $self->{$k} = Arvados::ResourceAccessor->new($self, $k);
- }
- } else {
- croak "No discovery doc at $uri - $status_number $status_phrase";
- }
- $self;
-}
-
-sub new_request
-{
- my $self = shift;
- local $ENV{'PERL_LWP_SSL_VERIFY_HOSTNAME'};
- if ($self->{'noVerifyHostname'} || ($host =~ /\.local$/)) {
- $ENV{'PERL_LWP_SSL_VERIFY_HOSTNAME'} = 0;
- }
- Arvados::Request->new();
-}
-
-sub load_config_file ($)
-{
- my $config_file = shift;
- my %config;
-
- if (open (CONF, $config_file)) {
- while (<CONF>) {
- next if /^\s*#/ || /^\s*$/; # skip comments and blank lines
- chomp;
- my ($key, $val) = split /\s*=\s*/, $_, 2;
- $config{$key} = $val;
- }
- }
- close CONF;
- return \%config;
-}
-
-1;
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-package Arvados::Request;
-use Data::Dumper;
-use LWP::UserAgent;
-use URI::Escape;
-use Encode;
-use strict;
-@Arvados::HTTP::ISA = qw(LWP::UserAgent);
-
-sub new
-{
- my $class = shift;
- my $self = {};
- bless ($self, $class);
- return $self->_init(@_);
-}
-
-sub _init
-{
- my $self = shift;
- $self->{'ua'} = new LWP::UserAgent(@_);
- $self->{'ua'}->agent ("libarvados-perl/".$Arvados::VERSION);
- $self;
-}
-
-sub set_uri
-{
- my $self = shift;
- $self->{'uri'} = shift;
-}
-
-sub process_request
-{
- my $self = shift;
- my %req;
- my %content;
- my $method = $self->{'method'};
- if ($method eq 'GET' || $method eq 'HEAD') {
- $content{'_method'} = $method;
- $method = 'POST';
- }
- $req{$method} = $self->{'uri'};
- $self->{'req'} = new HTTP::Request (%req);
- $self->{'req'}->header('Authorization' => ('OAuth2 ' . $self->{'authToken'})) if $self->{'authToken'};
- $self->{'req'}->header('Accept' => 'application/json');
-
- # allow_nonref lets us encode JSON::true and JSON::false, see #12078
- my $json = JSON->new->allow_nonref;
- my ($p, $v);
- while (($p, $v) = each %{$self->{'queryParams'}}) {
- $content{$p} = (ref($v) eq "") ? $v : $json->encode($v);
- }
- my $content;
- while (($p, $v) = each %content) {
- $content .= '&' unless $content eq '';
- $content .= uri_escape($p);
- $content .= '=';
- $content .= uri_escape($v);
- }
- $self->{'req'}->content_type("application/x-www-form-urlencoded; charset='utf8'");
- $self->{'req'}->content(Encode::encode('utf8', $content));
- $self->{'res'} = $self->{'ua'}->request ($self->{'req'});
-}
-
-sub get_status
-{
- my $self = shift;
- return ($self->{'res'}->code(),
- $self->{'res'}->message());
-}
-
-sub get_body
-{
- my $self = shift;
- return $self->{'res'}->content;
-}
-
-sub set_method
-{
- my $self = shift;
- $self->{'method'} = shift;
-}
-
-sub set_query_params
-{
- my $self = shift;
- $self->{'queryParams'} = shift;
-}
-
-sub set_auth_token
-{
- my $self = shift;
- $self->{'authToken'} = shift;
-}
-
-sub get_headers
-{
- ""
-}
-
-1;
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-package Arvados::ResourceAccessor;
-use Carp;
-use Data::Dumper;
-
-sub new
-{
- my $class = shift;
- my $self = {};
- bless ($self, $class);
-
- $self->{'api'} = shift;
- $self->{'resourcesName'} = shift;
- $self->{'methods'} = $self->{'api'}->{'discoveryDocument'}->{'resources'}->{$self->{'resourcesName'}}->{'methods'};
- my $method_name, $method;
- while (($method_name, $method) = each %{$self->{'methods'}}) {
- $self->{$method_name} = Arvados::ResourceMethod->new($self, $method);
- }
- $self;
-}
-
-1;
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-package Arvados::ResourceMethod;
-use Carp;
-use Data::Dumper;
-
-sub new
-{
- my $class = shift;
- my $self = {};
- bless ($self, $class);
- return $self->_init(@_);
-}
-
-sub _init
-{
- my $self = shift;
- $self->{'resourceAccessor'} = shift;
- $self->{'method'} = shift;
- return $self;
-}
-
-sub execute
-{
- my $self = shift;
- my $method = $self->{'method'};
-
- my $path = $method->{'path'};
-
- my %body_params;
- my %given_params = @_;
- my %extra_params = %given_params;
- my %method_params = %{$method->{'parameters'}};
- if ($method->{'request'}->{'properties'}) {
- while (my ($prop_name, $prop_value) =
- each %{$method->{'request'}->{'properties'}}) {
- if (ref($prop_value) eq 'HASH' && $prop_value->{'$ref'}) {
- $method_params{$prop_name} = { 'type' => 'object' };
- }
- }
- }
- while (my ($param_name, $param) = each %method_params) {
- delete $extra_params{$param_name};
- if ($param->{'required'} && !exists $given_params{$param_name}) {
- croak("Required parameter not supplied: $param_name");
- }
- elsif ($param->{'location'} eq 'path') {
- $path =~ s/{\Q$param_name\E}/$given_params{$param_name}/eg;
- }
- elsif (!exists $given_params{$param_name}) {
- ;
- }
- elsif ($param->{'type'} eq 'object') {
- my %param_value;
- my ($p, $v);
- if (exists $param->{'properties'}) {
- while (my ($property_name, $property) =
- each %{$param->{'properties'}}) {
- # if the discovery doc specifies object structure,
- # convert to true/false depending on supplied type
- if (!exists $given_params{$param_name}->{$property_name}) {
- ;
- }
- elsif (!defined $given_params{$param_name}->{$property_name}) {
- $param_value{$property_name} = JSON::null;
- }
- elsif ($property->{'type'} eq 'boolean') {
- $param_value{$property_name} = $given_params{$param_name}->{$property_name} ? JSON::true : JSON::false;
- }
- else {
- $param_value{$property_name} = $given_params{$param_name}->{$property_name};
- }
- }
- }
- else {
- while (my ($property_name, $property) =
- each %{$given_params{$param_name}}) {
- if (ref $property eq '' || $property eq undef) {
- $param_value{$property_name} = $property;
- }
- elsif (ref $property eq 'HASH') {
- $param_value{$property_name} = {};
- while (my ($k, $v) = each %$property) {
- $param_value{$property_name}->{$k} = $v;
- }
- }
- }
- }
- $body_params{$param_name} = \%param_value;
- } elsif ($param->{'type'} eq 'boolean') {
- $body_params{$param_name} = $given_params{$param_name} ? JSON::true : JSON::false;
- } else {
- $body_params{$param_name} = $given_params{$param_name};
- }
- }
- if (%extra_params) {
- croak("Unsupported parameter(s) passed to API call /$path: \"" . join('", "', keys %extra_params) . '"');
- }
- my $r = $self->{'resourceAccessor'}->{'api'}->new_request;
- my $base_uri = $self->{'resourceAccessor'}->{'api'}->{'discoveryDocument'}->{'baseUrl'};
- $base_uri =~ s:/$::;
- $r->set_uri($base_uri . "/" . $path);
- $r->set_method($method->{'httpMethod'});
- $r->set_auth_token($self->{'resourceAccessor'}->{'api'}->{'authToken'});
- $r->set_query_params(\%body_params) if %body_params;
- $r->process_request();
- my $data, $headers;
- my ($status_number, $status_phrase) = $r->get_status();
- if ($status_number != 200) {
- croak("API call /$path failed: $status_number $status_phrase\n". $r->get_body());
- }
- $data = $r->get_body();
- $headers = $r->get_headers();
- my $result = JSON::decode_json($data);
- if ($method->{'response'}->{'$ref'} =~ /List$/) {
- Arvados::ResourceProxyList->new($result, $self->{'resourceAccessor'});
- } else {
- Arvados::ResourceProxy->new($result, $self->{'resourceAccessor'});
- }
-}
-
-1;
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-package Arvados::ResourceProxy;
-
-sub new
-{
- my $class = shift;
- my $self = shift;
- $self->{'resourceAccessor'} = shift;
- bless ($self, $class);
- $self;
-}
-
-sub save
-{
- my $self = shift;
- $response = $self->{'resourceAccessor'}->{'update'}->execute('uuid' => $self->{'uuid'}, $self->resource_parameter_name() => $self);
- foreach my $param (keys %$self) {
- if (exists $response->{$param}) {
- $self->{$param} = $response->{$param};
- }
- }
- $self;
-}
-
-sub update_attributes
-{
- my $self = shift;
- my %updates = @_;
- $response = $self->{'resourceAccessor'}->{'update'}->execute('uuid' => $self->{'uuid'}, $self->resource_parameter_name() => \%updates);
- foreach my $param (keys %updates) {
- if (exists $response->{$param}) {
- $self->{$param} = $response->{$param};
- }
- }
- $self;
-}
-
-sub reload
-{
- my $self = shift;
- $response = $self->{'resourceAccessor'}->{'get'}->execute('uuid' => $self->{'uuid'});
- foreach my $param (keys %$self) {
- if (exists $response->{$param}) {
- $self->{$param} = $response->{$param};
- }
- }
- $self;
-}
-
-sub resource_parameter_name
-{
- my $self = shift;
- my $pname = $self->{'resourceAccessor'}->{'resourcesName'};
- $pname =~ s/s$//; # XXX not a very good singularize()
- $pname;
-}
-
-1;
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-package Arvados::ResourceProxyList;
-
-sub new
-{
- my $class = shift;
- my $self = {};
- bless ($self, $class);
- $self->_init(@_);
-}
-
-sub _init
-{
- my $self = shift;
- $self->{'serverResponse'} = shift;
- $self->{'resourceAccessor'} = shift;
- $self->{'items'} = [ map { Arvados::ResourceProxy->new($_, $self->{'resourceAccessor'}) } @{$self->{'serverResponse'}->{'items'}} ];
- $self;
-}
-
-1;
This package is one part of the Arvados source package, and it has
integration tests to check interoperability with other Arvados
components. Our `hacking guide
-<https://arvados.org/projects/arvados/wiki/Hacking_Python_SDK>`_
+<https://dev.arvados.org/projects/arvados/wiki/Hacking_Python_SDK>`_
describes how to set up a development environment and run tests.
+"""Utilities to retry operations.
+
+The core of this module is `RetryLoop`, a utility class to retry operations
+that might fail. It can distinguish between temporary and permanent failures;
+provide exponential backoff; and save a series of results.
+
+It also provides utility functions for common operations with `RetryLoop`:
+
+* `check_http_response_success` can be used as a `RetryLoop` `success_check`
+ for HTTP response codes from the Arvados API server.
+* `retry_method` can decorate methods to provide a default `num_retries`
+ keyword argument.
+"""
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
class RetryLoop(object):
"""Coordinate limited retries of code.
- RetryLoop coordinates a loop that runs until it records a
+ `RetryLoop` coordinates a loop that runs until it records a
successful result or tries too many times, whichever comes first.
Typical use looks like:
loop.save_result(result)
if loop.success():
return loop.last_result()
+
+ Arguments:
+
+ num_retries: int
+ : The maximum number of times to retry the loop if it
+ doesn't succeed. This means the loop body could run at most
+ `num_retries + 1` times.
+
+ success_check: Callable
+ : This is a function that will be called each
+ time the loop saves a result. The function should return
+ `True` if the result indicates the code succeeded, `False` if it
+ represents a permanent failure, and `None` if it represents a
+ temporary failure. If no function is provided, the loop will
+ end after any result is saved.
+
+ backoff_start: float
+ : The number of seconds that must pass before the loop's second
+ iteration. Default 0, which disables all waiting.
+
+ backoff_growth: float
+ : The wait time multiplier after each iteration.
+ Default 2 (i.e., double the wait time each time).
+
+ save_results: int
+ : Specify a number to store that many saved results from the loop.
+ These are available through the `results` attribute, oldest first.
+ Default 1.
+
+ max_wait: float
+ : Maximum number of seconds to wait between retries. Default 60.
"""
def __init__(self, num_retries, success_check=lambda r: True,
backoff_start=0, backoff_growth=2, save_results=1,
max_wait=60):
- """Construct a new RetryLoop.
-
- Arguments:
- * num_retries: The maximum number of times to retry the loop if it
- doesn't succeed. This means the loop could run at most 1+N times.
- * success_check: This is a function that will be called each
- time the loop saves a result. The function should return
- True if the result indicates loop success, False if it
- represents a permanent failure state, and None if the loop
- should continue. If no function is provided, the loop will
- end as soon as it records any result.
- * backoff_start: The number of seconds that must pass before the
- loop's second iteration. Default 0, which disables all waiting.
- * backoff_growth: The wait time multiplier after each iteration.
- Default 2 (i.e., double the wait time each time).
- * save_results: Specify a number to save the last N results
- that the loop recorded. These records are available through
- the results attribute, oldest first. Default 1.
- * max_wait: Maximum number of seconds to wait between retries.
- """
self.tries_left = num_retries + 1
self.check_result = success_check
self.backoff_wait = backoff_start
self._success = None
def __iter__(self):
+ """Return an iterator of retries."""
return self
def running(self):
+ """Return whether this loop is running.
+
+ Returns `None` if the loop has never run, `True` if it is still running,
+ or `False` if it has stopped—whether that's because it has saved a
+ successful result, a permanent failure, or has run out of retries.
+ """
return self._running and (self._success is None)
def __next__(self):
+ """Record a loop attempt.
+
+ If the loop is still running, decrements the number of tries left and
+ returns it. Otherwise, raises `StopIteration`.
+ """
if self._running is None:
self._running = True
if (self.tries_left < 1) or not self.running():
"""Record a loop result.
Save the given result, and end the loop if it indicates
- success or permanent failure. See __init__'s documentation
- about success_check to learn how to make that indication.
+ success or permanent failure. See documentation for the `__init__`
+ `success_check` argument to learn how that's indicated.
+
+ Raises `arvados.errors.AssertionError` if called after the loop has
+ already ended.
+
+ Arguments:
+
+ result: Any
+ : The result from this loop attempt to check and save.
"""
if not self.running():
raise arvados.errors.AssertionError(
def success(self):
"""Return the loop's end state.
- Returns True if the loop obtained a successful result, False if it
- encountered permanent failure, or else None.
+ Returns `True` if the loop recorded a successful result, `False` if it
+ recorded permanent failure, or else `None`.
"""
return self._success
def last_result(self):
- """Return the most recent result the loop recorded."""
+ """Return the most recent result the loop saved.
+
+ Raises `arvados.errors.AssertionError` if called before any result has
+ been saved.
+ """
try:
return self.results[-1]
except IndexError:
"queried loop results before any were recorded")
def attempts(self):
- """Return the number of attempts that have been made.
+ """Return the number of results that have been saved.
- Includes successes and failures."""
+ This count includes all kinds of results: success, permanent failure,
+ and temporary failure.
+ """
return self._attempts
def attempts_str(self):
- """Human-readable attempts(): 'N attempts' or '1 attempt'"""
+ """Return a human-friendly string counting saved results.
+
+ This method returns '1 attempt' or 'N attempts', where the number
+ in the string is the number of saved results.
+ """
if self._attempts == 1:
return '1 attempt'
else:
def check_http_response_success(status_code):
- """Convert an HTTP status code to a loop control flag.
+ """Convert a numeric HTTP status code to a loop control flag.
- Pass this method a numeric HTTP status code. It returns True if
- the code indicates success, None if it indicates temporary
- failure, and False otherwise. You can use this as the
- success_check for a RetryLoop.
+ This method takes a numeric HTTP status code and returns `True` if
+ the code indicates success, `None` if it indicates temporary
+ failure, and `False` otherwise. You can use this as the
+ `success_check` for a `RetryLoop` that queries the Arvados API server.
+ Specifically:
- Implementation details:
- * Any 2xx result returns True.
- * A select few status codes, or any malformed responses, return None.
+ * Any 2xx result returns `True`.
+
+ * A select few status codes, or any malformed responses, return `None`.
422 Unprocessable Entity is in this category. This may not meet the
letter of the HTTP specification, but the Arvados API server will
use it for various server-side problems like database connection
errors.
- * Everything else returns False. Note that this includes 1xx and
+
+ * Everything else returns `False`. Note that this includes 1xx and
3xx status codes. They don't indicate success, and you can't
retry those requests verbatim.
+
+ Arguments:
+
+ status_code: int
+ : A numeric HTTP response code
"""
if status_code in _HTTP_SUCCESSES:
return True
"""Provide a default value for a method's num_retries argument.
This is a decorator for instance and class methods that accept a
- num_retries argument, with a None default. When the method is called
- without a value for num_retries, it will be set from the underlying
- instance or class' num_retries attribute.
+ `num_retries` keyword argument, with a `None` default. When the method
+ is called without a value for `num_retries`, this decorator will set it
+ from the `num_retries` attribute of the underlying instance or class.
+
+ Arguments:
+
+ orig_func: Callable
+ : A class or instance method that accepts a `num_retries` keyword argument
"""
@functools.wraps(orig_func)
def num_retries_setter(self, *args, **kwargs):
install_requires=[
'ciso8601 >=2.0.0',
'future',
+ 'google-api-core <2.11.0', # 2.11.0rc1 is incompatible with google-auth<2
'google-api-python-client >=1.6.2, <2',
'google-auth<2',
'httplib2 >=0.9.2, <0.20.2',
'pycurl >=7.19.5.1, <7.45.0',
- 'ruamel.yaml >=0.15.54, <0.17.11',
+ 'ruamel.yaml >=0.15.54, <0.17.22',
'setuptools',
'ws4py >=0.4.2',
- 'protobuf<4.0.0dev'
+ 'protobuf<4.0.0dev',
+ 'pyparsing<3',
+ 'setuptools>=40.3.0',
],
classifiers=[
'Programming Language :: Python :: 3',
"GitInternalDir": os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'internal.git'),
},
"LocalKeepBlobBuffersPerVCPU": 0,
+ "Logging": {
+ "SweepInterval": 0, # disable, otherwise test cases can't acquire dblock
+ },
"SupportedDockerImageFormats": {"v1": {}},
"ShellAccess": {
"Admin": True,
end
# Verify that a given manifest is valid according to
- # https://arvados.org/projects/arvados/wiki/Keep_manifest_format
+ # https://dev.arvados.org/projects/arvados/wiki/Keep_manifest_format
def self.validate! manifest
raise ArgumentError.new "No manifest found" if !manifest
oj (3.9.2)
optimist (3.0.0)
os (1.1.1)
- passenger (6.0.2)
+ passenger (6.0.15)
rack
rake (>= 0.8.1)
pg (1.1.4)
def norm url
# normalize URL for comparison
url = URI(url.to_s)
- if url.scheme == "https"
- url.port == "443"
- end
- if url.scheme == "http"
- url.port == "80"
+ if url.scheme == "https" && url.port == ""
+ url.port = "443"
+ elsif url.scheme == "http" && url.port == ""
+ url.port = "80"
end
url.path = "/"
url
conn.exec_query 'SAVEPOINT save_with_unique_name'
begin
save!
+ conn.exec_query 'RELEASE SAVEPOINT save_with_unique_name'
rescue ActiveRecord::RecordNotUnique => rn
raise if max_retries == 0
max_retries -= 1
- conn.exec_query 'ROLLBACK TO SAVEPOINT save_with_unique_name'
-
# Dig into the error to determine if it is specifically calling out a
# (owner_uuid, name) uniqueness violation. In this specific case, and
# the client requested a unique name with ensure_unique_name==true,
detail = err.result.error_field(PG::Result::PG_DIAG_MESSAGE_DETAIL)
raise unless /^Key \(owner_uuid, name\)=\([a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}, .*?\) already exists\./.match detail
+ conn.exec_query 'ROLLBACK TO SAVEPOINT save_with_unique_name'
+
new_name = "#{name_was} (#{db_current_time.utc.iso8601(3)})"
if new_name == name
# If the database is fast enough to do two attempts in the
self[:current_version_uuid] = nil
end
end
- conn.exec_query 'SAVEPOINT save_with_unique_name'
+
retry
- ensure
- conn.exec_query 'RELEASE SAVEPOINT save_with_unique_name'
end
end
end
# delete oid_login_perms for this user
#
- # note: these permission links are obsolete, they have no effect
- # on anything and they are not created for new users.
+ # note: these permission links are obsolete anyway: they have no
+ # effect on anything and they are not created for new users.
Link.where(tail_uuid: self.email,
link_class: 'permission',
name: 'can_login').destroy_all
- # delete repo_perms for this user
- Link.where(tail_uuid: self.uuid,
- link_class: 'permission',
- name: 'can_manage').destroy_all
-
- # delete vm_login_perms for this user
- Link.where(tail_uuid: self.uuid,
- link_class: 'permission',
- name: 'can_login').destroy_all
-
- # delete "All users" group read permissions for this user
+ # Delete all sharing permissions so (a) the user doesn't
+ # automatically regain access to anything if re-setup in future,
+ # (b) the user doesn't appear in "currently shared with" lists
+ # shown to other users.
+ #
+ # Notably this includes the can_read -> "all users" group
+ # permission.
Link.where(tail_uuid: self.uuid,
- head_uuid: all_users_group_uuid,
link_class: 'permission').destroy_all
# delete any signatures by this user
# from the logs table.
namespace :db do
- desc "Remove old container log entries from the logs table"
+ desc "deprecated / no-op"
task delete_old_container_logs: :environment do
- delete_sql = "DELETE FROM logs WHERE id in (SELECT logs.id FROM logs JOIN containers ON logs.object_uuid = containers.uuid WHERE event_type IN ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat') AND containers.log IS NOT NULL AND now() - containers.finished_at > interval '#{Rails.configuration.Containers.Logging.MaxAge.to_i} seconds')"
-
- ActiveRecord::Base.connection.execute(delete_sql)
+ Rails.logger.info "this db:delete_old_container_logs rake task is no longer used"
end
end
"Expected 'duplicate key' error in #{response_errors.first}")
end
+ [false, true].each do |ensure_unique_name|
+ test "create failure with duplicate name, ensure_unique_name #{ensure_unique_name}" do
+ authorize_with :active
+ post :create, params: {
+ collection: {
+ owner_uuid: users(:active).uuid,
+ manifest_text: "",
+ name: "this...............................................................................................................................................................................................................................................................name is too long"
+ },
+ ensure_unique_name: ensure_unique_name
+ }
+ assert_response 422
+ # check the real error isn't masked by an
+ # ensure_unique_name-related error (#19698)
+ assert_match /value too long for type/, json_response['errors'][0]
+ end
+ end
+
[false, true].each do |unsigned|
test "create with duplicate name, ensure_unique_name, unsigned=#{unsigned}" do
permit_unsigned_manifests unsigned
ApiClientAuthorization.create!(user: User.find_by_uuid(created['uuid']), api_client: ApiClient.all.first).api_token
end
+ # share project and collections with the new user
+ act_as_system_user do
+ Link.create!(tail_uuid: created['uuid'],
+ head_uuid: groups(:aproject).uuid,
+ link_class: 'permission',
+ name: 'can_manage')
+ Link.create!(tail_uuid: created['uuid'],
+ head_uuid: collections(:collection_owned_by_active).uuid,
+ link_class: 'permission',
+ name: 'can_read')
+ Link.create!(tail_uuid: created['uuid'],
+ head_uuid: collections(:collection_owned_by_active_with_file_stats).uuid,
+ link_class: 'permission',
+ name: 'can_write')
+ end
+
assert_equal 1, ApiClientAuthorization.where(user_id: User.find_by_uuid(created['uuid']).id).size, 'expected token not found'
post "/arvados/v1/users/#{created['uuid']}/unsetup", params: {}, headers: auth(:admin)
assert_not_nil created2['uuid'], 'expected uuid for the newly created user'
assert_equal created['uuid'], created2['uuid'], 'expected uuid not found'
assert_equal 0, ApiClientAuthorization.where(user_id: User.find_by_uuid(created['uuid']).id).size, 'token should have been deleted by user unsetup'
+ # check permissions are deleted
+ assert_empty Link.where(tail_uuid: created['uuid'])
verify_link_existence created['uuid'], created['email'], false, false, false, false, false
end
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-require 'test_helper'
-require 'rake'
-
-Rake.application.rake_require "tasks/delete_old_container_logs"
-Rake::Task.define_task(:environment)
-
-class DeleteOldContainerLogsTaskTest < ActiveSupport::TestCase
- TASK_NAME = "db:delete_old_container_logs"
-
- def log_uuids(*fixture_names)
- fixture_names.map { |name| logs(name).uuid }
- end
-
- def run_with_expiry(clean_after)
- Rails.configuration.Containers.Logging.MaxAge = clean_after
- Rake::Task[TASK_NAME].reenable
- Rake.application.invoke_task TASK_NAME
- end
-
- def check_log_existence(test_method, fixture_uuids)
- uuids_now = Log.where("object_uuid LIKE :pattern AND event_type in ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat')", pattern: "%-dz642-%").map(&:uuid)
- fixture_uuids.each do |expect_uuid|
- send(test_method, uuids_now, expect_uuid)
- end
- end
-
- test "delete all finished logs" do
- uuids_to_keep = log_uuids(:stderr_for_running_container,
- :crunchstat_for_running_container)
- uuids_to_clean = log_uuids(:stderr_for_previous_container,
- :crunchstat_for_previous_container,
- :stderr_for_ancient_container,
- :crunchstat_for_ancient_container)
- run_with_expiry(1)
- check_log_existence(:assert_includes, uuids_to_keep)
- check_log_existence(:refute_includes, uuids_to_clean)
- end
-
- test "delete old finished logs" do
- uuids_to_keep = log_uuids(:stderr_for_running_container,
- :crunchstat_for_running_container,
- :stderr_for_previous_container,
- :crunchstat_for_previous_container)
- uuids_to_clean = log_uuids(:stderr_for_ancient_container,
- :crunchstat_for_ancient_container)
- run_with_expiry(360.days)
- check_log_existence(:assert_includes, uuids_to_keep)
- check_log_existence(:refute_includes, uuids_to_clean)
- end
-end
"time"
"git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
+ "git.arvados.org/arvados.git/lib/ctrlctx"
"git.arvados.org/arvados.git/lib/dispatchcloud"
"git.arvados.org/arvados.git/lib/service"
"git.arvados.org/arvados.git/sdk/go/arvados"
type Dispatcher struct {
*dispatch.Dispatcher
- logger logrus.FieldLogger
- cluster *arvados.Cluster
- sqCheck *SqueueChecker
- slurm Slurm
+ logger logrus.FieldLogger
+ cluster *arvados.Cluster
+ sqCheck *SqueueChecker
+ slurm Slurm
+ dbConnector ctrlctx.DBConnector
done chan struct{}
err error
disp.Client.APIHost = disp.cluster.Services.Controller.ExternalURL.Host
disp.Client.AuthToken = disp.cluster.SystemRootToken
disp.Client.Insecure = disp.cluster.TLS.Insecure
+ disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.cluster.PostgreSQL}
if disp.Client.APIHost != "" || disp.Client.AuthToken != "" {
// Copy real configs into env vars so [a]
}
func (disp *Dispatcher) run() error {
+ dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
+ defer dblock.Dispatch.Unlock()
defer disp.sqCheck.Stop()
if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 {
version = "dev"
)
+type logger interface {
+ Printf(string, ...interface{})
+}
+
func main() {
reporter := crunchstat.Reporter{
Logger: log.New(os.Stderr, "crunchstat: ", 0),
reporter.Logger.Printf("crunchstat %s started", version)
if reporter.CgroupRoot == "" {
- reporter.Logger.Fatal("error: must provide -cgroup-root")
+ reporter.Logger.Printf("error: must provide -cgroup-root")
+ os.Exit(2)
} else if signalOnDeadPPID < 0 {
- reporter.Logger.Fatalf("-signal-on-dead-ppid=%d is invalid (use a positive signal number, or 0 to disable)", signalOnDeadPPID)
+ reporter.Logger.Printf("-signal-on-dead-ppid=%d is invalid (use a positive signal number, or 0 to disable)", signalOnDeadPPID)
+ os.Exit(2)
}
reporter.PollPeriod = time.Duration(*pollMsec) * time.Millisecond
if status, ok := err.Sys().(syscall.WaitStatus); ok {
os.Exit(status.ExitStatus())
} else {
- reporter.Logger.Fatalln("ExitError without WaitStatus:", err)
+ reporter.Logger.Printf("ExitError without WaitStatus: %v", err)
+ os.Exit(1)
}
} else if err != nil {
- reporter.Logger.Fatalln("error in cmd.Wait:", err)
+ reporter.Logger.Printf("error running command: %v", err)
+ os.Exit(1)
}
}
-func runCommand(argv []string, logger *log.Logger) error {
+func runCommand(argv []string, logger logger) error {
cmd := exec.Command(argv[0], argv[1:]...)
- logger.Println("Running", argv)
+ logger.Printf("Running %v", argv)
// Child process will use our stdin and stdout pipes
// (we close our copies below)
if cmd.Process != nil {
cmd.Process.Signal(catch)
}
- logger.Println("notice: caught signal:", catch)
+ logger.Printf("notice: caught signal: %v", catch)
}(sigChan)
signal.Notify(sigChan, syscall.SIGTERM)
signal.Notify(sigChan, syscall.SIGINT)
// Funnel stderr through our channel
stderrPipe, err := cmd.StderrPipe()
if err != nil {
- logger.Fatalln("error in StderrPipe:", err)
+ logger.Printf("error in StderrPipe: %v", err)
+ return err
}
// Run subprocess
if err := cmd.Start(); err != nil {
- logger.Fatalln("error in cmd.Start:", err)
+ logger.Printf("error in cmd.Start: %v", err)
+ return err
}
// Close stdin/stdout in this (parent) process
os.Stdin.Close()
os.Stdout.Close()
- copyPipeToChildLog(stderrPipe, log.New(os.Stderr, "", 0))
+ err = copyPipeToChildLog(stderrPipe, log.New(os.Stderr, "", 0))
+ if err != nil {
+ cmd.Process.Kill()
+ return err
+ }
return cmd.Wait()
}
-func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.Cmd, logger *log.Logger) {
+func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.Cmd, logger logger) {
ticker := time.NewTicker(intvl)
for range ticker.C {
ppid := os.Getppid()
}
}
-func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
+func copyPipeToChildLog(in io.ReadCloser, logger logger) error {
reader := bufio.NewReaderSize(in, MaxLogLine)
var prefix string
for {
if err == io.EOF {
break
} else if err != nil {
- logger.Fatal("error reading child stderr:", err)
+ return fmt.Errorf("error reading child stderr: %w", err)
}
var suffix string
if isPrefix {
suffix = "[...]"
}
- logger.Print(prefix, string(line), suffix)
+ logger.Printf("%s%s%s", prefix, string(line), suffix)
// Set up prefix for following line
if isPrefix {
prefix = "[...]"
prefix = ""
}
}
- in.Close()
+ return in.Close()
}
This package is one part of the Arvados source package, and it has
integration tests to check interoperability with other Arvados
components. Our `hacking guide
-<https://arvados.org/projects/arvados/wiki/Hacking_Python_SDK>`_
+<https://dev.arvados.org/projects/arvados/wiki/Hacking_Python_SDK>`_
describes how to set up a development environment and run tests.
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/keepclient"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
// subsequent balance operation.
//
// Run should only be called once on a given Balancer object.
-//
-// Typical usage:
-//
-// runOptions, err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
nextRunOptions = runOptions
+ ctxlog.FromContext(ctx).Info("acquiring active lock")
+ if !dblock.KeepBalanceActive.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return bal.DB, nil }) {
+ // context canceled
+ return
+ }
+ defer dblock.KeepBalanceActive.Unlock()
+
defer bal.time("sweep", "wall clock time to run one full sweep")()
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
+ ctx, cancel := context.WithDeadline(ctx, time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
defer cancel()
var lbFile *os.File
import (
"bytes"
+ "context"
"encoding/json"
"fmt"
"io"
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
- _, err = srv.runOnce()
+ _, err = srv.runOnce(context.Background())
c.Check(err, check.ErrorMatches, "received zero collections")
c.Check(trashReqs.Count(), check.Equals, 4)
c.Check(pullReqs.Count(), check.Equals, 0)
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
- _, err := srv.runOnce()
+ _, err := srv.runOnce(context.Background())
c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
c.Check(trashReqs.Count(), check.Equals, 0)
c.Check(pullReqs.Count(), check.Equals, 0)
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
- _, err := srv.runOnce()
+ _, err := srv.runOnce(context.Background())
c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
c.Check(trashReqs.Count(), check.Equals, 0)
c.Check(pullReqs.Count(), check.Equals, 0)
s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
c.Assert(err, check.IsNil)
- _, err = srv.runOnce()
+ _, err = srv.runOnce(context.Background())
c.Check(err, check.IsNil)
lost, err := ioutil.ReadFile(lostf.Name())
c.Assert(err, check.IsNil)
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
- bal, err := srv.runOnce()
+ bal, err := srv.runOnce(context.Background())
c.Check(err, check.IsNil)
for _, req := range collReqs.reqs {
c.Check(req.Form.Get("include_trash"), check.Equals, "true")
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
- bal, err := srv.runOnce()
+ bal, err := srv.runOnce(context.Background())
c.Check(err, check.IsNil)
c.Check(trashReqs.Count(), check.Equals, 8)
c.Check(pullReqs.Count(), check.Equals, 4)
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- stop := make(chan interface{})
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
srv := s.newServer(&opts)
done := make(chan bool)
go func() {
- srv.runForever(stop)
+ srv.runForever(ctx)
close(done)
}()
for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
time.Sleep(time.Millisecond)
}
- stop <- true
+ cancel()
<-done
c.Check(pullReqs.Count() >= 16, check.Equals, true)
c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
import (
"bytes"
+ "context"
"io"
"os"
"strings"
Logger: logger,
Metrics: newMetrics(prometheus.NewRegistry()),
}
- nextOpts, err := bal.Run(s.client, s.config, opts)
+ nextOpts, err := bal.Run(context.Background(), s.client, s.config, opts)
c.Check(err, check.IsNil)
c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
c.Check(nextOpts.CommitPulls, check.Equals, true)
Routes: health.Routes{"ping": srv.CheckHealth},
}
- go srv.run()
+ go srv.run(ctx)
return srv
}).RunCommand(prog, args, stdin, stdout, stderr)
}
package keepbalance
import (
+ "context"
"net/http"
"os"
"os/signal"
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
return nil
}
-func (srv *Server) run() {
+func (srv *Server) run(ctx context.Context) {
var err error
if srv.RunOptions.Once {
- _, err = srv.runOnce()
+ _, err = srv.runOnce(ctx)
} else {
- err = srv.runForever(nil)
+ err = srv.runForever(ctx)
}
if err != nil {
srv.Logger.Error(err)
}
}
-func (srv *Server) runOnce() (*Balancer, error) {
+func (srv *Server) runOnce(ctx context.Context) (*Balancer, error) {
bal := &Balancer{
DB: srv.DB,
Logger: srv.Logger,
LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
}
var err error
- srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
+ srv.RunOptions, err = bal.Run(ctx, srv.ArvClient, srv.Cluster, srv.RunOptions)
return bal, err
}
-// RunForever runs forever, or (for testing purposes) until the given
-// stop channel is ready to receive.
-func (srv *Server) runForever(stop <-chan interface{}) error {
+// runForever runs forever, or until ctx is cancelled.
+func (srv *Server) runForever(ctx context.Context) error {
logger := srv.Logger
ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
sigUSR1 := make(chan os.Signal)
signal.Notify(sigUSR1, syscall.SIGUSR1)
+ logger.Info("acquiring service lock")
+ dblock.KeepBalanceService.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return srv.DB, nil })
+ defer dblock.KeepBalanceService.Unlock()
+
logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
for {
logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
}
- _, err := srv.runOnce()
+ if !dblock.KeepBalanceService.Check() {
+ // context canceled
+ return nil
+ }
+ _, err := srv.runOnce(ctx)
if err != nil {
logger.Print("run failed: ", err)
} else {
}
select {
- case <-stop:
+ case <-ctx.Done():
signal.Stop(sigUSR1)
return nil
case <-ticker.C:
return errors.New("DriverParameters: RaceWindow must not be negative")
}
- var ok bool
- v.region, ok = aws.Regions[v.Region]
if v.Endpoint == "" {
+ r, ok := aws.Regions[v.Region]
if !ok {
return fmt.Errorf("unrecognized region %+q; try specifying endpoint instead", v.Region)
}
- } else if ok {
- return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
- "specify empty endpoint or use a different region name", v.Region, v.Endpoint)
+ v.region = r
} else {
v.region = aws.Region{
Name: v.Region,
// aws-sdk-go based on the UseAWSS3v2Driver feature flag
func chooseS3VolumeDriver(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
+ // Default value will be overridden if it happens to be defined in the config
+ v.S3VolumeDriverParameters.UseAWSS3v2Driver = true
err := json.Unmarshal(volume.DriverParameters, v)
if err != nil {
return nil, err
if v.Endpoint != "" && service == "s3" {
return aws.Endpoint{
URL: v.Endpoint,
- SigningRegion: v.Region,
+ SigningRegion: region,
}, nil
} else if service == "ec2metadata" && ec2metadataHostname != "" {
return aws.Endpoint{
URL: ec2metadataHostname,
}, nil
+ } else {
+ return defaultResolver.ResolveEndpoint(service, region)
}
-
- return defaultResolver.ResolveEndpoint(service, region)
}
cfg.EndpointResolver = aws.EndpointResolverFunc(myCustomResolver)
}
-
+ if v.Region == "" {
+ // Endpoint is already specified (otherwise we would
+ // have errored out above), but Region is also
+ // required by the aws sdk, in order to determine
+ // SignatureVersions.
+ v.Region = "us-east-1"
+ }
cfg.Region = v.Region
// Zero timeouts mean "wait forever", which is a bad
# If present, use the one associated with rails workbench or API
BUNDLER=$PWD/bin/bundle
fi
+
+ if test -z "$(flock $GEMLOCK /var/lib/arvados/bin/gem list | grep 'arvados[[:blank:]].*[0-9.]*dev')" ; then
+ (cd /usr/src/arvados/sdk/ruby && \
+ /var/lib/arvados/bin/gem build arvados.gemspec && flock $GEMLOCK /var/lib/arvados/bin/gem install $(ls -1 *.gem | sort -r | head -n1))
+ fi
if ! flock $GEMLOCK $BUNDLER install --verbose --local --no-deployment $frozen "$@" ; then
flock $GEMLOCK $BUNDLER install --verbose --no-deployment $frozen "$@"
fi