i18n.fallbacks: true
active_support.deprecation: :notify
profiling_enabled: false
+ log_level: info
arvados_insecure_https: false
end
def run_test_server
- env_script = nil
Dir.chdir PYTHON_TESTS_DIR do
- # These are no-ops if we're running within run-tests.sh (except
- # that we do get a useful env_script back from "start", even
- # though it doesn't need to start up a new server).
- env_script = check_output %w(python ./run_test_server.py start --auth admin)
- check_output %w(python ./run_test_server.py start_arv-git-httpd)
- check_output %w(python ./run_test_server.py start_keep-web)
- check_output %w(python ./run_test_server.py start_nginx)
- # This one isn't a no-op, even under run-tests.sh.
check_output %w(python ./run_test_server.py start_keep)
end
- test_env = {}
- env_script.each_line do |line|
- line = line.chomp
- if 0 == line.index('export ')
- toks = line.sub('export ', '').split '=', 2
- $stderr.puts "run_test_server.py: #{toks[0]}=#{toks[1]}"
- test_env[toks[0]] = toks[1]
- end
- end
- test_env
end
def stop_test_server
Dir.chdir PYTHON_TESTS_DIR do
check_output %w(python ./run_test_server.py stop_keep)
- # These are no-ops if we're running within run-tests.sh
- check_output %w(python ./run_test_server.py stop_nginx)
- check_output %w(python ./run_test_server.py stop_arv-git-httpd)
- check_output %w(python ./run_test_server.py stop_keep-web)
- check_output %w(python ./run_test_server.py stop)
end
@@server_is_running = false
end
stop_test_server
end
- test_env = run_test_server
- $application_config['arvados_login_base'] = "https://#{test_env['ARVADOS_API_HOST']}/login"
- $application_config['arvados_v1_base'] = "https://#{test_env['ARVADOS_API_HOST']}/arvados/v1"
+ run_test_server
+ $application_config['arvados_login_base'] = "https://#{ENV['ARVADOS_API_HOST']}/login"
+ $application_config['arvados_v1_base'] = "https://#{ENV['ARVADOS_API_HOST']}/arvados/v1"
$application_config['arvados_insecure_host'] = true
ActiveSupport::TestCase.reset_application_config
all|futures|3.0.5|2|python|all
all|future|0.16.0|2|python|all
all|future|0.16.0|2|python3|all
+all|mypy-extensions|0.3.0|1|python|all
go get github.com/kardianos/govendor
package_go_binary cmd/arvados-client arvados-client \
"Arvados command line tool (beta)"
+package_go_binary cmd/arvados-server arvados-server \
+ "Arvados server daemons"
+package_go_binary cmd/arvados-server arvados-controller \
+ "Arvados cluster controller daemon"
package_go_binary sdk/go/crunchrunner crunchrunner \
"Crunchrunner executes a command inside a container and uploads the output"
package_go_binary services/arv-git-httpd arvados-git-httpd \
# Arvados SDK and the SDK has changed.
declare -a checkdirs=(vendor)
if grep -qr git.curoverse.com/arvados .; then
- checkdirs+=(sdk/go)
- if [[ "$prog" -eq "crunch-dispatch-slurm" ]]; then
- checkdirs+=(lib/dispatchcloud)
- fi
+ checkdirs+=(sdk/go lib)
fi
for dir in ${checkdirs[@]}; do
cd "$GOPATH/src/git.curoverse.com/arvados.git/$dir"
apps/workbench_benchmark
apps/workbench_profile
cmd/arvados-client
+cmd/arvados-server
doc
lib/cli
lib/cmd
+lib/controller
lib/crunchstat
lib/dispatchcloud
services/api
rm -f "$WORKSPACE/tmp/api.pid"
fi
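+# Note: each "eval $(... || echo fail=1)" below sets $fail instead of
+# aborting the && chain, so the check at the end of the chain can
+# return a nonzero status when run_test_server.py fails to start.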
cd "$WORKSPACE" \
- && eval $(python sdk/python/tests/run_test_server.py start --auth admin) \
+ && eval $(python sdk/python/tests/run_test_server.py start --auth admin || echo fail=1) \
&& export ARVADOS_TEST_API_HOST="$ARVADOS_API_HOST" \
&& export ARVADOS_TEST_API_INSTALLED="$$" \
+ && python sdk/python/tests/run_test_server.py start_controller \
&& python sdk/python/tests/run_test_server.py start_keep_proxy \
&& python sdk/python/tests/run_test_server.py start_keep-web \
&& python sdk/python/tests/run_test_server.py start_arv-git-httpd \
&& python sdk/python/tests/run_test_server.py start_ws \
- && python sdk/python/tests/run_test_server.py start_nginx \
+ && eval $(python sdk/python/tests/run_test_server.py start_nginx || echo fail=1) \
&& (env | egrep ^ARVADOS)
+ if [[ -n "$fail" ]]; then
+ return 1
+ fi
}
stop_services() {
&& python sdk/python/tests/run_test_server.py stop_ws \
&& python sdk/python/tests/run_test_server.py stop_keep-web \
&& python sdk/python/tests/run_test_server.py stop_keep_proxy \
+ && python sdk/python/tests/run_test_server.py stop_controller \
&& python sdk/python/tests/run_test_server.py stop
}
fi
done
+rm -vf "${WORKSPACE}/tmp/"*.log
+
setup_ruby_environment() {
if [[ -s "$HOME/.rvm/scripts/rvm" ]] ; then
source "$HOME/.rvm/scripts/rvm"
gem install --user-install bundler || fatal 'Could not install bundler'
fi
+# Jenkins config requires that glob tmp/*.log match something. Ensure
+# that happens even if we don't end up running services that set up
+# logging.
+mkdir -p "${WORKSPACE}/tmp/" || fatal "could not mkdir ${WORKSPACE}/tmp"
+touch "${WORKSPACE}/tmp/controller.log" || fatal "could not touch ${WORKSPACE}/tmp/controller.log"
+
retry() {
remain="${repeat}"
while :
;;
esac
if [[ -z "${skip[$suite]}" && -z "${skip[$1]}" && \
- (-z "${only}" || "${only}" == "${suite}" || \
- "${only}" == "${1}") ]]; then
+ (-z "${only}" || "${only}" == "${suite}" || \
+ "${only}" == "${1}") ||
+ "${only}" == "${2}" ]]; then
retry do_test_once ${@}
else
title "Skipping ${1} tests"
declare -a gostuff
gostuff=(
cmd/arvados-client
+ cmd/arvados-server
lib/cli
lib/cmd
+ lib/controller
lib/crunchstat
lib/dispatchcloud
sdk/go/arvados
package main
import (
- "fmt"
- "io"
"os"
- "regexp"
- "runtime"
"git.curoverse.com/arvados.git/lib/cli"
"git.curoverse.com/arvados.git/lib/cmd"
)
var (
- version = "dev"
- cmdVersion cmd.Handler = versionCmd{}
- handler = cmd.Multi(map[string]cmd.Handler{
- "-e": cmdVersion,
- "version": cmdVersion,
- "-version": cmdVersion,
- "--version": cmdVersion,
+ version = "dev"
+ handler = cmd.Multi(map[string]cmd.Handler{
+ "-e": cmd.Version(version),
+ "version": cmd.Version(version),
+ "-version": cmd.Version(version),
+ "--version": cmd.Version(version),
"copy": cli.Copy,
"create": cli.Create,
})
)
-type versionCmd struct{}
-
-func (versionCmd) RunCommand(prog string, args []string, _ io.Reader, stdout, _ io.Writer) int {
- prog = regexp.MustCompile(` -*version$`).ReplaceAllLiteralString(prog, "")
- fmt.Fprintf(stdout, "%s %s (%s)\n", prog, version, runtime.Version())
- return 0
-}
-
func fixLegacyArgs(args []string) []string {
flags, _ := cli.LegacyFlagSet()
return cmd.SubcommandToFront(args, flags)
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+[Unit]
+Description=Arvados controller
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/config.yml
+
+# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
+StartLimitInterval=0
+
+# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section
+StartLimitIntervalSec=0
+
+[Service]
+Type=notify
+EnvironmentFile=-/etc/arvados/environment
+ExecStart=/usr/bin/arvados-controller
+Restart=always
+RestartSec=1
+
+# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
+StartLimitInterval=0
+
+[Install]
+WantedBy=multi-user.target
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "os"
+
+ "git.curoverse.com/arvados.git/lib/cmd"
+ "git.curoverse.com/arvados.git/lib/controller"
+)
+
+var (
+ version = "dev"
+ handler = cmd.Multi(map[string]cmd.Handler{
+ "version": cmd.Version(version),
+ "-version": cmd.Version(version),
+ "--version": cmd.Version(version),
+
+ "controller": controller.Command,
+ })
+)
+
+func main() {
+ os.Exit(handler.RunCommand(os.Args[0], os.Args[1:], os.Stdin, os.Stdout, os.Stderr))
+}
- user/tutorials/tutorial-keep-mount.html.textile.liquid
- user/topics/keep.html.textile.liquid
- user/topics/arv-copy.html.textile.liquid
+ - user/topics/storage-classes.html.textile.liquid
- Running workflows at the command line:
- user/cwl/cwl-runner.html.textile.liquid
- user/cwl/cwl-run-options.html.textile.liquid
- admin/upgrading.html.textile.liquid
- install/cheat_sheet.html.textile.liquid
- user/topics/arvados-sync-groups.html.textile.liquid
+ - admin/storage-classes.html.textile.liquid
+ - admin/activation.html.textile.liquid
- admin/migrating-providers.html.textile.liquid
- admin/merge-remote-account.html.textile.liquid
+ - admin/spot-instances.html.textile.liquid
- install/migrate-docker19.html.textile.liquid
installguide:
- Overview:
- install/index.html.textile.liquid
- Docker quick start:
- install/arvbox.html.textile.liquid
+ - Arvados on Kubernetes:
+ - install/arvados-on-kubernetes.html.textile.liquid
- Manual installation:
- install/install-manual-prerequisites.html.textile.liquid
- install/install-postgresql.html.textile.liquid
- install/install-shell-server.html.textile.liquid
- install/create-standard-objects.html.textile.liquid
- install/install-keepstore.html.textile.liquid
+ - install/configure-fs-storage.html.textile.liquid
+ - install/configure-s3-object-storage.html.textile.liquid
- install/configure-azure-blob-storage.html.textile.liquid
- install/install-keepproxy.html.textile.liquid
- install/install-keep-web.html.textile.liquid
table(table table-bordered table-condensed).
|_. Key|_. Type|_. Description|_. Notes|
|partitions|array of strings|The names of one or more compute partitions that may run this container. If not provided, the system will choose where to run the container.|Optional.|
+|preemptible|boolean|If true, the dispatcher will ask for a preemptible cloud node instance (e.g. AWS Spot Instances) to run this container.|Optional. Default is false.|
+|max_run_time|integer|Maximum running time (in seconds) that this container will be allowed to run before being cancelled.|Optional. Default is 0 (no limit).|
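+
+For example, a container request asking for a preemptible instance with a one-hour runtime limit might be created like this (a sketch using the Python SDK; the image, command, and resource values are placeholders):
+
+<pre>
+import arvados
+
+api = arvados.api('v1')
+req = api.container_requests().create(body={'container_request': {
+    'name': 'preemptible example',
+    'container_image': 'arvados/jobs',       # placeholder image
+    'command': ['echo', 'hello'],
+    'output_path': '/out',
+    'runtime_constraints': {'vcpus': 1, 'ram': 1 << 30},
+    'scheduling_parameters': {
+        'preemptible': True,    # ask the dispatcher for a spot instance
+        'max_run_time': 3600,   # cancel after one hour of run time
+    },
+}}).execute()
+</pre>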
--- /dev/null
+---
+layout: default
+navsection: admin
+title: User activation
+...
+
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+This page describes how new users are created and activated.
+
+"Browser login and management of API tokens is described here.":{{site.baseurl}}/api/tokens.html
+
+h3. Authentication
+
+After completing the authentication process, a callback is made from the SSO server to the API server, providing a user record and @identity_url@ (despite the name, this is actually an Arvados user uuid).
+
+The API server searches for a user record with the @identity_url@ supplied by the SSO. If found, that user account will be used, unless the account has @redirect_to_user_uuid@ set, in which case it will use the user in @redirect_to_user_uuid@ instead (this is used for the "link account":{{site.baseurl}}/user/topics/link-accounts.html feature).
+
+Next, it searches by email address for a "pre-activated account.":#pre-activated
+
+If no existing user record is found, a new user object will be created.
+
+A federated user follows a slightly different flow: a special token is presented, and the API server verifies the user's identity with the home cluster. This also results in a user object (representing the remote user) being created.
+
+h3. User setup
+
+If @auto_setup_new_users@ is true, as part of creating the new user object, the user is immediately set up with:
+
+* A @can_login@ permission link (email address → user uuid) which records @identity_url_prefix@
+* Membership in the "All users" group (can read all users, all users can see new user)
+* A new git repo and @can_manage@ permission if @auto_setup_new_users_with_repository@ is true
+* @can_login@ permission to a shell node if @auto_setup_new_users_with_vm_uuid@ is set to the uuid of a vm
+
+Otherwise, an admin must explicitly invoke "setup" on the user via workbench or the API.
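+
+A sketch of the API route using the Python SDK (the uuids and repository name are placeholders; the parameters follow the @users/setup@ endpoint):
+
+<pre>
+import arvados
+
+api = arvados.api('v1')
+# As an admin, set up an existing user, optionally granting a git
+# repository and VM login at the same time.
+api.users().setup(
+    uuid='zzzzz-tpzed-xxxxxxxxxxxxxxx',      # placeholder user uuid
+    repo_name='newuser/newrepo',             # placeholder repository
+    vm_uuid='zzzzz-2x53u-xxxxxxxxxxxxxxx',   # placeholder VM uuid
+).execute()
+</pre>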
+
+h3. User activation
+
+A newly created user is inactive (@is_active@ is false) by default, unless @new_users_are_active@ is true.
+
+An inactive user cannot create or update any object, but can read Arvados objects that the user account has permission to read. This implies that if @auto_setup_new_users@ is true, an "inactive" user who has been set up may still be able to do things, such as read things shared with "All users", clone and push to the git repository, or log in to a VM.
+
+{% comment %}
+Maybe these services should check is_active.
+
+I believe that when this was originally designed, being able to access git and VM required an ssh key, and an inactive user could not register an ssh key because that required creating a record. However, it is now possible to authenticate to shell VMs and http+git with just an API token.
+{% endcomment %}
+
+At this point, there are two ways a user can be activated.
+
+# An admin can set the @is_active@ field directly. This runs @setup_on_activate@ which sets up oid_login_perm and group membership, but does not set up a repo or VM (even if @auto_setup_new_users_with_repository@ and/or @auto_setup_new_users_with_vm_uuid@ are set).
+# Self-activation using the @activate@ method of the users controller.
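+
+Both approaches, sketched with the Python SDK (the uuid is a placeholder):
+
+<pre>
+import arvados
+
+api = arvados.api('v1')
+user_uuid = 'zzzzz-tpzed-xxxxxxxxxxxxxxx'   # placeholder
+
+# 1. As an admin, set is_active directly:
+api.users().update(uuid=user_uuid,
+                   body={'user': {'is_active': True}}).execute()
+
+# 2. Self-activation via the activate method (subject to the user
+#    agreement checks described below):
+api.users().activate(uuid=user_uuid).execute()
+</pre>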
+
+h3. User agreements
+
+The @activate@ method of the users controller checks if the user @is_invited@ and whether the user has "signed" all the user agreements.
+
+@is_invited@ is true if any of these are true:
+* @is_active@ is true
+* @new_users_are_active@ is true
+* the user account has a permission link to read the system "all users" group.
+
+User agreements are accessed by getting a listing on the @user_agreements@ endpoint. This returns a list of collection uuids. This is executed as a system user, so it bypasses normal read permission checks.
+
+The available user agreements are represented in the Links table as
+
+<pre>
+{
+ "link_class": "signature",
+ "name": "require",
+ "tail_uuid": "*system user uuid*",
+ "head_uuid": "*collection uuid*"
+}
+</pre>
+
+The collection contains the user agreement text file.
+
+On workbench, it checks @is_invited@. If true, it displays the clickthrough agreements which the user can "sign". If @is_invited@ is false, the user ends up at the "inactive user" page.
+
+The @user_agreements/sign@ endpoint creates a Link object:
+
+<pre>
+{
+ "link_class": "signature"
+ "link_class": "signature",
+ "name": "click",
+ "tail_uuid": "*user uuid*",
+ "head_uuid": "*collection uuid*"
+</pre>
+
+This is executed as a system user, so it bypasses the restriction that inactive users cannot create objects.
+
+The @user_agreements/signatures@ endpoint returns the list of Link objects that represent signatures by the current user (created by @sign@).
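+
+A sketch of exercising these endpoints from the Python SDK (the parameter name passed to @sign@ is an assumption):
+
+<pre>
+import arvados
+
+api = arvados.api('v1')
+
+# Collections containing agreement text the user is asked to sign:
+agreements = api.user_agreements().list().execute()['items']
+
+# Signatures already recorded for the current user:
+signed = api.user_agreements().signatures().execute()['items']
+
+# Sign each agreement (assumes the endpoint takes the collection uuid):
+for ua in agreements:
+    api.user_agreements().sign(uuid=ua['uuid']).execute()
+</pre>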
+
+h3. User profile
+
+The user profile is checked by workbench after checking if user agreements need to be signed. The requirement to fill out the user profile is not enforced by the API server.
+
+h3(#pre-activated). Pre-activate user by email address
+
+You may create a user account for a user that has not yet logged in, and identify the user by email address.
+
+1. As an admin, create a user object:
+
+<pre>
+{
+ "email": "foo@example.com",
+ "username": "barney",
+ "is_active": true
+}
+</pre>
+
+2. Create a link object, where @tail_uuid@ is the user's email address, @head_uuid@ is the user object created in the previous step, and @xxxxx@ is the value of @uuid_prefix@ of the SSO server.
+
+<pre>
+{
+ "link_class": "permission",
+ "name": "can_login",
+ "tail_uuid": "email address",
+ "head_uuid": "user uuid",
+ "properties": {
+ "identity_url_prefix": "xxxxx-tpzed-"
+ }
+}
+</pre>
+
+3. When the user logs in the first time, the email address will be recognized and the user will be associated with the linked user object.
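+
+The same two steps via the Python SDK (a sketch, using the placeholder values from above):
+
+<pre>
+import arvados
+
+api = arvados.api('v1')
+
+user = api.users().create(body={'user': {
+    'email': 'foo@example.com',
+    'username': 'barney',
+    'is_active': True,
+}}).execute()
+
+api.links().create(body={'link': {
+    'link_class': 'permission',
+    'name': 'can_login',
+    'tail_uuid': 'foo@example.com',
+    'head_uuid': user['uuid'],
+    'properties': {'identity_url_prefix': 'xxxxx-tpzed-'},
+}}).execute()
+</pre>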
+
+h3. Pre-activate federated user
+
+1. As admin, create a user object with the @uuid@ of the federated user (this is the user's uuid on their home cluster):
+
+<pre>
+{
+ "uuid": "home1-tpzed-000000000000000",
+ "email": "foo@example.com",
+ "username": "barney",
+ "is_active": true
+}
+</pre>
+
+2. When the user logs in, they will be associated with the existing user object.
+
+h3. Auto-activate federated users from trusted clusters
+
+In the API server config, configure @auto_activate_users_from@ with a list of one or more five-character cluster ids. A federated user from one of the listed clusters who is active (@is_active@ is true) on the home cluster will be automatically set up and activated on this cluster.
+
+h3(#deactivating_users). Deactivating users
+
+Setting @is_active@ to false is not sufficient to lock out a user. The user can call @activate@ to become active again. Instead, use @unsetup@:
+
+* Delete oid_login_perms
+* Delete git repository permission links
+* Delete VM login permission links
+* Remove from "All users" group
+* Delete any "signatures"
+* Clear preferences / profile
+* Mark as inactive
+
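+A sketch of invoking it from the Python SDK (the uuid is a placeholder):
+
+<pre>
+import arvados
+
+api = arvados.api('v1')
+# As an admin, lock out a user:
+api.users().unsetup(uuid='zzzzz-tpzed-xxxxxxxxxxxxxxx').execute()
+</pre>
+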
+{% comment %}
+Does not revoke @is_admin@, so you can't unsetup an admin unless you turn admin off first.
+
+"inactive" does not prevent user from reading things they previously had access to.
+
+Does not revoke API tokens.
+{% endcomment %}
+
+h3. Activation flows
+
+h4. Private instance
+
+Policy: users must be manually approved.
+
+<pre>
+auto_setup_new_users: false
+new_users_are_active: false
+</pre>
+
+# User is created. Not set up. @is_active@ is false.
+# Workbench checks @is_invited@ and finds it is false. User gets "inactive user" page.
+# Admin goes to the user page and either clicks "setup user" or manually sets @is_active@ to true.
+# Clicking "setup user" sets up the user. This includes adding the user to "All users" which qualifies the user as @is_invited@.
+# On refreshing workbench, the user is still inactive, but is able to self-activate after signing clickthrough agreements (if any).
+# Alternately, directly setting @is_active@ to true also sets up the user, but workbench won't display clickthrough agreements (because the user is already active).
+
+h4. Federated instance
+
+Policy: users from other clusters in the federation are activated automatically; users from outside the federation must be manually approved.
+
+<pre>
+auto_setup_new_users: false
+new_users_are_active: false
+auto_activate_users_from: [home1]
+</pre>
+
+# Federated user arrives claiming to be from cluster 'home1'
+# API server authenticates user as being from cluster 'home1'
+# Because 'home1' is in @auto_activate_users_from@ the user is set up and activated.
+# User can immediately start using workbench.
+
+h4. Open instance
+
+Policy: anybody who shows up and signs the agreements is activated.
+
+<pre>
+auto_setup_new_users: true
+new_users_are_active: false
+</pre>
+
+# User is created and auto-setup. At this point, @is_active@ is false, but user has been added to "All users" group.
+# Workbench checks @is_invited@ and finds it is true, because the user is a member of "All users" group.
+# Workbench presents user with list of user agreements, user reads and clicks "sign" for each one.
+# Workbench tries to activate user.
+# User is activated.
+
+h4. Developer instance
+
+Policy: avoid wasting developers' time during development/testing.
+
+<pre>
+auto_setup_new_users: true
+new_users_are_active: true
+</pre>
+
+# User is created, immediately auto-setup, and auto-activated.
+# User can immediately start using workbench.
--- /dev/null
+---
+layout: default
+navsection: admin
+title: Using AWS Spot instances
+...
+
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+This page describes how to set up the system to take advantage of "Amazon's EC2 spot instances":https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-spot-instances.html.
+
+h3. Nodemanager
+
+Nodemanager should have configured cloud sizes that include the @preemptible@ boolean parameter. For example, for every on-demand cloud node size, you could create a @.spot@ variant, like this:
+
+<pre>
+[Size m4.large]
+cores = 2
+scratch = 32000
+
+[Size m4.large.spot]
+cores = 2
+instance_type = m4.large
+preemptible = true
+scratch = 32000
+</pre>
+
+h3. Slurm dispatcher
+
+The @crunch-dispatch-slurm@ service needs a matching instance type configuration in @/etc/arvados/config.yml@, following the previous example:
+
+<pre>
+Clusters:
+ uuid_prefix:
+ InstanceTypes:
+ - Name: m4.large
+ VCPUs: 2
+ RAM: 7782000000
+ Scratch: 32000000000
+ Price: 0.1
+ - Name: m4.large.spot
+ Preemptible: true
+ VCPUs: 2
+ RAM: 7782000000
+ Scratch: 32000000000
+ Price: 0.1
+</pre>
+
+@InstanceType@ names should match those defined in nodemanager's config file, because it is @crunch-dispatch-slurm@'s job to select the instance type and communicate the decision to @nodemanager@ via Slurm.
+
+h3. API Server
+
+Container requests need the @preemptible@ scheduling parameter included to make the dispatcher request a spot instance. The API Server configuration file includes an option that, when active, will automatically assign the @preemptible@ parameter to any new child container request that doesn't already have it. To activate this feature, add the following to the @application.yml@ file:
+
+<pre>
+preemptible_instances: true
+</pre>
+
+With this configuration active, child container requests should include the @preemptible = false@ parameter at creation time to avoid being scheduled for spot instance usage.
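+
+For example, a child container request that opts out might be created like this (a sketch using the Python SDK; image, command, and resource values are placeholders):
+
+<pre>
+import arvados
+
+api = arvados.api('v1')
+api.container_requests().create(body={'container_request': {
+    'name': 'non-preemptible child step',
+    'container_image': 'arvados/jobs',       # placeholder image
+    'command': ['echo', 'hello'],
+    'output_path': '/out',
+    'runtime_constraints': {'vcpus': 1, 'ram': 1 << 30},
+    # Explicitly opt out so the auto-assign feature does not mark
+    # this request as preemptible:
+    'scheduling_parameters': {'preemptible': False},
+}}).execute()
+</pre>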
+
+h3. AWS Permissions
+
+When requesting spot instances, Amazon's API may return an authorization error depending on how users and permissions are set on the account. If this is the case, check nodemanager's log for:
+
+<pre>
+BaseHTTPError: AuthFailure.ServiceLinkedRoleCreationNotPermitted: The provided credentials do not have permission to create the service-linked role for EC2 Spot Instances.
+</pre>
+
+The account needs to have a service-linked role created. This can be done by logging into the AWS account, going to _IAM Management_ → _Roles_, and creating the @AWSServiceRoleForEC2Spot@ role: click the @Create@ button, then select the @EC2@ service and the @EC2 - Spot Instances@ use case.
+
+h3. Cost Tracking
+
+Amazon's spot instance prices are declared at instance request time and are defined by the maximum price that the user is willing to pay per hour. By default, this price is the same amount as the on-demand version of each instance type, and it is the setting nodemanager currently uses, as it doesn't attach any pricing data to the spot instance request.
+
+The real price that a spot instance has at any point in time is discovered at the end of each usage hour, depending on instance demand. For this reason, AWS provides a data feed subscription to get hourly logs, as described on "Amazon's User Guide":https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/spot-data-feeds.html.
\ No newline at end of file
--- /dev/null
+---
+layout: default
+navsection: admin
+title: Configuring storage classes
+...
+
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+Storage classes (also known as "storage tiers") allow you to control which volumes should be used to store particular collection data blocks. This can be used to implement data storage policies such as moving data to archival storage.
+
+The storage classes for each volume are set in the per-volume "keepstore configuration":{{site.baseurl}}/install/install-keepstore.html
+
+<pre>
+Volumes:
+ - ... Volume configuration ...
+ #
+ # If no storage classes are specified, will use [default]
+ #
+ StorageClasses: null
+
+ - ... Volume configuration ...
+ #
+ # Specify this volume is in the "archival" storage class.
+ #
+ StorageClasses: [archival]
+
+</pre>
+
+Names of storage classes are internal to the cluster and decided by the administrator. Aside from "default", Arvados currently does not define any standard storage class names.
+
+h3. Using storage classes
+
+"Discussed in the user guide":{{site.baseurl}}/user/topics/storage-classes.html
+
+h3. Storage management notes
+
+The "keep-balance":{{site.baseurl}}/install/install-keep-balance.html service is responsible for deciding which blocks should be placed on which keepstore volumes. As part of the rebalancing behavior, it will determine where a block should go in order to satisfy the desired storage classes, and issue pull requests to copy the block from its original volume to the desired volume. The block will subsequently be moved to trash on the original volume.
+
+If a block appears in multiple collections with different storage classes, the block will be stored in separate volumes for each storage class, even if that results in overreplication, unless there is a volume which has all the desired storage classes.
+
+If a collection has a desired storage class which is not available in any keepstore volume, the collection's blocks will remain in place, and an error will appear in the @keep-balance@ logs.
+
+This feature does not provide a hard guarantee on where data will be stored. Data may be written to default storage and moved to the desired storage class later. If controlling data locality is a hard requirement (such as legal restrictions on the location of data) we recommend setting up multiple Arvados clusters.
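+
+For example, a collection can be marked for archival storage by updating its desired storage classes (a sketch using the Python SDK; the collection uuid is a placeholder):
+
+<pre>
+import arvados
+
+api = arvados.api('v1')
+api.collections().update(
+    uuid='zzzzz-4zz18-xxxxxxxxxxxxxxx',   # placeholder collection uuid
+    body={'collection': {'storage_classes_desired': ['archival']}},
+).execute()
+</pre>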
The "browser authentication process is documented in detail on the Arvados wiki.":https://dev.arvados.org/projects/arvados/wiki/Workbench_authentication_process
+h2. User activation
+
+"Creation and activation of new users is described here.":{{site.baseurl}}/admin/activation.html
+
h2. Creating tokens via the API
The browser login method above issues a new token. Using that token, it is possible to make API calls to create additional tokens. To do so, use the @create@ method of the "API client authorizations":{{site.baseurl}}/api/methods/api_client_authorizations.html resource.
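+
+For example, a sketch using the Python SDK (creating an authorization with an empty body issues a new token belonging to the current user):
+
+<pre>
+import arvados
+
+api = arvados.api('v1')
+new_auth = api.api_client_authorizations().create(
+    body={'api_client_authorization': {}}).execute()
+print(new_auth['api_token'])
+</pre>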
--- /dev/null
+---
+layout: default
+navsection: installguide
+title: Arvados on Kubernetes - Google Kubernetes Engine
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+This page documents the setup of the prerequisites to run the "Arvados on Kubernetes":/install/arvados-on-kubernetes.html @Helm@ chart on @Google Kubernetes Engine@ (GKE).
+
+h3. Install tooling
+
+Install @gcloud@:
+
+* Follow the instructions at "https://cloud.google.com/sdk/downloads":https://cloud.google.com/sdk/downloads
+
+Install @kubectl@:
+
+<pre>
+$ gcloud components install kubectl
+</pre>
+
+Install @helm@:
+
+* Follow the instructions at "https://docs.helm.sh/using_helm/#installing-helm":https://docs.helm.sh/using_helm/#installing-helm
+
+h3. Boot the GKE cluster
+
+This can be done via the "cloud console":https://console.cloud.google.com/kubernetes/ or via the command line:
+
+<pre>
+$ gcloud container clusters create <CLUSTERNAME> --zone us-central1-a --machine-type n1-standard-2 --cluster-version 1.10
+</pre>
+
+It takes a few minutes for the cluster to be initialized.
+
+h3. Reserve a static IP
+
+Reserve a "static IP":https://console.cloud.google.com/networking/addresses in GCE. Make sure the IP is in the same region as your GKE cluster, and is of the "Regional" type.
+
+h3. Connect to the GKE cluster
+
+Via the web:
+* Click the "Connect" button next to your "GKE cluster":https://console.cloud.google.com/kubernetes/.
+* Execute the "Command-line access" command on your development machine.
+
+Alternatively, use this command:
+
+<pre>
+$ gcloud container clusters get-credentials <CLUSTERNAME> --zone us-central1-a --project <YOUR-PROJECT>
+</pre>
+
+Test the connection:
+
+<pre>
+$ kubectl get nodes
+</pre>
+
+Now proceed to the "Initialize helm on the Kubernetes cluster":/install/arvados-on-kubernetes.html#helm section.
--- /dev/null
+---
+layout: default
+navsection: installguide
+title: Arvados on Kubernetes - Minikube
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+This page documents the setup of the prerequisites to run the "Arvados on Kubernetes":/install/arvados-on-kubernetes.html @Helm@ chart on @Minikube@.
+
+h3. Install tooling
+
+Install @kubectl@:
+
+* Follow the instructions at "https://kubernetes.io/docs/tasks/tools/install-kubectl/":https://kubernetes.io/docs/tasks/tools/install-kubectl/
+
+Install @helm@:
+
+* Follow the instructions at "https://docs.helm.sh/using_helm/#installing-helm":https://docs.helm.sh/using_helm/#installing-helm
+
+h3. Install Minikube
+
+Follow the instructions at "https://kubernetes.io/docs/setup/minikube/":https://kubernetes.io/docs/setup/minikube/
+
+Test the connection:
+
+<pre>
+$ kubectl get nodes
+</pre>
+
+Now proceed to the "Initialize helm on the Kubernetes cluster":/install/arvados-on-kubernetes.html#helm section.
--- /dev/null
+---
+layout: default
+navsection: installguide
+title: Arvados on Kubernetes
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+Arvados on Kubernetes is implemented as a Helm Chart.
+
+{% include 'notebox_begin_warning' %}
+This Helm Chart does not retain any state after it is deleted. An Arvados cluster created with this Helm Chart is entirely ephemeral, and all data stored on the cluster will be deleted when it is shut down. This will be fixed in a future version.
+{% include 'notebox_end' %}
+
+h2(#overview). Overview
+
+This Helm Chart provides a basic, small Arvados cluster.
+
+Current limitations, to be addressed in the future:
+
+* An Arvados cluster created with this Helm Chart is entirely ephemeral, and all data stored on the cluster will be deleted when it is shut down.
+* No dynamic scaling of compute nodes (but you can adjust @values.yaml@ and "reload the Helm Chart":#reload)
+* All compute nodes are the same size
+* Compute nodes have no cpu/memory/disk constraints yet
+* No git server
+
+h2. Requirements
+
+* Kubernetes 1.10+ cluster with at least 3 nodes, 2 or more cores per node
+* @kubectl@ and @helm@ installed locally, and able to connect to your Kubernetes cluster
+
+If you do not have a Kubernetes cluster already set up, you can use "Google Kubernetes Engine":/install/arvados-on-kubernetes-GKE.html for multi-node development and testing or "another Kubernetes solution":https://kubernetes.io/docs/setup/pick-right-solution/. Minikube is not supported yet.
+
+h2(#helm). Initialize helm on the Kubernetes cluster
+
+If you already have helm running on the Kubernetes cluster, proceed directly to "Start the Arvados cluster":#Start below.
+
+<pre>
+$ helm init
+$ kubectl create serviceaccount --namespace kube-system tiller
+$ kubectl create clusterrolebinding tiller-cluster-rule --clusterrole=cluster-admin --serviceaccount=kube-system:tiller
+$ kubectl patch deploy --namespace kube-system tiller-deploy -p '{"spec":{"template":{"spec":{"serviceAccount":"tiller"}}}}'
+</pre>
+
+Test @helm@ by running
+
+<pre>
+$ helm ls
+</pre>
+
+There should be no errors. The command will return nothing.
+
+h2(#git). Clone the repository
+
+Clone the repository and navigate to the @arvados-kubernetes/charts/arvados@ directory:
+
+<pre>
+$ git clone https://github.com/curoverse/arvados-kubernetes.git
+$ cd arvados-kubernetes/charts/arvados
+</pre>
+
+h2(#Start). Start the Arvados cluster
+
+Next, determine the IP address that the Arvados cluster will use to expose its API, Workbench, etc. If you want this Arvados cluster to be reachable from places other than the local machine, the IP address will need to be routable as appropriate.
+
+<pre>
+$ ./cert-gen.sh <IP ADDRESS>
+</pre>
+
+The @values.yaml@ file contains a number of variables that can be modified. At a minimum, review and/or modify the values for
+
+<pre>
+ adminUserEmail
+ adminUserPassword
+ superUserSecret
+ anonymousUserSecret
+</pre>
+
+Now start the Arvados cluster:
+
+<pre>
+$ helm install --name arvados . --set externalIP=<IP ADDRESS>
+</pre>
+
+At this point, you can use kubectl to see the Arvados cluster boot:
+
+<pre>
+$ kubectl get pods
+$ kubectl get svc
+</pre>
+
+After a few minutes, you can access Arvados Workbench at the IP address specified
+
+* https://<IP ADDRESS>
+
+with the username and password specified in the @values.yaml@ file.
+
+Alternatively, use the Arvados cli tools or SDKs:
+
+Set the environment variables:
+
+<pre>
+$ export ARVADOS_API_TOKEN=<superUserSecret from values.yaml>
+$ export ARVADOS_API_HOST=<STATIC IP>:444
+$ export ARVADOS_API_HOST_INSECURE=true
+</pre>
+
+Test access with:
+
+<pre>
+$ arv user current
+</pre>
+
+h2(#reload). Reload
+
+If you make changes to the Helm Chart (e.g. to @values.yaml@), you can reload Arvados with
+
+<pre>
+$ helm upgrade arvados .
+</pre>
+
+h2. Shut down
+
+{% include 'notebox_begin_warning' %}
+This Helm Chart does not retain any state after it is deleted. An Arvados cluster created with this Helm Chart is entirely ephemeral, and <strong>all data stored on the Arvados cluster will be deleted</strong> when it is shut down. This will be fixed in a future version.
+{% include 'notebox_end' %}
+
+<pre>
+$ helm del arvados --purge
+</pre>
SPDX-License-Identifier: CC-BY-SA-3.0
{% endcomment %}
-As an alternative to local and network-attached POSIX filesystems, Keepstore can store data in an Azure Storage container.
+Keepstore can store data in one or more Azure Storage containers.
-h2. Create a container
+h2. Set up VMs and Storage Accounts
-Normally, all keepstore services are configured to share a single Azure Storage container.
+Before configuring individual keepstore servers, it is good to have an idea of the final layout of the keepstore fleet. One key decision is the number of servers and the type of VM to run. Azure may change the bandwidth capacity of each VM type over time. In our empirical saturation tests, bandwidth was roughly proportional to the number of cores, with some exceptions. As a rule of thumb, it is better to invest resources in more cores than in memory or IOPS.
+
+Another decision is how many VMs should run keepstore. For example, there could be 8 VMs with one core each, one machine with 8 cores, or anything in between. Assuming the cloud resource cost is the same, there is always a benefit in distributing the risk of faulty VMs. We recommend starting with 2 VMs of at least 2 cores each and expanding in pairs. The total number of VMs will be a function of the budget and of the pipeline traffic, so as to avoid saturation during periods of high usage. The Standard D v3 family is a balanced choice, making Standard_D2_v3 the 2-core option.
+
+There are many options for storage accounts. You can read the details in "Azure's storage documentation":https://docs.microsoft.com/en-us/azure/storage/common/storage-introduction. The type of storage and access tier will be a function of the budget and the desired responsiveness. A balanced option is a general-purpose Standard storage account using Blob storage with the hot access tier.
+
+Keepstore can be configured to reflect the level of underlying redundancy the storage provides; this is called the data replication option. For example, LRS (Locally Redundant Storage) saves 3 copies of the data. The desired redundancy can be provided at the keepstore layer or at the storage account layer. Deciding where redundancy is handled, and which type of storage account data replication to use (LRS, ZRS, GRS, or RA-GRS), involves trade-offs. Please read more in "Azure's redundancy documentation":https://docs.microsoft.com/en-us/azure/storage/common/storage-redundancy and decide what is best for your needs.
+
+h2. Create a storage container
Using the Azure web portal or command line tool, create or choose a storage account with a suitable redundancy profile and availability region. Use the storage account keys to create a new container.
</code></pre>
</notextile>
+Note that Keepstore services may be configured to use multiple Azure Storage accounts and multiple containers within a storage account.
+
h2. Configure keepstore
Copy the primary storage account key to a file where it will be accessible to keepstore at startup time.
<notextile>
-<pre><code>~$ <span class="userinput">sudo sh -c 'cat >/etc/sv/keepstore/exampleStorageAccountName.key <<EOF'
+<pre><code>~$ <span class="userinput">sudo sh -c 'cat >/etc/arvados/keepstore/azure_storage_account_key.txt <<EOF'
zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz==
EOF</span>
-~$ <span class="userinput">sudo chmod 0400 /etc/sv/keepstore/exampleStorageAccountName.key</span>
+~$ <span class="userinput">sudo chmod 0400 /etc/arvados/keepstore/azure_storage_account_key.txt</span>
</code></pre>
</notextile>
-In your keepstore startup script, instead of specifying a local storage using @-volume /path@ or discovering mount points automatically, use @-azure-*@ arguments to specify the storage container:
+Next, edit the @Volumes@ section of the @keepstore.yml@ config file:
-<notextile>
-<pre><code>#!/bin/sh
+<pre>
+Volumes:
+- # The volume type; this indicates Azure blob storage
+ Type: Azure
-exec 2>&1
-exec keepstore \
- -azure-storage-account-key-file <span class="userinput">/etc/sv/keepstore/exampleStorageAccountName.key</span> \
- -azure-storage-account-name <span class="userinput">exampleStorageAccountName</span> \
- -azure-storage-container-volume <span class="userinput">exampleContainerName</span>
-</code></pre>
-</notextile>
+ # How much replication is performed by the underlying container.
+ # This is used to inform replication decisions at the Keep layer.
+ AzureReplication: 3
-Start (or restart) keepstore, and check its log file to confirm it is using the new configuration.
+ # The storage container to use for the backing store.
+ ContainerName: exampleContainerName
-<notextile>
-<pre><code>2015/10/26 21:06:24 Using volume azure-storage-container:"exampleContainerName" (writable=true)
-</code></pre>
-</notextile>
+ # If true, do not accept write or trash operations, only reads.
+ ReadOnly: false
+
+ # Amount of time to wait for a response before failing the request
+ RequestTimeout: 2m0s
+
+ # The storage account name, used for authentication
+ StorageAccountName: exampleStorageAccountName
+
+ # The storage account secret key, used for authentication
+ StorageAccountKeyFile: /etc/arvados/keepstore/azure_storage_account_key.txt
+
+ # The cloud environment to use. If blank, use the default cloud
+ # environment. See below for an example of an alternate cloud environment.
+ StorageBaseURL: ""
+
+ # Storage classes to associate with this volume. See "Storage
+ # classes" in the "Admin" section of doc.arvados.org.
+ StorageClasses: null
+
+- # Example configuration to use Azure China.
+ #
+ # The alternate cloud environment to use.
+ # Note that cloud environments are different from regions. A
+ # cloud environment is an entirely separate instance of Azure with
+ # separate accounts, requiring separate credentials.
+ #
+ StorageBaseURL: core.chinacloudapi.cn
+ StorageAccountKeyFile: /etc/arvados/keepstore/azure_cn_storage_account_key.txt
+ StorageAccountName: cn-account-name
+ ContainerName: exampleChinaContainerName
+
+ # The rest are the same as above
+ Type: Azure
+ AzureReplication: 3
+ ReadOnly: false
+ RequestTimeout: 10m0s
+ StorageClasses: null
+</pre>
+
+Start (or restart) keepstore, and check its log file to confirm it is using the new configuration.
--- /dev/null
+---
+layout: default
+navsection: installguide
+title: Filesystem storage
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+Keepstore can store data in local and network-attached POSIX filesystems.
+
+h2. Setting up filesystem mounts
+
+Volumes are configured in the @Volumes@ section of the configuration file. You may provide multiple volumes for a single keepstore process to manage multiple disks. Keepstore distributes blocks among volumes in round-robin fashion.
+
+<pre>
+Volumes:
+- # The volume type; this indicates a filesystem directory.
+ Type: Directory
+
+ # The directory that will be used as the backing store.
+ Root: /mnt/local-disk
+
+ # How much replication is performed by the underlying filesystem.
+ # (for example, a network filesystem may provide its own replication).
+ # This is used to inform replication decisions at the Keep layer.
+ DirectoryReplication: 1
+
+ # If true, do not accept write or trash operations, only reads.
+ ReadOnly: false
+
+ # When true, read and write operations (for whole 64MiB blocks) on
+  # an individual volume will be queued and issued serially. When
+ # false, read and write operations will be issued concurrently.
+ #
+ # May improve throughput if you experience contention when there are
+ # multiple requests to the same volume.
+ #
+ # When using SSDs, RAID, or a parallel network filesystem, you probably
+ # don't want this.
+ Serialize: false
+
+ # Storage classes to associate with this volume. See "Storage
+ # classes" in the "Admin" section of doc.arvados.org.
+ StorageClasses: null
+
+ # Example of a second volume section
+- DirectoryReplication: 2
+ ReadOnly: false
+ Root: /mnt/network-disk
+ Serialize: false
+ StorageClasses: null
+ Type: Directory
+</pre>
--- /dev/null
+---
+layout: default
+navsection: installguide
+title: Configure S3 object storage
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+Keepstore can store data in object storage compatible with the S3 API, such as Amazon S3, Google Cloud Storage, or Ceph RADOS.
+
+h2. Configure keepstore
+
+Copy the "access key" and "secret key" to files where they will be accessible to keepstore at startup time.
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo sh -c 'cat >/etc/arvados/keepstore/aws_s3_access_key.txt <<EOF'
+zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz==
+EOF</span>
+~$ <span class="userinput">sudo chmod 0400 /etc/arvados/keepstore/aws_s3_access_key.txt</span>
+</code></pre>
+</notextile>
+
+Next, edit the @Volumes@ section of the @keepstore.yml@ config file.
+
+h3. Example config for Amazon S3
+
+<pre>
+Volumes:
+- # The volume type; this indicates object storage compatible with the S3 API
+ Type: S3
+
+ # Storage provider. If blank, uses Amazon S3 by default.
+ # See below for example alternate configuration for Google cloud
+ # storage.
+ Endpoint: ""
+
+ # The bucket to use for the backing store.
+ Bucket: example-bucket-name
+
+ # The region where the bucket is located.
+ Region: us-east-1
+
+ # The credentials to use to access the bucket.
+ AccessKeyFile: /etc/arvados/keepstore/aws_s3_access_key.txt
+ SecretKeyFile: /etc/arvados/keepstore/aws_s3_secret_key.txt
+
+ # Maximum time to wait making the initial connection to the backend before
+ # failing the request.
+ ConnectTimeout: 1m0s
+
+ # Page size for s3 "list bucket contents" requests
+ IndexPageSize: 1000
+
+ # True if the region requires a LocationConstraint declaration
+ LocationConstraint: false
+
+ # Maximum eventual consistency latency
+ RaceWindow: 24h0m0s
+
+ # If true, do not accept write or trash operations, only reads.
+ ReadOnly: false
+
+ # Maximum time to wait for a complete response from the backend before
+ # failing the request.
+ ReadTimeout: 2m0s
+
+ # How much replication is performed by the underlying bucket.
+ # This is used to inform replication decisions at the Keep layer.
+ S3Replication: 2
+
+ # Storage classes to associate with this volume. See
+ # "Storage classes" in the "Admin" section of doc.arvados.org.
+ StorageClasses: null
+
+ # Enable deletion (garbage collection) even when TrashLifetime is
+ # zero. WARNING: eventual consistency may result in race conditions
+ # that can cause data loss. Do not enable this unless you know what
+ # you are doing.
+ UnsafeDelete: false
+</pre>
+
+Start (or restart) keepstore, and check its log file to confirm it is using the new configuration.
+
+h3. Example config for Google cloud storage
+
+See previous section for documentation of configuration fields.
+
+<pre>
+Volumes:
+- # Example configuration using alternate storage provider
+ # Configuration for Google cloud storage
+ Endpoint: https://storage.googleapis.com
+ Region: ""
+
+ AccessKeyFile: /etc/arvados/keepstore/gce_s3_access_key.txt
+ SecretKeyFile: /etc/arvados/keepstore/gce_s3_secret_key.txt
+ Bucket: example-bucket-name
+ ConnectTimeout: 1m0s
+ IndexPageSize: 1000
+ LocationConstraint: false
+ RaceWindow: 24h0m0s
+ ReadOnly: false
+ ReadTimeout: 2m0s
+ S3Replication: 2
+ StorageClasses: null
+ UnsafeDelete: false
+</pre>
+
+Start (or restart) keepstore, and check its log file to confirm it is using the new configuration.
---
layout: default
navsection: installguide
-title: Installation overview
+title: Installation options
...
{% comment %}
Copyright (C) The Arvados Authors. All rights reserved.
Arvados components run on GNU/Linux systems, and do not depend on any particular cloud operating stack. Arvados supports Debian and derivatives such as Ubuntu, as well as Red Hat and derivatives such as CentOS.
-Arvados components can be installed and configured in a number of different ways. Step-by-step instructions are available to perform a production installation from packages with manual configuration. This method assumes you have several (virtual) machines at your disposal for running the various Arvados components.
+Arvados components can be installed and configured in a number of different ways.
-* "Docker quick start":arvbox.html
-* "Manual installation":install-manual-prerequisites.html
+<div class="offset1">
+table(table table-bordered table-condensed).
+||||\6=. _Appropriate for_|
+||_Ease of installation_|_Multiuser/Networked_|_Workflow Development_|_Workflow Testing_|_Large Scale Production_|_Developing Arvados_|_Arvados Software Development Testing_|
+|"Arvados-in-a-box":arvbox.html (arvbox)|Easy|no|no|no|no|yes|yes|
+|"Arvados on Kubernetes":arvados-on-kubernetes.html|Easy ^1^|yes|no ^2^|no ^2^|no ^2^|no|yes|
+|"Manual installation":install-manual-prerequisites.html|Complex|yes|yes|yes|yes|no|no|
+|"Cloud demo":https://cloud.curoverse.com by Veritas Genetics|N/A ^3^|yes|no|no|no|no|no|
+|"Cluster Operation Subscription":https://curoverse.com/products by Veritas Genetics|N/A ^3^|yes|yes|yes|yes|yes|yes|
+</div>
+
+* ^1^ Assumes a Kubernetes cluster is available
+* ^2^ Arvados on Kubernetes is under development and not yet ready for production use
+* ^3^ No installation necessary; run and managed by Veritas Genetics
SPDX-License-Identifier: CC-BY-SA-3.0
{% endcomment %}
-We are going to install two Keepstore servers. By convention, we use the following hostname pattern:
+Keepstore provides access to underlying storage for reading and writing content-addressed blocks, with enforcement of Arvados permissions. Keepstore supports a variety of cloud object storage and POSIX filesystems for its backing store.
+
+We recommend starting off with two Keepstore servers. Exact server specifications will be site and workload specific, but in general keepstore will be I/O bound and should be set up to maximize aggregate bandwidth with compute nodes. To increase capacity (either space or throughput) it is straightforward to add additional servers, or (in cloud environments) to increase the machine size of the existing servers.
+
+By convention, we use the following hostname pattern:
<div class="offset1">
table(table table-bordered table-condensed).
|keep1.@uuid_prefix@.your.domain|
</div>
-Because the Keepstore servers are not directly accessible from the internet, these hostnames only need to resolve on the local network.
+Keepstore servers should not be directly accessible from the Internet (they are accessed via "keepproxy":install-keepproxy.html), so the hostnames only need to resolve on the private network.
h2. Install Keepstore
Verify that Keepstore is functional:
<notextile>
-<pre><code>~$ <span class="userinput">keepstore -h</span>
-2016/07/01 14:06:21 keepstore starting, pid 32339
-Usage of ./keepstore:
- -azure-max-get-bytes int
- Maximum bytes to request in a single GET request. If smaller than 67108864, use multiple concurrent range requests to retrieve a block. (default 67108864)
- -azure-storage-account-key-file string
- File containing the account key used for subsequent --azure-storage-container-volume arguments.
- -azure-storage-account-name string
- Azure storage account name used for subsequent --azure-storage-container-volume arguments.
- -azure-storage-container-volume value
- Use the given container as a storage volume. Can be given multiple times. (default [])
- -azure-storage-replication int
- Replication level to report to clients when data is stored in an Azure container. (default 3)
- -blob-signature-ttl int
- Lifetime of blob permission signatures in seconds. Modifying the ttl will invalidate all existing signatures. See services/api/config/application.default.yml. (default 1209600)
- -blob-signing-key-file string
- File containing the secret key for generating and verifying blob permission signatures.
- -data-manager-token-file string
- File with the API token used by the Data Manager. All DELETE requests or GET /index requests must carry this token.
- -enforce-permissions
- Enforce permission signatures on requests.
- -listen string
- Listening address, in the form "host:port". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces. (default ":25107")
- -max-buffers int
- Maximum RAM to use for data buffers, given in multiples of block size (64 MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released. (default 128)
- -max-requests int
- Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)
- -never-delete
- If true, nothing will be deleted. Warning: the relevant features in keepstore and data manager have not been extensively tested. You should leave this option alone unless you can afford to lose data. (default true)
- -permission-key-file string
- Synonym for -blob-signing-key-file.
- -permission-ttl int
- Synonym for -blob-signature-ttl.
- -pid fuser -k pidfile
- Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so fuser -k pidfile is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.
- -readonly
- Do not write, delete, or touch anything on the following volumes.
- -s3-access-key-file string
- File containing the access key used for subsequent -s3-bucket-volume arguments.
- -s3-bucket-volume value
- Use the given bucket as a storage volume. Can be given multiple times. (default [])
- -s3-endpoint string
- Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use "https://storage.googleapis.com".
- -s3-region string
- AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are ["ap-southeast-1" "eu-west-1" "us-gov-west-1" "sa-east-1" "cn-north-1" "ap-northeast-1" "ap-southeast-2" "eu-central-1" "us-east-1" "us-west-1" "us-west-2"].
- -s3-replication int
- Replication level reported to clients for subsequent -s3-bucket-volume arguments. (default 2)
- -s3-secret-key-file string
- File containing the secret key used for subsequent -s3-bucket-volume arguments.
- -s3-unsafe-delete
- EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.
- -serialize
- Serialize read and write operations on the following volumes.
- -trash-check-interval duration
- Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day. (default 24h0m0s)
- -trash-lifetime duration
- Time duration after a block is trashed during which it can be recovered using an /untrash request
- -volume value
- Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named "keep" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead. (default [])
- -volumes value
- Deprecated synonym for -volume. (default [])
+<pre><code>~$ <span class="userinput">keepstore --version</span>
</code></pre>
</notextile>
-h3. Prepare storage volumes
-
-{% include 'notebox_begin' %}
-This section uses a local filesystem as a backing store. If you are using Azure Storage, follow the setup instructions on the "Azure Blob Storage":configure-azure-blob-storage.html page instead.
-{% include 'notebox_end' %}
-
-There are two ways to specify a set of local directories where keepstore should store its data files.
-# Implicitly, by creating a directory called @keep@ at the top level of each filesystem you intend to use, and omitting @-volume@ arguments.
-# Explicitly, by providing a @-volume@ argument for each directory.
-
-For example, if there are filesystems mounted at @/mnt@ and @/mnt2@:
-
-<notextile>
-<pre><code>~$ <span class="userinput">mkdir /mnt/keep /mnt2/keep</span>
-~$ <span class="userinput">keepstore</span>
-2015/05/08 13:44:26 keepstore starting, pid 2765
-2015/05/08 13:44:26 Using volume [UnixVolume /mnt/keep] (writable=true)
-2015/05/08 13:44:26 Using volume [UnixVolume /mnt2/keep] (writable=true)
-2015/05/08 13:44:26 listening at :25107
-</code></pre>
-</notextile>
-
-Equivalently:
-
-<notextile>
-<pre><code>~$ <span class="userinput">mkdir /mnt/keep /mnt2/keep</span>
-~$ <span class="userinput">keepstore -volume=/mnt/keep -volume=/mnt2/keep</span>
-2015/05/08 13:44:26 keepstore starting, pid 2765
-2015/05/08 13:44:26 Using volume [UnixVolume /mnt/keep] (writable=true)
-2015/05/08 13:44:26 Using volume [UnixVolume /mnt2/keep] (writable=true)
-2015/05/08 13:44:26 listening at :25107
-</code></pre>
-</notextile>
+h3. Create config file
+
+By default, keepstore will look for its configuration file at @/etc/arvados/keepstore/keepstore.yml@.
+
+You can override the configuration file location using the @-config@ command line option to keepstore.
+
+The following is a sample configuration file:
+
+<pre>
+# Duration for which new permission signatures (returned in PUT
+# responses) will be valid. This should be equal to the API
+# server's blob_signature_ttl configuration entry.
+BlobSignatureTTL: 336h0m0s
+
+# Local file containing the secret blob signing key (used to generate
+# and verify blob signatures). The contents of the key file must be
+# identical to the API server's blob_signing_key configuration entry.
+BlobSigningKeyFile: ""
+
+# Print extra debug logging
+Debug: false
+
+# Maximum number of concurrent block deletion operations (per
+# volume) when emptying trash. Default is 1.
+EmptyTrashWorkers: 1
+
+# Enable trash and delete features. If false, trash lists will be
+# accepted but blocks will not be trashed or deleted.
+# Keepstore does not delete data on its own. The keep-balance
+# service determines which blocks are candidates for deletion
+# and instructs the keepstore to move those blocks to the trash.
+EnableDelete: true
+
+# Local port to listen on. Can be 'address:port' or ':port', where
+# 'address' is a host IP address or name and 'port' is a port number
+# or name.
+Listen: :25107
+
+# Format of request/response and error logs: "json" or "text".
+LogFormat: json
+
+# The secret key that must be provided by monitoring services
+# wishing to access the health check endpoint (/_health).
+ManagementToken: ""
+
+# Maximum RAM to use for data buffers, given in multiples of block
+# size (64 MiB). When this limit is reached, HTTP requests requiring
+# buffers (like GET and PUT) will wait for buffer space to be
+# released.
+#
+# It should be set such that MaxBuffers * 64MiB + 10% fits
+# comfortably in memory. On a host dedicated to running keepstore,
+# divide total memory by 88MiB to suggest a suitable value. For example,
+# if grep MemTotal /proc/meminfo reports MemTotal: 7125440 kB,
+# compute 7125440 / (88 * 1024)=79 and configure MaxBuffers: 79
+MaxBuffers: 128
+
+# Maximum concurrent requests. When this limit is reached, new
+# requests will receive 503 responses. Note: this limit does not
+# include idle connections from clients using HTTP keepalive, so it
+# does not strictly limit the number of concurrent connections. If
+# omitted or zero, the default is 2 * MaxBuffers.
+MaxRequests: 0
+
+# Path to write PID file during startup. This file is kept open and
+# locked with LOCK_EX until keepstore exits, so "fuser -k pidfile" is
+# one way to shut down. Exit immediately if there is an error
+# opening, locking, or writing the PID file.
+PIDFile: ""
+
+# Maximum number of concurrent pull operations. Default is 1, i.e.,
+# pull lists are processed serially. A pull operation copies a block
+# from another keepstore server.
+PullWorkers: 1
+
+# Honor read requests only if a valid signature is provided. This
+# should be true, except for development use and when migrating from
+# a very old version.
+RequireSignatures: true
+
+# Local file containing the Arvados API token used by keep-balance
+# or data manager. Delete, trash, and index requests are honored
+# only for this token.
+SystemAuthTokenFile: ""
+
+# Path to server certificate file in X509 format. Enables TLS mode.
+#
+# Example: /var/lib/acme/live/keep0.example.com/fullchain
+TLSCertificateFile: ""
+
+# Path to server key file in X509 format. Enables TLS mode.
+#
+# The key pair is read from disk during startup, and whenever SIGHUP
+# is received.
+#
+# Example: /var/lib/acme/live/keep0.example.com/privkey
+TLSKeyFile: ""
+
+# How often to check for (and delete) trashed blocks whose
+# TrashLifetime has expired.
+TrashCheckInterval: 24h0m0s
+
+# Time duration after a block is trashed during which it can be
+# recovered using an /untrash request.
+TrashLifetime: 336h0m0s
+
+# Maximum number of concurrent trash operations (moving a block to the
+# trash, or permanently deleting it). Default is 1, i.e., trash lists
+# are processed serially. If individual trash operations have high
+# latency (e.g., on some cloud platforms), you should increase this.
+TrashWorkers: 1
+</pre>
+
+h3. Notes on storage management
+
+On its own, a keepstore server never deletes data. The "keep-balance":install-keep-balance.html service determines which blocks are candidates for deletion and instructs the keepstore to move those blocks to the trash.
+
+When a block is newly written, it is protected from deletion for the duration given by @BlobSignatureTTL@. During this time, it cannot be trashed.
+
+If keep-balance instructs keepstore to trash a block that is older than @BlobSignatureTTL@, and @EnableDelete@ is true, the block is moved to the trash. A block in the trash is no longer accessible by read requests, but has not yet been permanently deleted. Blocks in the trash may be recovered using the "untrash" API endpoint. Blocks are permanently deleted after they have been in the trash for the duration given by @TrashLifetime@.
+
+Keep-balance is also responsible for balancing the distribution of blocks across keepstore servers by asking servers to pull blocks from other servers (as determined by their "storage class":{{site.baseurl}}/admin/storage-classes.html and "rendezvous hashing order":{{site.baseurl}}/api/storage.html). Pulling a block makes a copy. If a block is overreplicated (i.e., there are excess copies) after pulling, it will subsequently be trashed on the original server.
+
+h3. Configure storage volumes
+
+Available storage volume types include POSIX filesystems and cloud object storage.
+
+* To use a POSIX filesystem, including both local filesystems (ext4, xfs) and network filesystems such as GPFS or Lustre, follow the setup instructions on "Filesystem storage":configure-fs-storage.html
+* If you are using S3-compatible object storage (including Amazon S3, Google Cloud Storage, and Ceph RADOS), follow the setup instructions on "S3 Object Storage":configure-s3-object-storage.html
+* If you are using Azure Blob Storage, follow the setup instructions on "Azure Blob Storage":configure-azure-blob-storage.html
h3. Run keepstore as a supervised service
Install runit to supervise the keepstore daemon. {% include 'install_runit' %}
-Install this script as the run script for the keepstore service, modifying it as directed below.
+Install this script as the run script @/etc/sv/keepstore/run@ for the keepstore service:
<notextile>
<pre><code>#!/bin/sh
exec 2>&1
-GOGC=10 exec keepstore \
- -enforce-permissions=true \
- -blob-signing-key-file=<span class="userinput">/etc/keepstore/blob-signing.key</span> \
- -max-buffers=<span class="userinput">100</span> \
- -serialize=true \
- -never-delete=false \
- -volume=<span class="userinput">/mnt/keep</span> \
- -volume=<span class="userinput">/mnt2/keep</span>
+GOGC=10 exec keepstore -config /etc/arvados/keepstore/keepstore.yml
</code></pre>
</notextile>
-p(#max-buffers). The @-max-buffers@ argument limits keepstore's memory usage. It should be set such that @max-buffers * 64MiB + 10%@ fits comfortably in memory. On a host dedicated to running keepstore, divide total memory by 88MiB to suggest a suitable value. For example, if @grep MemTotal /proc/meminfo@ reports @MemTotal: 7125440 kB@, compute 7125440÷(88×1024)=79 and configure @-max-buffers=79@.
-
-If you want access control on your Keepstore server(s), you must specify the @-enforce-permissions@ flag and provide a signing key. The @-blob-signing-key-file@ argument should be a file containing a long random alphanumeric string with no internal line breaks (it is also possible to use a socket or FIFO: keepstore reads it only once, at startup). This key must be the same as the @blob_signing_key@ configured in the "API server's":install-api-server.html configuration file, @/etc/arvados/api/application.yml@.
-
-The @-serialize=true@ (default: @false@) argument limits keepstore to one reader/writer process per storage partition. This avoids thrashing by allowing the storage device underneath the storage partition to do read/write operations sequentially. Enabling @-serialize@ can improve Keepstore performance if the storage partitions map 1:1 to physical disks that are dedicated to Keepstore, particularly so for mechanical disks. In some cloud environments, enabling @-serialize@ has also also proven to be beneficial for performance, but YMMV. If your storage partition(s) are backed by network or RAID storage that can handle many simultaneous reader/writer processes without thrashing, you probably do not want to set @-serialize@.
-
h3. Set up additional servers
Repeat the above sections to prepare volumes and bring up supervised services on each Keepstore server you are setting up.
--- /dev/null
+---
+layout: default
+navsection: userguide
+title: Using storage classes
+...
+
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+Storage classes (also known as "storage tiers") allow you to control which volumes should be used to store particular collection data blocks. This can be used to implement data storage policies such as moving data to archival storage.
+
+Names of storage classes are internal to the cluster and decided by the administrator. Aside from "default", Arvados currently does not define any standard storage class names.
+
+h3. arv-put
+
+You may specify the desired storage class for a collection uploaded using @arv-put@:
+
+<pre>
+$ arv-put --storage-classes=hot myfile.txt
+</pre>
+
+h3. arvados-cwl-runner
+
+You may also specify the desired storage class for the final output collection produced by @arvados-cwl-runner@:
+
+<pre>
+$ arvados-cwl-runner --storage-classes=hot myworkflow.cwl myinput.yml
+</pre>
+
+(Note: intermediate collections produced by a workflow run will have the "default" storage class.)
+
+h3. arv command line
+
+You may set the storage class on an existing collection by setting the "storage_classes_desired" field of a Collection. For example, at the command line:
+
+<pre>
+$ arv collection update --uuid zzzzz-4zz18-dhhm0ay8k8cqkvg --collection '{"storage_classes_desired": ["archival"]}'
+</pre>
+
+By setting "storage_classes_desired" to "archival", the blocks that make up the collection will be preferentially moved to keepstore volumes which are configured with the "archival" storage class.
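+
+The same update can be made programmatically. The following is a minimal sketch using the Arvados Python SDK (the collection UUID is the hypothetical one from the example above):
+
+<pre>
+import arvados
+
+api = arvados.api('v1')
+# Request that this collection's blocks be stored on "archival" volumes.
+api.collections().update(
+    uuid='zzzzz-4zz18-dhhm0ay8k8cqkvg',
+    body={'collection': {'storage_classes_desired': ['archival']}},
+).execute()
+</pre>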
+
+h3. Storage class notes
+
+Collection blocks will be in the "default" storage class if not otherwise specified.
+
+Currently, a collection may only have one desired storage class.
+
+Any user with write access to a collection may set any storage class on that collection.
"fmt"
"io"
"io/ioutil"
+ "path/filepath"
+ "regexp"
+ "runtime"
"sort"
"strings"
)
return f(prog, args, stdin, stdout, stderr)
}
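+
+// Version is a Handler that prints the version string it wraps,
+// along with the program name and the Go runtime version.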
+type Version string
+
+func (v Version) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ prog = regexp.MustCompile(` -*version$`).ReplaceAllLiteralString(prog, "")
+ fmt.Fprintf(stdout, "%s %s (%s)\n", prog, v, runtime.Version())
+ return 0
+}
+
// Multi is a Handler that looks up its first argument in a map, and
// invokes the resulting Handler with the remaining args.
//
m.Usage(stderr)
return 2
}
- if cmd, ok := m[args[0]]; !ok {
- fmt.Fprintf(stderr, "unrecognized command %q\n", args[0])
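+	// If the program name looks like "arvados-foo" or "crunch-foo",
+	// dispatch on "foo" instead of requiring a subcommand argument.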
+ _, basename := filepath.Split(prog)
+ if strings.HasPrefix(basename, "arvados-") {
+ basename = basename[8:]
+ } else if strings.HasPrefix(basename, "crunch-") {
+ basename = basename[7:]
+ }
+ if cmd, ok := m[basename]; ok {
+ return cmd.RunCommand(prog, args, stdin, stdout, stderr)
+ } else if cmd, ok = m[args[0]]; ok {
+ return cmd.RunCommand(prog+" "+args[0], args[1:], stdin, stdout, stderr)
+ } else {
+ fmt.Fprintf(stderr, "%s: unrecognized command %q\n", prog, args[0])
m.Usage(stderr)
return 2
- } else {
- return cmd.RunCommand(prog+" "+args[0], args[1:], stdin, stdout, stderr)
}
}
c.Check(stderr.String(), check.Equals, "")
}
+func (s *CmdSuite) TestHelloViaProg(c *check.C) {
+ defer cmdtest.LeakCheck(c)()
+ stdout := bytes.NewBuffer(nil)
+ stderr := bytes.NewBuffer(nil)
+ exited := testCmd.RunCommand("/usr/local/bin/echo", []string{"hello", "world"}, bytes.NewReader(nil), stdout, stderr)
+ c.Check(exited, check.Equals, 0)
+ c.Check(stdout.String(), check.Equals, "hello world\n")
+ c.Check(stderr.String(), check.Equals, "")
+}
+
func (s *CmdSuite) TestUsage(c *check.C) {
defer cmdtest.LeakCheck(c)()
stdout := bytes.NewBuffer(nil)
exited := testCmd.RunCommand("prog", []string{"nosuchcommand", "hi"}, bytes.NewReader(nil), stdout, stderr)
c.Check(exited, check.Equals, 2)
c.Check(stdout.String(), check.Equals, "")
- c.Check(stderr.String(), check.Matches, `(?ms)^unrecognized command "nosuchcommand"\n.*echo.*\n`)
+ c.Check(stderr.String(), check.Matches, `(?ms)^prog: unrecognized command "nosuchcommand"\n.*echo.*\n`)
}
func (s *CmdSuite) TestSubcommandToFront(c *check.C) {
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+ "git.curoverse.com/arvados.git/lib/cmd"
+ "git.curoverse.com/arvados.git/lib/service"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
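+// Command is a cmd.Handler that runs the controller service.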
+var Command cmd.Handler = service.Command(arvados.ServiceNameController, newHandler)
+
+func newHandler(cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
+ return &Handler{Cluster: cluster, NodeProfile: np}
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+ "context"
+ "io"
+ "net"
+ "net/http"
+ "net/url"
+ "strings"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/health"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+)
+
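+// Handler proxies API requests to the Rails API server indicated by
+// the given cluster and node configuration, and serves the /_health
+// endpoints itself.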
+type Handler struct {
+ Cluster *arvados.Cluster
+ NodeProfile *arvados.NodeProfile
+
+ setupOnce sync.Once
+ handlerStack http.Handler
+ proxyClient *arvados.Client
+}
+
+func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ h.setupOnce.Do(h.setup)
+ h.handlerStack.ServeHTTP(w, req)
+}
+
+func (h *Handler) CheckHealth() error {
+ h.setupOnce.Do(h.setup)
+ _, err := findRailsAPI(h.Cluster, h.NodeProfile)
+ return err
+}
+
+func (h *Handler) setup() {
+ mux := http.NewServeMux()
+ mux.Handle("/_health/", &health.Handler{
+ Token: h.Cluster.ManagementToken,
+ Prefix: "/_health/",
+ })
+ mux.Handle("/", http.HandlerFunc(h.proxyRailsAPI))
+ h.handlerStack = mux
+}
+
+// dropHeaders lists hop-by-hop headers that should not be forwarded
+// when proxying. See
+// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers
+var dropHeaders = map[string]bool{
+ "Connection": true,
+ "Keep-Alive": true,
+ "Proxy-Authenticate": true,
+ "Proxy-Authorization": true,
+ "TE": true,
+ "Trailer": true,
+ "Transfer-Encoding": true,
+ "Upgrade": true,
+}
+
+func (h *Handler) proxyRailsAPI(w http.ResponseWriter, reqIn *http.Request) {
+ urlOut, err := findRailsAPI(h.Cluster, h.NodeProfile)
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ urlOut = &url.URL{
+ Scheme: urlOut.Scheme,
+ Host: urlOut.Host,
+ Path: reqIn.URL.Path,
+ RawPath: reqIn.URL.RawPath,
+ RawQuery: reqIn.URL.RawQuery,
+ }
+
+ // Copy headers from incoming request, then add/replace proxy
+ // headers like Via and X-Forwarded-For.
+ hdrOut := http.Header{}
+ for k, v := range reqIn.Header {
+ if !dropHeaders[k] {
+ hdrOut[k] = v
+ }
+ }
+ xff := reqIn.RemoteAddr
+ if xffIn := reqIn.Header.Get("X-Forwarded-For"); xffIn != "" {
+ xff = xffIn + "," + xff
+ }
+ hdrOut.Set("X-Forwarded-For", xff)
+ hdrOut.Add("Via", reqIn.Proto+" arvados-controller")
+
+ ctx := reqIn.Context()
+ if timeout := h.Cluster.HTTPRequestTimeout; timeout > 0 {
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithDeadline(ctx, time.Now().Add(time.Duration(timeout)))
+ defer cancel()
+ }
+
+ reqOut := (&http.Request{
+ Method: reqIn.Method,
+ URL: urlOut,
+ Header: hdrOut,
+ Body: reqIn.Body,
+ }).WithContext(ctx)
+ resp, err := arvados.InsecureHTTPClient.Do(reqOut)
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ for k, v := range resp.Header {
+ for _, v := range v {
+ w.Header().Add(k, v)
+ }
+ }
+ w.WriteHeader(resp.StatusCode)
+ n, err := io.Copy(w, resp.Body)
+ if err != nil {
+ httpserver.Logger(reqIn).WithError(err).WithField("bytesCopied", n).Error("error copying response body")
+ }
+}
+
+// For now, findRailsAPI always uses the rails API running on this
+// node.
+func findRailsAPI(cluster *arvados.Cluster, np *arvados.NodeProfile) (*url.URL, error) {
+ hostport := np.RailsAPI.Listen
+ if len(hostport) > 1 && hostport[0] == ':' && strings.TrimRight(hostport[1:], "0123456789") == "" {
+ // ":12345" => connect to indicated port on localhost
+ hostport = "localhost" + hostport
+ } else if _, _, err := net.SplitHostPort(hostport); err == nil {
+ // "[::1]:12345" => connect to indicated address & port
+ } else {
+ return nil, err
+ }
+ proto := "http"
+ if np.RailsAPI.TLS {
+ proto = "https"
+ }
+ return url.Parse(proto + "://" + hostport)
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+ "encoding/json"
+ "net/http"
+ "net/http/httptest"
+ "net/url"
+ "os"
+ "strings"
+ "testing"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
+
+var _ = check.Suite(&HandlerSuite{})
+
+type HandlerSuite struct {
+ cluster *arvados.Cluster
+ handler http.Handler
+}
+
+func (s *HandlerSuite) SetUpTest(c *check.C) {
+ s.cluster = &arvados.Cluster{
+ ClusterID: "zzzzz",
+ NodeProfiles: map[string]arvados.NodeProfile{
+ "*": {
+ Controller: arvados.SystemServiceInstance{Listen: ":"},
+ RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"), TLS: true},
+ },
+ },
+ }
+ node := s.cluster.NodeProfiles["*"]
+ s.handler = newHandler(s.cluster, &node)
+}
+
+func (s *HandlerSuite) TestProxyDiscoveryDoc(c *check.C) {
+ req := httptest.NewRequest("GET", "/discovery/v1/apis/arvados/v1/rest", nil)
+ resp := httptest.NewRecorder()
+ s.handler.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+ var dd arvados.DiscoveryDocument
+ err := json.Unmarshal(resp.Body.Bytes(), &dd)
+ c.Check(err, check.IsNil)
+ c.Check(dd.BlobSignatureTTL, check.Not(check.Equals), int64(0))
+ c.Check(dd.BlobSignatureTTL > 0, check.Equals, true)
+ c.Check(len(dd.Resources), check.Not(check.Equals), 0)
+ c.Check(len(dd.Schemas), check.Not(check.Equals), 0)
+}
+
+func (s *HandlerSuite) TestRequestTimeout(c *check.C) {
+ s.cluster.HTTPRequestTimeout = arvados.Duration(time.Nanosecond)
+ req := httptest.NewRequest("GET", "/discovery/v1/apis/arvados/v1/rest", nil)
+ resp := httptest.NewRecorder()
+ s.handler.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusInternalServerError)
+ var jresp httpserver.ErrorResponse
+ err := json.Unmarshal(resp.Body.Bytes(), &jresp)
+ c.Check(err, check.IsNil)
+ c.Assert(len(jresp.Errors), check.Equals, 1)
+ c.Check(jresp.Errors[0], check.Matches, `.*context deadline exceeded`)
+}
+
+func (s *HandlerSuite) TestProxyWithoutToken(c *check.C) {
+ req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
+ resp := httptest.NewRecorder()
+ s.handler.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+ jresp := map[string]interface{}{}
+ err := json.Unmarshal(resp.Body.Bytes(), &jresp)
+ c.Check(err, check.IsNil)
+ c.Check(jresp["errors"], check.FitsTypeOf, []interface{}{})
+}
+
+func (s *HandlerSuite) TestProxyWithToken(c *check.C) {
+ req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ resp := httptest.NewRecorder()
+ s.handler.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+ var u arvados.User
+ err := json.Unmarshal(resp.Body.Bytes(), &u)
+ c.Check(err, check.IsNil)
+ c.Check(u.UUID, check.Equals, arvadostest.ActiveUserUUID)
+}
+
+func (s *HandlerSuite) TestProxyWithTokenInRequestBody(c *check.C) {
+ req := httptest.NewRequest("POST", "/arvados/v1/users/current", strings.NewReader(url.Values{
+ "_method": {"GET"},
+ "api_token": {arvadostest.ActiveToken},
+ }.Encode()))
+ resp := httptest.NewRecorder()
+ s.handler.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+ var u arvados.User
+ err := json.Unmarshal(resp.Body.Bytes(), &u)
+ c.Check(err, check.IsNil)
+ c.Check(u.UUID, check.Equals, arvadostest.ActiveUserUUID)
+}
+
+func (s *HandlerSuite) TestProxyNotFound(c *check.C) {
+ req := httptest.NewRequest("GET", "/arvados/v1/xyzzy", nil)
+ resp := httptest.NewRecorder()
+ s.handler.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusNotFound)
+ jresp := map[string]interface{}{}
+ err := json.Unmarshal(resp.Body.Bytes(), &jresp)
+ c.Check(err, check.IsNil)
+ c.Check(jresp["errors"], check.FitsTypeOf, []interface{}{})
+}
needRAM := ctr.RuntimeConstraints.RAM + ctr.RuntimeConstraints.KeepCacheRAM
needRAM = (needRAM * 100) / int64(100-discountConfiguredRAMPercent)
- availableTypes := make([]arvados.InstanceType, len(cc.InstanceTypes))
- copy(availableTypes, cc.InstanceTypes)
- sort.Slice(availableTypes, func(a, b int) bool {
- return availableTypes[a].Price < availableTypes[b].Price
- })
- err = ConstraintsNotSatisfiableError{
- errors.New("constraints not satisfiable by any configured instance type"),
- availableTypes,
- }
+ ok := false
for _, it := range cc.InstanceTypes {
switch {
- case err == nil && it.Price > best.Price:
- case it.Scratch < needScratch:
- case it.RAM < needRAM:
+ case ok && it.Price > best.Price:
+ case int64(it.Scratch) < needScratch:
+ case int64(it.RAM) < needRAM:
case it.VCPUs < needVCPUs:
- case it.Preemptable != ctr.SchedulingParameters.Preemptable:
+ case it.Preemptible != ctr.SchedulingParameters.Preemptible:
case it.Price == best.Price && (it.RAM < best.RAM || it.VCPUs < best.VCPUs):
// Equal price, but worse specs
default:
// Lower price || (same price && better specs)
best = it
- err = nil
+ ok = true
+ }
+ }
+ if !ok {
+ availableTypes := make([]arvados.InstanceType, 0, len(cc.InstanceTypes))
+ for _, t := range cc.InstanceTypes {
+ availableTypes = append(availableTypes, t)
}
+ sort.Slice(availableTypes, func(a, b int) bool {
+ return availableTypes[a].Price < availableTypes[b].Price
+ })
+ err = ConstraintsNotSatisfiableError{
+ errors.New("constraints not satisfiable by any configured instance type"),
+ availableTypes,
+ }
+ return
}
return
}
var _ = check.Suite(&NodeSizeSuite{})
-const GiB = int64(1 << 30)
+const GiB = arvados.ByteSize(1 << 30)
type NodeSizeSuite struct{}
func (*NodeSizeSuite) TestChooseUnsatisfiable(c *check.C) {
checkUnsatisfiable := func(ctr *arvados.Container) {
- _, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: []arvados.InstanceType{
- {Price: 1.1, RAM: 1000000000, VCPUs: 2, Name: "small1"},
- {Price: 2.2, RAM: 2000000000, VCPUs: 4, Name: "small2"},
- {Price: 4.4, RAM: 4000000000, VCPUs: 8, Name: "small4", Scratch: GiB},
+ _, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: map[string]arvados.InstanceType{
+ "small1": {Price: 1.1, RAM: 1000000000, VCPUs: 2, Name: "small1"},
+ "small2": {Price: 2.2, RAM: 2000000000, VCPUs: 4, Name: "small2"},
+ "small4": {Price: 4.4, RAM: 4000000000, VCPUs: 8, Name: "small4", Scratch: GiB},
}}, ctr)
c.Check(err, check.FitsTypeOf, ConstraintsNotSatisfiableError{})
}
checkUnsatisfiable(&arvados.Container{RuntimeConstraints: rc})
}
checkUnsatisfiable(&arvados.Container{
- Mounts: map[string]arvados.Mount{"/tmp": {Kind: "tmp", Capacity: 2 * GiB}},
+ Mounts: map[string]arvados.Mount{"/tmp": {Kind: "tmp", Capacity: int64(2 * GiB)}},
RuntimeConstraints: arvados.RuntimeConstraints{RAM: 12345, VCPUs: 1},
})
}
func (*NodeSizeSuite) TestChoose(c *check.C) {
- for _, menu := range [][]arvados.InstanceType{
+ for _, menu := range []map[string]arvados.InstanceType{
{
- {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Name: "costly"},
- {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"},
- {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 2 * GiB, Name: "small"},
+ "costly": {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Name: "costly"},
+ "best": {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"},
+ "small": {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 2 * GiB, Name: "small"},
},
{
- {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Name: "costly"},
- {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "goodenough"},
- {Price: 2.2, RAM: 4000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"},
- {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 2 * GiB, Name: "small"},
+ "costly": {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Name: "costly"},
+ "goodenough": {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "goodenough"},
+ "best": {Price: 2.2, RAM: 4000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"},
+ "small": {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 2 * GiB, Name: "small"},
},
{
- {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 2 * GiB, Name: "small"},
- {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "goodenough"},
- {Price: 2.2, RAM: 4000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"},
- {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Name: "costly"},
+ "small": {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 2 * GiB, Name: "small"},
+ "goodenough": {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "goodenough"},
+ "best": {Price: 2.2, RAM: 4000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"},
+ "costly": {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Name: "costly"},
},
{
- {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: GiB, Name: "small"},
- {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: GiB, Name: "nearly"},
- {Price: 3.3, RAM: 4000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"},
- {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Name: "costly"},
+ "small": {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: GiB, Name: "small"},
+ "nearly": {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: GiB, Name: "nearly"},
+ "best": {Price: 3.3, RAM: 4000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"},
+ "costly": {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Name: "costly"},
},
} {
best, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: menu}, &arvados.Container{
Mounts: map[string]arvados.Mount{
- "/tmp": {Kind: "tmp", Capacity: 2 * GiB},
+ "/tmp": {Kind: "tmp", Capacity: 2 * int64(GiB)},
},
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 2,
}
-func (*NodeSizeSuite) TestChoosePreemptable(c *check.C) {
+func (*NodeSizeSuite) TestChoosePreemptible(c *check.C) {
- menu := []arvados.InstanceType{
- {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Preemptable: true, Name: "costly"},
- {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "almost best"},
- {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Preemptable: true, Name: "best"},
- {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 2 * GiB, Preemptable: true, Name: "small"},
+ menu := map[string]arvados.InstanceType{
+ "costly": {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Preemptible: true, Name: "costly"},
+ "almost best": {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "almost best"},
+ "best": {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Preemptible: true, Name: "best"},
+ "small": {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 2 * GiB, Preemptible: true, Name: "small"},
}
best, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: menu}, &arvados.Container{
Mounts: map[string]arvados.Mount{
- "/tmp": {Kind: "tmp", Capacity: 2 * GiB},
+ "/tmp": {Kind: "tmp", Capacity: 2 * int64(GiB)},
},
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 2,
KeepCacheRAM: 123456789,
},
SchedulingParameters: arvados.SchedulingParameters{
- Preemptable: true,
+ Preemptible: true,
},
})
c.Check(err, check.IsNil)
c.Check(best.RAM >= 1234567890, check.Equals, true)
c.Check(best.VCPUs >= 2, check.Equals, true)
c.Check(best.Scratch >= 2*GiB, check.Equals, true)
- c.Check(best.Preemptable, check.Equals, true)
+ c.Check(best.Preemptible, check.Equals, true)
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+// Package service provides a cmd.Handler that brings up a system service.
+package service
+
+import (
+ "flag"
+ "fmt"
+ "io"
+ "net/http"
+ "os"
+
+ "git.curoverse.com/arvados.git/lib/cmd"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "github.com/Sirupsen/logrus"
+ "github.com/coreos/go-systemd/daemon"
+)
+
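+// Handler is an http.Handler that can also report the health of its
+// own backend.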
+type Handler interface {
+ http.Handler
+ CheckHealth() error
+}
+
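+// A NewHandlerFunc returns a Handler that uses the given cluster and
+// node configuration.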
+type NewHandlerFunc func(*arvados.Cluster, *arvados.NodeProfile) Handler
+
+type command struct {
+ newHandler NewHandlerFunc
+ svcName arvados.ServiceName
+}
+
+// Command returns a cmd.Handler that loads site config, calls
+// newHandler with the current cluster and node configs, and brings up
+// an http server with the returned handler.
+//
+// The handler is wrapped with server middleware (adding X-Request-ID
+// headers, logging requests/responses, etc).
+func Command(svcName arvados.ServiceName, newHandler NewHandlerFunc) cmd.Handler {
+ return &command{
+ newHandler: newHandler,
+ svcName: svcName,
+ }
+}
+
+func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ log := logrus.New()
+ log.Formatter = &logrus.JSONFormatter{
+ TimestampFormat: rfc3339NanoFixed,
+ }
+ log.Out = stderr
+
+ var err error
+ defer func() {
+ if err != nil {
+ log.WithError(err).Info("exiting")
+ }
+ }()
+ flags := flag.NewFlagSet("", flag.ContinueOnError)
+ flags.SetOutput(stderr)
+ configFile := flags.String("config", arvados.DefaultConfigFile, "Site configuration `file`")
+ nodeProfile := flags.String("node-profile", "", "`Name` of NodeProfiles config entry to use (if blank, use $ARVADOS_NODE_PROFILE or hostname reported by OS)")
+ err = flags.Parse(args)
+ if err == flag.ErrHelp {
+ err = nil
+ return 0
+ } else if err != nil {
+ return 2
+ }
+ cfg, err := arvados.GetConfig(*configFile)
+ if err != nil {
+ return 1
+ }
+ cluster, err := cfg.GetCluster("")
+ if err != nil {
+ return 1
+ }
+ profileName := *nodeProfile
+ if profileName == "" {
+ profileName = os.Getenv("ARVADOS_NODE_PROFILE")
+ }
+ profile, err := cluster.GetNodeProfile(profileName)
+ if err != nil {
+ return 1
+ }
+ listen := profile.ServicePorts()[c.svcName]
+ if listen == "" {
+ err = fmt.Errorf("configuration does not enable the %s service on this host", c.svcName)
+ return 1
+ }
+ handler := c.newHandler(cluster, profile)
+ if err = handler.CheckHealth(); err != nil {
+ return 1
+ }
+ srv := &httpserver.Server{
+ Server: http.Server{
+ Handler: httpserver.AddRequestIDs(httpserver.LogRequests(log, handler)),
+ },
+ Addr: listen,
+ }
+ err = srv.Start()
+ if err != nil {
+ return 1
+ }
+ log.WithFields(logrus.Fields{
+ "Listen": srv.Addr,
+ "Service": c.svcName,
+ }).Info("listening")
+ if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
+ log.WithError(err).Errorf("error notifying init daemon")
+ }
+ err = srv.Wait()
+ if err != nil {
+ return 1
+ }
+ return 0
+}
+
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
--- /dev/null
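+# Print a message at package load time if the installed R version is
+# older than the minimum supported by this package.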
+.onLoad <- function(libName, pkgName)
+{
+ minAllowedRVersion <- "3.3.0"
+ currentRVersion <- getRversion()
+
+ if(currentRVersion < minAllowedRVersion)
+ print(paste0("Minimum R version required to run ", pkgName, " is ",
+ minAllowedRVersion, ". Your current version is ",
+ toString(currentRVersion), ". Please update R and try again."))
+}
apt-get install build-essential libxml2-dev libssl-dev libcurl4-gnutls-dev
```
+The minimum R version required to run ArvadosR is 3.3.0.
+
### Usage
import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate
+import cwltool.argparser
import arvados
import arvados.config
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
+from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__
from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums
+
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
"""
- def __init__(self, api_client, work_api=None, keep_client=None,
- output_name=None, output_tags=None, num_retries=4,
+ def __init__(self, api_client,
+ arvargs=None,
+ keep_client=None,
+ num_retries=4,
thread_count=4):
+
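+        # If the caller did not supply parsed command line arguments
+        # (e.g., in tests), fall back to default values.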
+ if arvargs is None:
+ arvargs = argparse.Namespace()
+ arvargs.work_api = None
+ arvargs.output_name = None
+ arvargs.output_tags = None
+ arvargs.thread_count = 1
+
self.api = api_client
self.processes = {}
self.workflow_eval_lock = threading.Condition(threading.RLock())
self.poll_api = None
self.pipeline = None
self.final_output_collection = None
- self.output_name = output_name
- self.output_tags = output_tags
+ self.output_name = arvargs.output_name
+ self.output_tags = arvargs.output_tags
self.project_uuid = None
self.intermediate_output_ttl = 0
self.intermediate_output_collections = []
self.trash_intermediate = False
- self.thread_count = thread_count
+ self.thread_count = arvargs.thread_count
self.poll_interval = 12
+ self.loadingContext = None
if keep_client is not None:
self.keep_client = keep_client
try:
methods = self.api._rootDesc.get('resources')[api]['methods']
if ('httpMethod' in methods['create'] and
- (work_api == api or work_api is None)):
+ (arvargs.work_api == api or arvargs.work_api is None)):
self.work_api = api
break
except KeyError:
pass
if not self.work_api:
- if work_api is None:
+ if arvargs.work_api is None:
raise Exception("No supported APIs")
else:
-                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
+                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
- def arv_make_tool(self, toolpath_object, **kwargs):
- kwargs["work_api"] = self.work_api
- kwargs["fetcher_constructor"] = self.fetcher_constructor
- kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
+ self.loadingContext = ArvLoadingContext(vars(arvargs))
+ self.loadingContext.fetcher_constructor = self.fetcher_constructor
+ self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
+ self.loadingContext.construct_tool_object = self.arv_make_tool
+
+ def arv_make_tool(self, toolpath_object, loadingContext):
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
- return ArvadosCommandTool(self, toolpath_object, **kwargs)
+ return ArvadosCommandTool(self, toolpath_object, loadingContext)
elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
- return ArvadosWorkflow(self, toolpath_object, **kwargs)
+ return ArvadosWorkflow(self, toolpath_object, loadingContext)
else:
- return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
+ return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
def output_callback(self, out, processStatus):
with self.workflow_eval_lock:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Complete"}).execute(num_retries=self.num_retries)
else:
- logger.warn("Overall process status is %s", processStatus)
+ logger.error("Overall process status is %s", processStatus)
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
self.workflow_eval_lock.notifyAll()
- def start_run(self, runnable, kwargs):
- self.task_queue.add(partial(runnable.run, **kwargs))
+ def start_run(self, runnable, runtimeContext):
+ self.task_queue.add(partial(runnable.run, runtimeContext))
def process_submitted(self, container):
with self.workflow_eval_lock:
with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
self.check_features(v)
- def make_output_collection(self, name, tagsString, outputObj):
+ def make_output_collection(self, name, storage_classes, tagsString, outputObj):
outputObj = copy.deepcopy(outputObj)
files = []
with final.open("cwl.output.json", "w") as f:
json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
- final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
+ final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
final.api_response()["name"],
'progress':1.0
}).execute(num_retries=self.num_retries)
- def arv_executor(self, tool, job_order, **kwargs):
- self.debug = kwargs.get("debug")
+ def arv_executor(self, tool, job_order, runtimeContext, logger=None):
+ self.debug = runtimeContext.debug
tool.visit(self.check_features)
- self.project_uuid = kwargs.get("project_uuid")
+ self.project_uuid = runtimeContext.project_uuid
self.pipeline = None
- make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
- collection_cache=self.collection_cache)
- self.fs_access = make_fs_access(kwargs["basedir"])
- self.secret_store = kwargs.get("secret_store")
+ self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
+ self.secret_store = runtimeContext.secret_store
- self.trash_intermediate = kwargs["trash_intermediate"]
+ self.trash_intermediate = runtimeContext.trash_intermediate
if self.trash_intermediate and self.work_api != "containers":
raise Exception("--trash-intermediate is only supported with --api=containers.")
- self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
+ self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
if self.intermediate_output_ttl and self.work_api != "containers":
raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
if self.intermediate_output_ttl < 0:
raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
- if kwargs.get("submit_request_uuid") and self.work_api != "containers":
+ if runtimeContext.submit_request_uuid and self.work_api != "containers":
raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
- if not kwargs.get("name"):
- kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+ if not runtimeContext.name:
+ runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
# Reload tool object which may have been updated by
# upload_workflow_deps
# Don't validate this time because it will just print redundant errors.
+ loadingContext = self.loadingContext.copy()
+ loadingContext.loader = tool.doc_loader
+ loadingContext.avsc_names = tool.doc_schema
+ loadingContext.metadata = tool.metadata
+ loadingContext.do_validate = False
+
tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
- makeTool=self.arv_make_tool,
- loader=tool.doc_loader,
- avsc_names=tool.doc_schema,
- metadata=tool.metadata,
- do_validate=False)
+ loadingContext)
# Upload local file references in the job order.
- job_order = upload_job_order(self, "%s input" % kwargs["name"],
+ job_order = upload_job_order(self, "%s input" % runtimeContext.name,
tool, job_order)
- existing_uuid = kwargs.get("update_workflow")
- if existing_uuid or kwargs.get("create_workflow"):
+ existing_uuid = runtimeContext.update_workflow
+ if existing_uuid or runtimeContext.create_workflow:
# Create a pipeline template or workflow record and exit.
if self.work_api == "jobs":
tmpl = RunnerTemplate(self, tool, job_order,
- kwargs.get("enable_reuse"),
+ runtimeContext.enable_reuse,
uuid=existing_uuid,
- submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs["name"],
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
merged_map=merged_map)
tmpl.save()
# cwltool.main will write our return value to stdout.
return (upload_workflow(self, tool, job_order,
self.project_uuid,
uuid=existing_uuid,
- submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs["name"],
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
merged_map=merged_map),
"success")
- self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
- self.eval_timeout = kwargs.get("eval_timeout")
+ self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
+ self.eval_timeout = runtimeContext.eval_timeout
- kwargs["make_fs_access"] = make_fs_access
- kwargs["enable_reuse"] = kwargs.get("enable_reuse")
- kwargs["use_container"] = True
- kwargs["tmpdir_prefix"] = "tmp"
- kwargs["compute_checksum"] = kwargs.get("compute_checksum")
+ runtimeContext = runtimeContext.copy()
+ runtimeContext.use_container = True
+ runtimeContext.tmpdir_prefix = "tmp"
+ runtimeContext.work_api = self.work_api
if self.work_api == "containers":
if self.ignore_docker_for_reuse:
raise Exception("--ignore-docker-for-reuse not supported with containers API.")
- kwargs["outdir"] = "/var/spool/cwl"
- kwargs["docker_outdir"] = "/var/spool/cwl"
- kwargs["tmpdir"] = "/tmp"
- kwargs["docker_tmpdir"] = "/tmp"
+ runtimeContext.outdir = "/var/spool/cwl"
+ runtimeContext.docker_outdir = "/var/spool/cwl"
+ runtimeContext.tmpdir = "/tmp"
+ runtimeContext.docker_tmpdir = "/tmp"
elif self.work_api == "jobs":
- if kwargs["priority"] != DEFAULT_PRIORITY:
+ if runtimeContext.priority != DEFAULT_PRIORITY:
raise Exception("--priority not implemented for jobs API.")
- kwargs["outdir"] = "$(task.outdir)"
- kwargs["docker_outdir"] = "$(task.outdir)"
- kwargs["tmpdir"] = "$(task.tmpdir)"
+ runtimeContext.outdir = "$(task.outdir)"
+ runtimeContext.docker_outdir = "$(task.outdir)"
+ runtimeContext.tmpdir = "$(task.tmpdir)"
- if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
+ if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
raise Exception("--priority must be in the range 1..1000.")
runnerjob = None
- if kwargs.get("submit"):
+ if runtimeContext.submit:
# Submit a runner job to run the workflow for us.
if self.work_api == "containers":
- if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
- kwargs["runnerjob"] = tool.tool["id"]
+ if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
+ runtimeContext.runnerjob = tool.tool["id"]
runnerjob = tool.job(job_order,
self.output_callback,
- **kwargs).next()
+ runtimeContext).next()
else:
- runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
+ runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
self.output_name,
self.output_tags,
- submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name"),
- on_error=kwargs.get("on_error"),
- submit_runner_image=kwargs.get("submit_runner_image"),
- intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
+ on_error=runtimeContext.on_error,
+ submit_runner_image=runtimeContext.submit_runner_image,
+ intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
merged_map=merged_map,
- priority=kwargs.get("priority"),
+ priority=runtimeContext.priority,
secret_store=self.secret_store)
elif self.work_api == "jobs":
- runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
+ runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
self.output_name,
self.output_tags,
- submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name"),
- on_error=kwargs.get("on_error"),
- submit_runner_image=kwargs.get("submit_runner_image"),
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
+ on_error=runtimeContext.on_error,
+ submit_runner_image=runtimeContext.submit_runner_image,
merged_map=merged_map)
- elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
+ elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
# Create pipeline for local run
self.pipeline = self.api.pipeline_instances().create(
body={
"owner_uuid": self.project_uuid,
- "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
+ "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
"components": {},
"state": "RunningOnClient"}).execute(num_retries=self.num_retries)
logger.info("Pipeline instance %s", self.pipeline["uuid"])
- if runnerjob and not kwargs.get("wait"):
- submitargs = kwargs.copy()
- submitargs['submit'] = False
- runnerjob.run(**submitargs)
+ if runnerjob and not runtimeContext.wait:
+ submitargs = runtimeContext.copy()
+ submitargs.submit = False
+ runnerjob.run(submitargs)
return (runnerjob.uuid, "success")
self.poll_api = arvados.api('v1')
if runnerjob:
jobiter = iter((runnerjob,))
else:
- if "cwl_runner_job" in kwargs:
- self.uuid = kwargs.get("cwl_runner_job").get('uuid')
+ if runtimeContext.cwl_runner_job is not None:
+ self.uuid = runtimeContext.cwl_runner_job.get('uuid')
jobiter = tool.job(job_order,
self.output_callback,
- **kwargs)
+ runtimeContext)
try:
self.workflow_eval_lock.acquire()
if runnable:
with Perf(metrics, "run"):
- self.start_run(runnable, kwargs)
+ self.start_run(runnable, runtimeContext)
else:
if (self.task_queue.in_flight + len(self.processes)) > 0:
self.workflow_eval_lock.wait(3)
if self.final_output is None:
raise WorkflowException("Workflow did not return a result.")
- if kwargs.get("submit") and isinstance(runnerjob, Runner):
+ if runtimeContext.submit and isinstance(runnerjob, Runner):
logger.info("Final output collection %s", runnerjob.final_output)
else:
if self.output_name is None:
self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
if self.output_tags is None:
self.output_tags = ""
- self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
+
+ storage_classes = runtimeContext.storage_classes.strip().split(",")
+ self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
self.set_crunch_output()
- if kwargs.get("compute_checksum"):
+ if runtimeContext.compute_checksum:
adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
parser.add_argument("--enable-dev", action="store_true",
help="Enable loading and running development versions "
"of CWL spec.", default=False)
+ parser.add_argument('--storage-classes', default="default", type=str,
+ help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
job_order_object = None
arvargs = parser.parse_args(args)
+ if len(arvargs.storage_classes.strip().split(',')) > 1:
+        logger.error("Multiple storage classes are not currently supported.")
+ return 1
+
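+    # These options are always set this way when running on Arvados,
+    # regardless of the command line.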
+ arvargs.use_container = True
+ arvargs.relax_path_checks = True
+ arvargs.print_supported_versions = False
+
if install_sig_handlers:
arv_cmd.install_signal_handlers()
keep_client = api_client.keep
if keep_client is None:
keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
- runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
- num_retries=4, output_name=arvargs.output_name,
- output_tags=arvargs.output_tags,
- thread_count=arvargs.thread_count)
+ runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
except Exception as e:
logger.error(e)
return 1
else:
arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
- arvargs.conformance_test = None
- arvargs.use_container = True
- arvargs.relax_path_checks = True
- arvargs.print_supported_versions = False
+ for key, val in cwltool.argparser.get_default_args().items():
+ if not hasattr(arvargs, key):
+ setattr(arvargs, key, val)
- make_fs_access = partial(CollectionFsAccess,
- collection_cache=runner.collection_cache)
+ runtimeContext = ArvRuntimeContext(vars(arvargs))
+ runtimeContext.make_fs_access = partial(CollectionFsAccess,
+ collection_cache=runner.collection_cache)
return cwltool.main.main(args=arvargs,
stdout=stdout,
stderr=stderr,
executor=runner.arv_executor,
- makeTool=runner.arv_make_tool,
versionfunc=versionstring,
job_order_object=job_order_object,
- make_fs_access=make_fs_access,
- fetcher_constructor=partial(CollectionFetcher,
- api_client=api_client,
- fs_access=make_fs_access(""),
- num_retries=runner.num_retries),
- resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
logger_handler=arvados.log_handler,
- custom_schema_callback=add_arv_hints)
+ custom_schema_callback=add_arv_hints,
+ loadingContext=runner.loadingContext,
+ runtimeContext=runtimeContext)
import ruamel.yaml as yaml
from cwltool.errors import WorkflowException
-from cwltool.process import get_feature, UnsupportedRequirement, shortname
+from cwltool.process import UnsupportedRequirement, shortname
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
+from cwltool.job import JobBase
import arvados.collection
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
-class ArvadosContainer(object):
+class ArvadosContainer(JobBase):
"""Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
- def __init__(self, runner):
+ def __init__(self, runner,
+ builder, # type: Builder
+ joborder, # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
+ make_path_mapper, # type: Callable[..., PathMapper]
+ requirements, # type: List[Dict[Text, Text]]
+ hints, # type: List[Dict[Text, Text]]
+ name # type: Text
+ ):
+ super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
self.arvrunner = runner
self.running = False
self.uuid = None
def update_pipeline_component(self, r):
pass
- def run(self, dry_run=False, pull_image=True, **kwargs):
+ def run(self, runtimeContext):
# ArvadosCommandTool subclasses from cwltool.CommandLineTool,
# which calls makeJobRunner() to get a new ArvadosContainer
# object. The fields that define execution such as
"name": self.name,
"output_path": self.outdir,
"cwd": self.outdir,
- "priority": kwargs.get("priority"),
+ "priority": runtimeContext.priority,
"state": "Committed",
"properties": {},
}
mounts["stdout"] = {"kind": "file",
"path": "%s/%s" % (self.outdir, self.stdout)}
- (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+ (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
if not docker_req:
docker_req = {"dockerImageId": "arvados/jobs"}
container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
docker_req,
- pull_image,
+ runtimeContext.pull_image,
self.arvrunner.project_uuid)
- api_req, _ = get_feature(self, "http://arvados.org/cwl#APIRequirement")
+ api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
if api_req:
runtime_constraints["API"] = True
- runtime_req, _ = get_feature(self, "http://arvados.org/cwl#RuntimeConstraints")
+ runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
if runtime_req:
if "keep_cache" in runtime_req:
runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"] * 2**20
"writable": True
}
- partition_req, _ = get_feature(self, "http://arvados.org/cwl#PartitionRequirement")
+ partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
if partition_req:
scheduling_parameters["partitions"] = aslist(partition_req["partition"])
- intermediate_output_req, _ = get_feature(self, "http://arvados.org/cwl#IntermediateOutput")
+ intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
if intermediate_output_req:
self.output_ttl = intermediate_output_req["outputTTL"]
else:
if self.output_ttl < 0:
raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
+ if self.timelimit is not None:
+ scheduling_parameters["max_run_time"] = self.timelimit
+
container_request["output_ttl"] = self.output_ttl
container_request["mounts"] = mounts
container_request["secret_mounts"] = secret_mounts
container_request["runtime_constraints"] = runtime_constraints
container_request["scheduling_parameters"] = scheduling_parameters
- enable_reuse = kwargs.get("enable_reuse", True)
+ enable_reuse = runtimeContext.enable_reuse
if enable_reuse:
- reuse_req, _ = get_feature(self, "http://arvados.org/cwl#ReuseRequirement")
+ reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
if reuse_req:
enable_reuse = reuse_req["enableReuse"]
container_request["use_existing"] = enable_reuse
- if kwargs.get("runnerjob", "").startswith("arvwf:"):
- wfuuid = kwargs["runnerjob"][6:kwargs["runnerjob"].index("#")]
+ if runtimeContext.runnerjob.startswith("arvwf:"):
+ wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
if container_request["name"] == "main":
container_request["name"] = wfrecord["name"]
self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
try:
- if kwargs.get("submit_request_uuid"):
+ if runtimeContext.submit_request_uuid:
response = self.arvrunner.api.container_requests().update(
- uuid=kwargs["submit_request_uuid"],
+ uuid=runtimeContext.submit_request_uuid,
body=container_request
).execute(num_retries=self.arvrunner.num_retries)
else:
api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
- done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self))
+ done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
if record["output_uuid"]:
if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
class RunnerContainer(Runner):
"""Submit and manage a container that runs arvados-cwl-runner."""
- def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+ def arvados_job_spec(self, runtimeContext):
"""Create an Arvados container request for this workflow.
The returned dict can be used to create a container passed as
if self.output_tags:
command.append("--output-tags=" + self.output_tags)
- if kwargs.get("debug"):
+ if runtimeContext.debug:
command.append("--debug")
+ if runtimeContext.storage_classes != "default":
+ command.append("--storage-classes=" + runtimeContext.storage_classes)
+
if self.on_error:
command.append("--on-error=" + self.on_error)
return container_req
- def run(self, **kwargs):
- kwargs["keepprefix"] = "keep:"
- job_spec = self.arvados_job_spec(**kwargs)
+ def run(self, runtimeContext):
+ runtimeContext.keepprefix = "keep:"
+ job_spec = self.arvados_job_spec(runtimeContext)
if self.arvrunner.project_uuid:
job_spec["owner_uuid"] = self.arvrunner.project_uuid
- if kwargs.get("submit_request_uuid"):
+ if runtimeContext.submit_request_uuid:
response = self.arvrunner.api.container_requests().update(
- uuid=kwargs["submit_request_uuid"],
+ uuid=runtimeContext.submit_request_uuid,
body=job_spec
).execute(num_retries=self.arvrunner.num_retries)
else:
import datetime
import time
-from cwltool.process import get_feature, shortname, UnsupportedRequirement
+from cwltool.process import shortname, UnsupportedRequirement
from cwltool.errors import WorkflowException
from cwltool.command_line_tool import revmap_file, CommandLineTool
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
+from cwltool.job import JobBase
from schema_salad.sourceline import SourceLine
crunchrunner_git_commit = 'a3f2cb186e437bfce0031b024b2157b73ed2717d'
-class ArvadosJob(object):
+class ArvadosJob(JobBase):
"""Submit and manage a Crunch job for executing a CWL CommandLineTool."""
- def __init__(self, runner):
+ def __init__(self, runner,
+ builder, # type: Builder
+ joborder, # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
+ make_path_mapper, # type: Callable[..., PathMapper]
+ requirements, # type: List[Dict[Text, Text]]
+ hints, # type: List[Dict[Text, Text]]
+ name # type: Text
+ ):
+ super(ArvadosJob, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
self.arvrunner = runner
self.running = False
self.uuid = None
- def run(self, dry_run=False, pull_image=True, **kwargs):
+ def run(self, runtimeContext):
script_parameters = {
"command": self.command_line
}
script_parameters["task.permanentFailCodes"] = self.permanentFailCodes
with Perf(metrics, "arv_docker_get_image %s" % self.name):
- (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
- if docker_req and kwargs.get("use_container") is not False:
+ (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
+ if docker_req and runtimeContext.use_container is not False:
if docker_req.get("dockerOutputDirectory"):
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
- runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
+ runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api,
+ docker_req,
+ runtimeContext.pull_image,
+ self.arvrunner.project_uuid)
else:
runtime_constraints["docker_image"] = "arvados/jobs"
runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
- runtime_req, _ = get_feature(self, "http://arvados.org/cwl#RuntimeConstraints")
+ runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
if runtime_req:
if "keep_cache" in runtime_req:
runtime_constraints["keep_cache_mb_per_task"] = runtime_req["keep_cache"]
if not self.arvrunner.ignore_docker_for_reuse:
filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
- enable_reuse = kwargs.get("enable_reuse", True)
+ enable_reuse = runtimeContext.enable_reuse
if enable_reuse:
- reuse_req, _ = get_feature(self, "http://arvados.org/cwl#ReuseRequirement")
+ reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
if reuse_req:
enable_reuse = reuse_req["enableReuse"]
dirs[g.group(1)] = g.group(2)
if processStatus == "permanentFail":
- done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self))
+ done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
with Perf(metrics, "output collection %s" % self.name):
outputs = done.done(self, record, dirs["tmpdir"],
class RunnerJob(Runner):
"""Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
- def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+ def arvados_job_spec(self, debug=False):
"""Create an Arvados job specification for this workflow.
The returned dict can be used to create a job (i.e., passed as
if self.on_error:
self.job_order["arv:on_error"] = self.on_error
- if kwargs.get("debug"):
+ if debug:
self.job_order["arv:debug"] = True
return {
}
}
- def run(self, **kwargs):
- job_spec = self.arvados_job_spec(**kwargs)
+ def run(self, runtimeContext):
+ job_spec = self.arvados_job_spec(runtimeContext.debug)
job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
body=instance_spec).execute(num_retries=self.arvrunner.num_retries)
logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"])
- if kwargs.get("wait") is False:
+ if runtimeContext.wait is False:
self.uuid = self.arvrunner.pipeline["uuid"]
return
from .arvjob import ArvadosJob
from .arvcontainer import ArvadosContainer
from .pathmapper import ArvPathMapper
+from functools import partial
class ArvadosCommandTool(CommandLineTool):
"""Wrap cwltool CommandLineTool to override selected methods."""
- def __init__(self, arvrunner, toolpath_object, **kwargs):
- super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
+ def __init__(self, arvrunner, toolpath_object, loadingContext):
+ super(ArvadosCommandTool, self).__init__(toolpath_object, loadingContext)
self.arvrunner = arvrunner
- self.work_api = kwargs["work_api"]
-
- def makeJobRunner(self, **kwargs):
- if self.work_api == "containers":
- return ArvadosContainer(self.arvrunner)
- elif self.work_api == "jobs":
- return ArvadosJob(self.arvrunner)
-
- def makePathMapper(self, reffiles, stagedir, **kwargs):
- # type: (List[Any], unicode, **Any) -> PathMapper
- if self.work_api == "containers":
- return ArvPathMapper(self.arvrunner, reffiles+kwargs.get("extra_reffiles", []), kwargs["basedir"],
+
+ def make_job_runner(self, runtimeContext):
+ if runtimeContext.work_api == "containers":
+ return partial(ArvadosContainer, self.arvrunner)
+ elif runtimeContext.work_api == "jobs":
+ return partial(ArvadosJob, self.arvrunner)
+ else:
+ raise Exception("Unsupported work_api %s", runtimeContext.work_api)
+
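make_job_runner now returns a factory rather than a constructed job: functools.partial binds self.arvrunner as the first constructor argument, and the caller later invokes the factory with the remaining JobBase-style arguments. A minimal sketch of the pattern, using hypothetical stand-in classes (not the real ArvCwlRunner or ArvadosContainer):

    from functools import partial

    class Job(object):
        # stand-in for ArvadosContainer/ArvadosJob: the runner comes first,
        # the remaining arguments mirror cwltool's JobBase constructor
        def __init__(self, runner, builder, joborder, make_path_mapper,
                     requirements, hints, name):
            self.runner = runner
            self.name = name

    runner = object()               # hypothetical runner stand-in
    factory = partial(Job, runner)  # what make_job_runner returns
    j = factory(None, {}, None, [], [], "step1")
    assert j.runner is runner and j.name == "step1"
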
+ def make_path_mapper(self, reffiles, stagedir, runtimeContext, separateDirs):
+ if runtimeContext.work_api == "containers":
+ return ArvPathMapper(self.arvrunner, reffiles+runtimeContext.extra_reffiles, runtimeContext.basedir,
"/keep/%s",
- "/keep/%s/%s",
- **kwargs)
- elif self.work_api == "jobs":
- return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+ "/keep/%s/%s")
+ elif runtimeContext.work_api == "jobs":
+ return ArvPathMapper(self.arvrunner, reffiles, runtimeContext.basedir,
"$(task.keep)/%s",
- "$(task.keep)/%s/%s",
- **kwargs)
+ "$(task.keep)/%s/%s")
- def job(self, joborder, output_callback, **kwargs):
+ def job(self, joborder, output_callback, runtimeContext):
# Workaround for #13365
- builderargs = kwargs.copy()
- builderargs["toplevel"] = True
- builderargs["tmp_outdir_prefix"] = ""
- builder = self._init_job(joborder, **builderargs)
+ builderargs = runtimeContext.copy()
+ builderargs.toplevel = True
+ builderargs.tmp_outdir_prefix = ""
+ builder = self._init_job(joborder, builderargs)
joborder = builder.job
- if self.work_api == "containers":
+ runtimeContext = runtimeContext.copy()
+
+ if runtimeContext.work_api == "containers":
dockerReq, is_req = self.get_requirement("DockerRequirement")
if dockerReq and dockerReq.get("dockerOutputDirectory"):
- kwargs["outdir"] = dockerReq.get("dockerOutputDirectory")
- kwargs["docker_outdir"] = dockerReq.get("dockerOutputDirectory")
+ runtimeContext.outdir = dockerReq.get("dockerOutputDirectory")
+ runtimeContext.docker_outdir = dockerReq.get("dockerOutputDirectory")
else:
- kwargs["outdir"] = "/var/spool/cwl"
- kwargs["docker_outdir"] = "/var/spool/cwl"
- elif self.work_api == "jobs":
- kwargs["outdir"] = "$(task.outdir)"
- kwargs["docker_outdir"] = "$(task.outdir)"
- kwargs["tmpdir"] = "$(task.tmpdir)"
- kwargs["docker_tmpdir"] = "$(task.tmpdir)"
- return super(ArvadosCommandTool, self).job(joborder, output_callback, **kwargs)
+ runtimeContext.outdir = "/var/spool/cwl"
+ runtimeContext.docker_outdir = "/var/spool/cwl"
+ elif runtimeContext.work_api == "jobs":
+ runtimeContext.outdir = "$(task.outdir)"
+ runtimeContext.docker_outdir = "$(task.outdir)"
+ runtimeContext.tmpdir = "$(task.tmpdir)"
+ runtimeContext.docker_tmpdir = "$(task.tmpdir)"
+ return super(ArvadosCommandTool, self).job(joborder, output_callback, runtimeContext)
from cwltool.workflow import Workflow, WorkflowException
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import Builder
+from cwltool.context import LoadingContext
import ruamel.yaml as yaml
class ArvadosWorkflow(Workflow):
"""Wrap cwltool Workflow to override selected methods."""
- def __init__(self, arvrunner, toolpath_object, **kwargs):
- super(ArvadosWorkflow, self).__init__(toolpath_object, **kwargs)
+ def __init__(self, arvrunner, toolpath_object, loadingContext):
+ super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
self.arvrunner = arvrunner
- self.work_api = kwargs["work_api"]
self.wf_pdh = None
self.dynamic_resource_req = []
self.static_resource_req = []
self.wf_reffiles = []
+ self.loadingContext = loadingContext
- def job(self, joborder, output_callback, **kwargs):
- kwargs["work_api"] = self.work_api
+ def job(self, joborder, output_callback, runtimeContext):
req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
if req:
with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
packed = pack(document_loader, workflowobj, uri, self.metadata)
- builder = Builder()
- builder.job = joborder
- builder.requirements = workflowobj["requirements"]
- builder.hints = workflowobj["hints"]
- builder.resources = {}
+ builder = Builder(joborder,
+ requirements=workflowobj["requirements"],
+ hints=workflowobj["hints"],
+ resources={})
def visit(item):
for t in ("hints", "requirements"):
self.static_resource_req = [get_overall_res_req(self.static_resource_req)]
upload_dependencies(self.arvrunner,
- kwargs.get("name", ""),
+ runtimeContext.name,
document_loader,
packed,
uri,
if self.dynamic_resource_req:
- builder = Builder()
- builder.job = joborder
- builder.requirements = self.requirements
- builder.hints = self.hints
- builder.resources = {}
+ builder = Builder(joborder,
+ requirements=self.requirements,
+ hints=self.hints,
+ resources={})
# Evaluate dynamic resource requirements using current builder
rs = copy.copy(self.static_resource_req)
reffiles = []
visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)
- mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, kwargs["basedir"],
- "/keep/%s",
- "/keep/%s/%s",
- **kwargs)
+ mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
+ "/keep/%s",
+ "/keep/%s/%s")
# For containers API, we need to make sure any extra
# referenced files (ie referenced by the workflow but
# not in the inputs) are included in the mounts.
- kwargs["extra_reffiles"] = copy.deepcopy(self.wf_reffiles)
+ if self.wf_reffiles:
+ runtimeContext = runtimeContext.copy()
+ runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)
def keepmount(obj):
remove_redundant_fields(obj)
"outputs": self.tool["outputs"],
"stdout": "cwl.output.json",
"requirements": self.requirements+job_res_reqs+[
+ {"class": "InlineJavascriptRequirement"},
{
"class": "InitialWorkDirRequirement",
"listing": [{
"entryname": "workflow.cwl",
- "entry": {
- "class": "File",
- "location": "keep:%s/workflow.cwl" % self.wf_pdh
- }
+ "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
}, {
"entryname": "cwl.input.yml",
"entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
"arguments": ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl#main", "cwl.input.yml"],
"id": "#"
})
- kwargs["loader"] = self.doc_loader
- kwargs["avsc_names"] = self.doc_schema
- kwargs["metadata"] = self.metadata
- return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder_resolved, output_callback, **kwargs)
+ return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
else:
- return super(ArvadosWorkflow, self).job(joborder, output_callback, **kwargs)
+ return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)
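The cwl.input.yml entry above doubles backslashes and escapes "$(" and "${" so that literal text in the job order is not re-evaluated as a CWL parameter reference when the inner runner loads it. A small illustration of the same escaping, runnable with no Arvados dependencies:

    import json

    joborder = {"msg": "cost is $(price)", "path": "C:\\data"}
    yml = json.dumps(joborder, indent=2, sort_keys=True, separators=(',', ': '))
    yml = yml.replace("\\", "\\\\").replace('$(', '\\$(').replace('${', '\\${')
    print(yml)
    # "$(price)" comes out as "\$(price)", so the inner cwltool run
    # treats it as literal text instead of an expression
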
--- /dev/null
+from cwltool.context import LoadingContext, RuntimeContext
+
+class ArvLoadingContext(LoadingContext):
+ def __init__(self, kwargs=None):
+ super(ArvLoadingContext, self).__init__(kwargs)
+
+class ArvRuntimeContext(RuntimeContext):
+ def __init__(self, kwargs=None):
+ self.work_api = None
+ self.extra_reffiles = []
+ self.priority = 500
+ self.enable_reuse = True
+ self.runnerjob = ""
+ self.submit_request_uuid = None
+ self.project_uuid = None
+ self.trash_intermediate = False
+ self.intermediate_output_ttl = 0
+ self.update_workflow = ""
+ self.create_workflow = False
+ self.submit_runner_ram = 0
+ self.ignore_docker_for_reuse = False
+ self.submit = True
+ self.submit_runner_image = None
+ self.wait = True
+ self.cwl_runner_job = None
+ self.storage_classes = "default"
+
+ super(ArvRuntimeContext, self).__init__(kwargs)
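ArvRuntimeContext assigns its Arvados-specific defaults before handing the keyword map to the base class, which copies matching keys onto the instance; the defaults therefore must exist first so they can be overridden. A sketch of the initialization order this relies on, with a hypothetical minimal base class standing in for cwltool's RuntimeContext:

    class RuntimeContext(object):
        # stand-in for cwltool.context.RuntimeContext: copy known keys
        def __init__(self, kwargs=None):
            for k, v in (kwargs or {}).items():
                if hasattr(self, k):
                    setattr(self, k, v)

    class ArvRuntimeContext(RuntimeContext):
        def __init__(self, kwargs=None):
            self.work_api = None   # defaults must exist before super()
            self.priority = 500    # so the base class will copy over them
            super(ArvRuntimeContext, self).__init__(kwargs)

    ctx = ArvRuntimeContext({"work_api": "containers", "priority": 250})
    assert ctx.work_api == "containers" and ctx.priority == 250
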
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, normalizeFilesDirs
from cwltool.load_tool import load_tool
from cwltool.errors import WorkflowException
+from arvados_cwl.context import ArvRuntimeContext
from .fsaccess import CollectionFetcher, CollectionFsAccess
debug = job_order_object["arv:debug"]
del job_order_object["arv:debug"]
+ arvargs = argparse.Namespace()
+ arvargs.work_api = "jobs"
+ arvargs.output_name = output_name
+ arvargs.output_tags = output_tags
+ arvargs.thread_count = 1
+
runner = arvados_cwl.ArvCwlRunner(api_client=arvados.safeapi.ThreadSafeApiCache(
api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4}),
- output_name=output_name, output_tags=output_tags)
+ arvargs=arvargs)
make_fs_access = functools.partial(CollectionFsAccess,
collection_cache=runner.collection_cache)
- t = load_tool(toolpath, runner.arv_make_tool,
- fetcher_constructor=functools.partial(CollectionFetcher,
- api_client=runner.api,
- fs_access=make_fs_access(""),
- num_retries=runner.num_retries))
+ t = load_tool(toolpath, runner.loadingContext)
if debug:
logger.setLevel(logging.DEBUG)
logging.getLogger('arvados').setLevel(logging.DEBUG)
logging.getLogger("cwltool").setLevel(logging.DEBUG)
- args = argparse.Namespace()
+ args = ArvRuntimeContext(vars(arvargs))
args.project_uuid = arvados.current_job()["owner_uuid"]
args.enable_reuse = enable_reuse
args.on_error = on_error
args.disable_js_validation = False
args.tmp_outdir_prefix = "tmp"
- runner.arv_executor(t, job_order_object, **vars(args))
+ runner.arv_executor(t, job_order_object, args, logger=logger)
except Exception as e:
if isinstance(e, WorkflowException):
logging.info("Workflow error %s", e)
crunchstat_re = re.compile(r"^\d{4}-\d\d-\d\d_\d\d:\d\d:\d\d [a-z0-9]{5}-8i9sb-[a-z0-9]{15} \d+ \d+ stderr crunchstat:")
timestamp_re = re.compile(r"^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z) (.*)")
-def logtail(logcollection, logger, header, maxlen=25):
+def logtail(logcollection, logfunc, header, maxlen=25):
if len(logcollection) == 0:
- logger.info(header)
- logger.info(" ** log is empty **")
+ logfunc(header)
+ logfunc(" ** log is empty **")
return
containersapi = ("crunch-run.txt" in logcollection)
loglines = mergelogs.values()[0]
logtxt = "\n ".join(l.strip() for l in loglines)
- logger.info(header)
- logger.info("\n %s", logtxt)
+ logfunc(header)
+ logfunc("\n %s", logtxt)
pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')
def __init__(self, arvrunner, referenced_files, input_basedir,
- collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
+ collection_pattern, file_pattern, name=None, single_collection=False):
self.arvrunner = arvrunner
self.input_basedir = input_basedir
self.collection_pattern = collection_pattern
from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
+from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
"""Uploads Docker images used in CommandLineTool objects."""
if isinstance(tool, CommandLineTool):
- (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
+ (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
if docker_req:
if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
# TODO: can be supported by containers API, but not jobs API.
def __init__(self, runner, tool, job_order, enable_reuse,
output_name, output_tags, submit_runner_ram=0,
name=None, on_error=None, submit_runner_image=None,
- intermediate_output_ttl=0, merged_map=None, priority=None,
- secret_store=None):
+ intermediate_output_ttl=0, merged_map=None,
+ priority=None, secret_store=None):
self.arvrunner = runner
self.tool = tool
self.job_order = job_order
if enable_reuse:
# If reuse is permitted by command line arguments but
# disabled by the workflow itself, disable it.
- reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
+ reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
if reuse_req:
enable_reuse = reuse_req["enableReuse"]
self.enable_reuse = enable_reuse
api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
- done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)
+ done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
self.final_output = record["output"]
outc = arvados.collection.CollectionReader(self.final_output,
import os
import re
+SETUP_DIR = os.path.dirname(__file__) or '.'
+
def git_latest_tag():
gitinfo = subprocess.check_output(
['git', 'describe', '--abbrev=0']).strip()
return str(gitinfo.decode('utf-8'))
+def choose_version_from():
+ sdk_ts = subprocess.check_output(
+ ['git', 'log', '--first-parent', '--max-count=1',
+ '--format=format:%ct', os.path.join(SETUP_DIR, "../python")]).strip()
+ cwl_ts = subprocess.check_output(
+ ['git', 'log', '--first-parent', '--max-count=1',
+ '--format=format:%ct', SETUP_DIR]).strip()
+ if int(sdk_ts) > int(cwl_ts):
+ getver = os.path.join(SETUP_DIR, "../python")
+ else:
+ getver = SETUP_DIR
+ return getver
+
def git_timestamp_tag():
gitinfo = subprocess.check_output(
['git', 'log', '--first-parent', '--max-count=1',
- '--format=format:%ct', '.']).strip()
+ '--format=format:%ct', choose_version_from()]).strip()
return str(time.strftime('.%Y%m%d%H%M%S', time.gmtime(int(gitinfo))))
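choose_version_from compares the last-commit timestamps of the Python SDK directory and this package, and versions from whichever changed more recently; git_timestamp_tag then turns that commit time into a sortable version suffix. For example:

    import time
    # commit time 1529089200 (2018-06-15 19:00:00 UTC) becomes:
    print(time.strftime('.%Y%m%d%H%M%S', time.gmtime(1529089200)))
    # .20180615190000
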
def save_version(setup_dir, module, v):
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20180524215209',
+ 'cwltool==1.0.20180615183820',
'schema-salad==2.7.20180501211602',
'typing >= 3.5.3',
'ruamel.yaml >=0.13.11, <0.15',
# SPDX-License-Identifier: Apache-2.0
import arvados_cwl
+import arvados_cwl.context
from arvados_cwl.arvdocker import arv_docker_clear_cache
import logging
import mock
class TestContainer(unittest.TestCase):
+ def helper(self, runner, enable_reuse=True):
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+ loadingContext = arvados_cwl.context.ArvLoadingContext(
+ {"avsc_names": avsc_names,
+ "basedir": "",
+ "make_fs_access": make_fs_access,
+ "loader": Loader({}),
+ "metadata": {"cwlVersion": "v1.0"}})
+ runtimeContext = arvados_cwl.context.ArvRuntimeContext(
+ {"work_api": "containers",
+ "basedir": "",
+ "name": "test_run_"+str(enable_reuse),
+ "make_fs_access": make_fs_access,
+ "tmpdir": "/tmp",
+ "enable_reuse": enable_reuse,
+ "priority": 500})
+
+ return loadingContext, runtimeContext
+
# The test passes no builder.resources
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
runner.api.collections().get().execute.return_value = {
"portable_data_hash": "99999999999999999999999999999993+99"}
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
-
tool = cmap({
"inputs": [],
"outputs": [],
"id": "#",
"class": "CommandLineTool"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}),
- metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner, enable_reuse)
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_"+str(enable_reuse),
- make_fs_access=make_fs_access, tmpdir="/tmp"):
- j.run(enable_reuse=enable_reuse, priority=500)
+
+ for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
runner.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher({
'environment': {
runner.intermediate_output_ttl = 3600
runner.secret_store = cwltool.secrets.SecretStore()
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
-
keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
runner.api.collections().get().execute.return_value = {
"portable_data_hash": "99999999999999999999999999999993+99"}
"id": "#",
"class": "CommandLineTool"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
- avsc_names=avsc_names, make_fs_access=make_fs_access,
- loader=Loader({}), metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runtimeContext.name = "test_resource_requirements"
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_resource_requirements",
- make_fs_access=make_fs_access, tmpdir="/tmp"):
- j.run(enable_reuse=True, priority=500)
+ for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
call_args, call_kwargs = runner.api.container_requests().create.call_args
runner.intermediate_output_ttl = 0
runner.secret_store = cwltool.secrets.SecretStore()
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
-
keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
runner.api.collections().get().execute.return_value = {
"portable_data_hash": "99999999999999999999999999999993+99"}
"id": "#",
"class": "CommandLineTool"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
- avsc_names=avsc_names, make_fs_access=make_fs_access,
- loader=Loader({}), metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runtimeContext.name = "test_initial_work_dir"
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_initial_work_dir",
- make_fs_access=make_fs_access, tmpdir="/tmp"):
- j.run(priority=500)
+ for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
call_args, call_kwargs = runner.api.container_requests().create.call_args
"id": "#",
"class": "CommandLineTool"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}),
- metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runtimeContext.name = "test_run_redirect"
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_redirect",
- make_fs_access=make_fs_access, tmpdir="/tmp"):
- j.run(priority=500)
+ for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
runner.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher({
'environment': {
col().open.return_value = []
- arvjob = arvados_cwl.ArvadosContainer(runner)
- arvjob.name = "testjob"
- arvjob.builder = mock.MagicMock()
+ arvjob = arvados_cwl.ArvadosContainer(runner,
+ mock.MagicMock(),
+ {},
+ None,
+ [],
+ [],
+ "testjob")
arvjob.output_callback = mock.MagicMock()
arvjob.collect_outputs = mock.MagicMock()
arvjob.successCodes = [0]
"id": "#",
"class": "CommandLineTool"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}),
- metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runtimeContext.name = "test_run_mounts"
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
job_order = {
"p1": {
]
}
}
- for j in arvtool.job(job_order, mock.MagicMock(), basedir="", name="test_run_mounts",
- make_fs_access=make_fs_access, tmpdir="/tmp"):
- j.run(priority=500)
+ for j in arvtool.job(job_order, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
runner.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher({
'environment': {
]
}
]})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}),
- metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runtimeContext.name = "test_secrets"
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
job_order = {"pw": "blorp"}
runner.secret_store.store(["pw"], job_order)
- for j in arvtool.job(job_order, mock.MagicMock(), basedir="", name="test_secrets",
- make_fs_access=make_fs_access, tmpdir="/tmp"):
- j.run(enable_reuse=True, priority=500)
+ for j in arvtool.job(job_order, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
runner.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher({
'environment': {
}
}))
+ # The test passes no builder.resources
+ # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
+ @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+ def test_timelimit(self, keepdocker):
+ arv_docker_clear_cache()
+
+ runner = mock.MagicMock()
+ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ runner.ignore_docker_for_reuse = False
+ runner.intermediate_output_ttl = 0
+ runner.secret_store = cwltool.secrets.SecretStore()
+
+ keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+ runner.api.collections().get().execute.return_value = {
+ "portable_data_hash": "99999999999999999999999999999993+99"}
+
+ tool = cmap({
+ "inputs": [],
+ "outputs": [],
+ "baseCommand": "ls",
+ "arguments": [{"valueFrom": "$(runtime.outdir)"}],
+ "id": "#",
+ "class": "CommandLineTool",
+ "hints": [
+ {
+ "class": "http://commonwl.org/cwltool#TimeLimit",
+ "timelimit": 42
+ }
+ ]
+ })
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runtimeContext.name = "test_timelimit"
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
+ arvtool.formatgraph = None
+
+ for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
+
+ _, kwargs = runner.api.container_requests().create.call_args
+ self.assertEqual(42, kwargs['body']['scheduling_parameters'].get('max_run_time'))
+
+
def test_get_intermediate_collection_info(self):
arvrunner = mock.MagicMock()
arvrunner.intermediate_output_ttl = 60
class TestJob(unittest.TestCase):
+ def helper(self, runner, enable_reuse=True):
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+ loadingContext = arvados_cwl.context.ArvLoadingContext(
+ {"avsc_names": avsc_names,
+ "basedir": "",
+ "make_fs_access": make_fs_access,
+ "loader": Loader({}),
+ "metadata": {"cwlVersion": "v1.0"},
+ "makeTool": runner.arv_make_tool})
+ runtimeContext = arvados_cwl.context.ArvRuntimeContext(
+ {"work_api": "jobs",
+ "basedir": "",
+ "name": "test_run_job_"+str(enable_reuse),
+ "make_fs_access": make_fs_access,
+ "enable_reuse": enable_reuse,
+ "priority": 500})
+
+ return loadingContext, runtimeContext
+
# The test passes no builder.resources
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch('arvados.commands.keepdocker.list_images_in_arv')
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
runner.num_retries = 0
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
"id": "#",
"class": "CommandLineTool"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}),
- metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner, enable_reuse)
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
- j.run(enable_reuse=enable_reuse)
+ for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
runner.api.jobs().create.assert_called_with(
body=JsonDiffMatcher({
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
runner.api.links().create.side_effect = ApiError(
mock.MagicMock(return_value={'status': 403}),
'Permission denied')
- j.run(enable_reuse=enable_reuse)
+ j.run(runtimeContext)
else:
assert not runner.api.links().create.called
list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
-
-
tool = {
"inputs": [],
"outputs": [],
"id": "#",
"class": "CommandLineTool"
}
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
- make_fs_access=make_fs_access, loader=Loader({}),
- metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner)
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
- j.run(enable_reuse=True)
+ for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
runner.api.jobs().create.assert_called_with(
body=JsonDiffMatcher({
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
{"items": []},
{"items": [{"manifest_text": "ABC"}]})
- arvjob = arvados_cwl.ArvadosJob(runner)
- arvjob.name = "testjob"
- arvjob.builder = mock.MagicMock()
+ arvjob = arvados_cwl.ArvadosJob(runner,
+ mock.MagicMock(),
+ {},
+ None,
+ [],
+ [],
+ "testjob")
arvjob.output_callback = mock.MagicMock()
arvjob.collect_outputs = mock.MagicMock()
arvjob.collect_outputs.return_value = {"out": "stuff"}
{"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
)
- arvjob = arvados_cwl.ArvadosJob(runner)
- arvjob.name = "testjob"
- arvjob.builder = mock.MagicMock()
+ arvjob = arvados_cwl.ArvadosJob(runner,
+ mock.MagicMock(),
+ {},
+ None,
+ [],
+ [],
+ "testjob")
arvjob.output_callback = mock.MagicMock()
arvjob.collect_outputs = mock.MagicMock()
arvjob.collect_outputs.return_value = {"out": "stuff"}
class TestWorkflow(unittest.TestCase):
+ def helper(self, runner, enable_reuse=True):
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+
+ document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=runner.api, fs_access=make_fs_access(""))
+ document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
+ document_loader.fetch_text = document_loader.fetcher.fetch_text
+ document_loader.check_exists = document_loader.fetcher.check_exists
+
+ loadingContext = arvados_cwl.context.ArvLoadingContext(
+ {"avsc_names": avsc_names,
+ "basedir": "",
+ "make_fs_access": make_fs_access,
+ "loader": document_loader,
+ "metadata": {"cwlVersion": "v1.0"},
+ "construct_tool_object": runner.arv_make_tool})
+ runtimeContext = arvados_cwl.context.ArvRuntimeContext(
+ {"work_api": "jobs",
+ "basedir": "",
+ "name": "test_run_wf_"+str(enable_reuse),
+ "make_fs_access": make_fs_access,
+ "enable_reuse": enable_reuse,
+ "priority": 500})
+
+ return loadingContext, runtimeContext
+
# The test passes no builder.resources
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch("arvados.collection.CollectionReader")
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
runner.num_retries = 0
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access(""))
- document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
- document_loader.fetch_text = document_loader.fetcher.fetch_text
- document_loader.check_exists = document_loader.fetcher.check_exists
+ loadingContext, runtimeContext = self.helper(runner)
- tool, metadata = document_loader.resolve_ref("tests/wf/scatter2.cwl")
+ tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
metadata["cwlVersion"] = tool["cwlVersion"]
mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
- arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=document_loader,
- makeTool=runner.arv_make_tool, metadata=metadata)
+ arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
arvtool.formatgraph = None
- it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
- it.next().run()
- it.next().run()
+ it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+
+ it.next().run(runtimeContext)
+ it.next().run(runtimeContext)
with open("tests/wf/scatter2_subwf.cwl") as f:
subwf = StripYAMLComments(f.read())
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
runner.num_retries = 0
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access(""))
- document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
- document_loader.fetch_text = document_loader.fetcher.fetch_text
- document_loader.check_exists = document_loader.fetcher.check_exists
+ loadingContext, runtimeContext = self.helper(runner)
- tool, metadata = document_loader.resolve_ref("tests/wf/echo-wf.cwl")
+ tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
metadata["cwlVersion"] = tool["cwlVersion"]
mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
- arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=document_loader,
- makeTool=runner.arv_make_tool, metadata=metadata)
+ arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
arvtool.formatgraph = None
- it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
- it.next().run()
- it.next().run()
+ it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+ it.next().run(runtimeContext)
+ it.next().run(runtimeContext)
with open("tests/wf/echo-subwf.cwl") as f:
subwf = StripYAMLComments(f.read())
final.open.return_value = openmock
openmock.__enter__.return_value = cwlout
- _, runner.final_output_collection = runner.make_output_collection("Test output", "tag0,tag1,tag2", {
+ _, runner.final_output_collection = runner.make_output_collection("Test output", ["foo"], "tag0,tag1,tag2", {
"foo": {
"class": "File",
"location": "keep:99999999999999999999999999999991+99/foo.txt",
final.copy.assert_has_calls([mock.call('bar.txt', 'baz.txt', overwrite=False, source_collection=readermock)])
final.copy.assert_has_calls([mock.call('foo.txt', 'foo.txt', overwrite=False, source_collection=readermock)])
- final.save_new.assert_has_calls([mock.call(ensure_unique_name=True, name='Test output', owner_uuid='zzzzz-j7d0g-zzzzzzzzzzzzzzz')])
+ final.save_new.assert_has_calls([mock.call(ensure_unique_name=True, name='Test output', owner_uuid='zzzzz-j7d0g-zzzzzzzzzzzzzzz', storage_classes=['foo'])])
self.assertEqual("""{
"bar": {
"basename": "baz.txt",
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_pipeline_uuid + '\n')
+ @stubs
+ def test_error_when_multiple_storage_classes_specified(self, stubs):
+ storage_classes = "foo,bar"
+ exited = arvados_cwl.main(
+ ["--debug", "--storage-classes", storage_classes,
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ sys.stdin, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 1)
+
@mock.patch("time.sleep")
@stubs
def test_submit_on_error(self, stubs, tm):
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
+ @stubs
+ def test_submit_storage_classes(self, stubs):
+ capture_stdout = cStringIO.StringIO()
+ try:
+ exited = arvados_cwl.main(
+ ["--debug", "--submit", "--no-wait", "--api=containers", "--storage-classes=foo",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+ except:
+ logging.exception("")
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate',
+ '--eval-timeout=20', '--thread-count=4',
+ '--enable-reuse', "--debug",
+ "--storage-classes=foo", '--on-error=continue',
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+ stubs.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher(expect_container))
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
+ @mock.patch("arvados_cwl.task_queue.TaskQueue")
+ @mock.patch("arvados_cwl.arvworkflow.ArvadosWorkflow.job")
+ @mock.patch("arvados_cwl.ArvCwlRunner.make_output_collection", return_value = (None, None))
+ @stubs
+ def test_storage_classes_correctly_propagate_to_make_output_collection(self, stubs, make_output, job, tq):
+ def set_final_output(job_order, output_callback, runtimeContext):
+ output_callback("zzzzz-4zz18-zzzzzzzzzzzzzzzz", "success")
+ return []
+ job.side_effect = set_final_output
+
+ try:
+ exited = arvados_cwl.main(
+ ["--debug", "--local", "--storage-classes=foo",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ sys.stdin, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+ except:
+ logging.exception("")
+
+ make_output.assert_called_with(u'Output of submit_wf.cwl', ['foo'], '', 'zzzzz-4zz18-zzzzzzzzzzzzzzzz')
+
+ @mock.patch("arvados_cwl.task_queue.TaskQueue")
+ @mock.patch("arvados_cwl.arvworkflow.ArvadosWorkflow.job")
+ @mock.patch("arvados_cwl.ArvCwlRunner.make_output_collection", return_value = (None, None))
+ @stubs
+ def test_default_storage_classes_correctly_propagate_to_make_output_collection(self, stubs, make_output, job, tq):
+ def set_final_output(job_order, output_callback, runtimeContext):
+ output_callback("zzzzz-4zz18-zzzzzzzzzzzzzzzz", "success")
+ return []
+ job.side_effect = set_final_output
+
+ try:
+ exited = arvados_cwl.main(
+ ["--debug", "--local",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ sys.stdin, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+ except:
+ logging.exception("")
+
+ make_output.assert_called_with(u'Output of submit_wf.cwl', ['default'], '', 'zzzzz-4zz18-zzzzzzzzzzzzzzzz')
@stubs
def test_submit_container_output_ttl(self, stubs):
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+ "encoding/json"
+ "fmt"
+ "math"
+ "strings"
+)
+
+type ByteSize int64
+
+var prefixValue = map[string]int64{
+ "": 1,
+ "K": 1000,
+ "Ki": 1 << 10,
+ "M": 1000000,
+ "Mi": 1 << 20,
+ "G": 1000000000,
+ "Gi": 1 << 30,
+ "T": 1000000000000,
+ "Ti": 1 << 40,
+ "P": 1000000000000000,
+ "Pi": 1 << 50,
+ "E": 1000000000000000000,
+ "Ei": 1 << 60,
+}
+
+func (n *ByteSize) UnmarshalJSON(data []byte) error {
+ if len(data) == 0 || data[0] != '"' {
+ var i int64
+ err := json.Unmarshal(data, &i)
+ if err != nil {
+ return err
+ }
+ *n = ByteSize(i)
+ return nil
+ }
+ var s string
+ err := json.Unmarshal(data, &s)
+ if err != nil {
+ return err
+ }
+ split := strings.LastIndexAny(s, "0123456789.+-eE") + 1
+ if split == 0 {
+ return fmt.Errorf("invalid byte size %q", s)
+ }
+ if s[split-1] == 'E' {
+ // We accepted an E as if it started the exponent part
+ // of a json number, but if the next char isn't +, -,
+ // or digit, then the E must have meant Exa. Instead
+ // of "4.5E"+"iB" we want "4.5"+"EiB".
+ split--
+ }
+ var val json.Number
+ dec := json.NewDecoder(strings.NewReader(s[:split]))
+ dec.UseNumber()
+ err = dec.Decode(&val)
+ if err != nil {
+ return err
+ }
+ if split == len(s) {
+ return nil
+ }
+ prefix := strings.Trim(s[split:], " ")
+ if strings.HasSuffix(prefix, "B") {
+ prefix = prefix[:len(prefix)-1]
+ }
+ pval, ok := prefixValue[prefix]
+ if !ok {
+ return fmt.Errorf("invalid unit %q", strings.Trim(s[split:], " "))
+ }
+ if intval, err := val.Int64(); err == nil {
+ if pval > 1 && (intval*pval)/pval != intval {
+ return fmt.Errorf("size %q overflows int64", s)
+ }
+ *n = ByteSize(intval * pval)
+ return nil
+ } else if floatval, err := val.Float64(); err == nil {
+ if floatval*float64(pval) > math.MaxInt64 {
+ return fmt.Errorf("size %q overflows int64", s)
+ }
+ *n = ByteSize(int64(floatval * float64(pval)))
+ return nil
+ } else {
+ return fmt.Errorf("bug: json.Number for %q is not int64 or float64: %s", s, err)
+ }
+}
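The UnmarshalJSON above splits the string just past the last numeric-looking character, backs up one byte when a trailing 'E' was really the Exa prefix rather than an exponent, strips an optional trailing "B", and scales by the SI or binary prefix, guarding int64 overflow with the (i*p)/p != i round-trip check. A rough Python transcription of the same split-and-scale logic (an illustration only, not the Go implementation):

    PREFIX = {"": 1, "K": 1000, "Ki": 1 << 10, "M": 10**6, "Mi": 1 << 20,
              "G": 10**9, "Gi": 1 << 30, "T": 10**12, "Ti": 1 << 40,
              "P": 10**15, "Pi": 1 << 50, "E": 10**18, "Ei": 1 << 60}

    def parse_bytesize(s):
        # position just past the last character that could belong to a number
        split = 0
        for i, ch in enumerate(s):
            if ch in "0123456789.+-eE":
                split = i + 1
        if split == 0:
            raise ValueError("invalid byte size %r" % s)
        if s[split - 1] == 'E':
            split -= 1  # a trailing E has no digits after it: it means Exa
        num = s[:split].strip()
        val = int(num) if num.lstrip("+-").isdigit() else float(num)
        prefix = s[split:].strip()
        if prefix.endswith("B"):
            prefix = prefix[:-1]
        if prefix not in PREFIX:
            raise ValueError("invalid unit %r" % s[split:].strip())
        return int(val * PREFIX[prefix])  # Python ints don't overflow; the
                                          # Go version must check int64 range

    assert parse_bytesize("4.5EiB") == 5188146770730811392
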
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+ "github.com/ghodss/yaml"
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&ByteSizeSuite{})
+
+type ByteSizeSuite struct{}
+
+func (s *ByteSizeSuite) TestUnmarshal(c *check.C) {
+ for _, testcase := range []struct {
+ in string
+ out int64
+ }{
+ {"0", 0},
+ {"5", 5},
+ {"5B", 5},
+ {"5 B", 5},
+ {" 4 KiB ", 4096},
+ {"0K", 0},
+ {"0Ki", 0},
+ {"0 KiB", 0},
+ {"4K", 4000},
+ {"4KB", 4000},
+ {"4Ki", 4096},
+ {"4KiB", 4096},
+ {"4MB", 4000000},
+ {"4MiB", 4194304},
+ {"4GB", 4000000000},
+ {"4 GiB", 4294967296},
+ {"4TB", 4000000000000},
+ {"4TiB", 4398046511104},
+ {"4PB", 4000000000000000},
+ {"4PiB", 4503599627370496},
+ {"4EB", 4000000000000000000},
+ {"4EiB", 4611686018427387904},
+ {"4.5EiB", 5188146770730811392},
+ {"1.5 GB", 1500000000},
+ {"1.5 GiB", 1610612736},
+ {"1.234 GiB", 1324997410}, // rounds down from 1324997410.816
+ {"1e2 KB", 100000},
+ {"20E-1 KiB", 2048},
+ {"1E0EB", 1000000000000000000},
+ {"1E-1EB", 100000000000000000},
+ {"1E-1EiB", 115292150460684704},
+ {"4.5E15 K", 4500000000000000000},
+ } {
+ var n ByteSize
+ err := yaml.Unmarshal([]byte(testcase.in+"\n"), &n)
+ c.Logf("%v => %v: %v", testcase.in, testcase.out, n)
+ c.Check(err, check.IsNil)
+ c.Check(int64(n), check.Equals, testcase.out)
+ }
+ for _, testcase := range []string{
+ "B", "K", "KB", "KiB", "4BK", "4iB", "4A", "b", "4b", "4mB", "4m", "4mib", "4KIB", "4K iB", "4Ki B", "BB", "4BB",
+ "400000 EB", // overflows int64
+ "4.11e4 EB", // ok as float64, but overflows int64
+ } {
+ var n ByteSize
+ err := yaml.Unmarshal([]byte(testcase+"\n"), &n)
+ c.Logf("%v => error: %v", n, err)
+ c.Check(err, check.NotNil)
+ }
+}
package arvados
import (
+ "encoding/json"
+ "errors"
"fmt"
"os"
}
type Cluster struct {
- ClusterID string `json:"-"`
- ManagementToken string
- SystemNodes map[string]SystemNode
- InstanceTypes []InstanceType
+ ClusterID string `json:"-"`
+ ManagementToken string
+ NodeProfiles map[string]NodeProfile
+ InstanceTypes InstanceTypeMap
+ HTTPRequestTimeout Duration
}
type InstanceType struct {
Name string
ProviderType string
VCPUs int
- RAM int64
- Scratch int64
+ RAM ByteSize
+ Scratch ByteSize
Price float64
- Preemptable bool
+ Preemptible bool
}
-// GetThisSystemNode returns a SystemNode for the node we're running
-// on right now.
-func (cc *Cluster) GetThisSystemNode() (*SystemNode, error) {
- hostname, err := os.Hostname()
+type InstanceTypeMap map[string]InstanceType
+
+var errDuplicateInstanceTypeName = errors.New("duplicate instance type name")
+
+// UnmarshalJSON handles old config files that provide an array of
+// instance types instead of a hash.
+func (it *InstanceTypeMap) UnmarshalJSON(data []byte) error {
+ if len(data) > 0 && data[0] == '[' {
+ var arr []InstanceType
+ err := json.Unmarshal(data, &arr)
+ if err != nil {
+ return err
+ }
+ if len(arr) == 0 {
+ *it = nil
+ return nil
+ }
+ *it = make(map[string]InstanceType, len(arr))
+ for _, t := range arr {
+ if _, ok := (*it)[t.Name]; ok {
+ return errDuplicateInstanceTypeName
+ }
+ (*it)[t.Name] = t
+ }
+ return nil
+ }
+ var hash map[string]InstanceType
+ err := json.Unmarshal(data, &hash)
if err != nil {
- return nil, err
+ return err
}
- return cc.GetSystemNode(hostname)
+ // Fill in Name field using hash key.
+ *it = InstanceTypeMap(hash)
+ for name, t := range *it {
+ t.Name = name
+ (*it)[name] = t
+ }
+ return nil
}
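This UnmarshalJSON keeps backward compatibility: a legacy array of instance types is converted into the new name-keyed map (rejecting duplicate names), while the map form fills in each entry's Name from its key. A compact Python sketch of the same normalization (illustration only):

    def normalize_instance_types(raw):
        # raw is either a list of dicts (legacy) or a dict keyed by name
        if isinstance(raw, list):
            out = {}
            for t in raw:
                if t["Name"] in out:
                    raise ValueError("duplicate instance type name")
                out[t["Name"]] = t
            return out
        for name, t in raw.items():
            t["Name"] = name  # fill in Name from the hash key
        return raw

    legacy = [{"Name": "m1", "VCPUs": 1}]
    modern = {"m1": {"VCPUs": 1}}
    assert normalize_instance_types(legacy)["m1"]["VCPUs"] == 1
    assert normalize_instance_types(modern)["m1"]["Name"] == "m1"
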
-// GetSystemNode returns a SystemNode for the given hostname. An error
-// is returned if the appropriate configuration can't be determined
-// (e.g., this does not appear to be a system node).
-func (cc *Cluster) GetSystemNode(node string) (*SystemNode, error) {
- if cfg, ok := cc.SystemNodes[node]; ok {
+// GetNodeProfile returns a NodeProfile for the given hostname. An
+// error is returned if the appropriate configuration can't be
+// determined (e.g., this does not appear to be a system node). If
+// node is empty, use the OS-reported hostname.
+func (cc *Cluster) GetNodeProfile(node string) (*NodeProfile, error) {
+ if node == "" {
+ hostname, err := os.Hostname()
+ if err != nil {
+ return nil, err
+ }
+ node = hostname
+ }
+ if cfg, ok := cc.NodeProfiles[node]; ok {
return &cfg, nil
}
// If node is not listed, but "*" gives a default node
// profile, use the default profile.
- if cfg, ok := cc.SystemNodes["*"]; ok {
+ if cfg, ok := cc.NodeProfiles["*"]; ok {
return &cfg, nil
}
return nil, fmt.Errorf("config does not provision host %q as a system node", node)
}
-type SystemNode struct {
+type NodeProfile struct {
+ Controller SystemServiceInstance `json:"arvados-controller"`
Health SystemServiceInstance `json:"arvados-health"`
Keepproxy SystemServiceInstance `json:"keepproxy"`
Keepstore SystemServiceInstance `json:"keepstore"`
Workbench SystemServiceInstance `json:"arvados-workbench"`
}
+type ServiceName string
+
+const (
+ ServiceNameRailsAPI ServiceName = "arvados-api-server"
+ ServiceNameController ServiceName = "arvados-controller"
+ ServiceNameNodemanager ServiceName = "arvados-node-manager"
+ ServiceNameWorkbench ServiceName = "arvados-workbench"
+ ServiceNameWebsocket ServiceName = "arvados-ws"
+ ServiceNameKeepweb ServiceName = "keep-web"
+ ServiceNameKeepproxy ServiceName = "keepproxy"
+ ServiceNameKeepstore ServiceName = "keepstore"
+)
+
// ServicePorts returns the configured listening address (or "" if
// disabled) for each service on the node.
-func (sn *SystemNode) ServicePorts() map[string]string {
- return map[string]string{
- "arvados-api-server": sn.RailsAPI.Listen,
- "arvados-node-manager": sn.Nodemanager.Listen,
- "arvados-workbench": sn.Workbench.Listen,
- "arvados-ws": sn.Websocket.Listen,
- "keep-web": sn.Keepweb.Listen,
- "keepproxy": sn.Keepproxy.Listen,
- "keepstore": sn.Keepstore.Listen,
+func (np *NodeProfile) ServicePorts() map[ServiceName]string {
+ return map[ServiceName]string{
+ ServiceNameRailsAPI: np.RailsAPI.Listen,
+ ServiceNameController: np.Controller.Listen,
+ ServiceNameNodemanager: np.Nodemanager.Listen,
+ ServiceNameWorkbench: np.Workbench.Listen,
+ ServiceNameWebsocket: np.Websocket.Listen,
+ ServiceNameKeepweb: np.Keepweb.Listen,
+ ServiceNameKeepproxy: np.Keepproxy.Listen,
+ ServiceNameKeepstore: np.Keepstore.Listen,
}
}
type SystemServiceInstance struct {
Listen string
+ TLS bool
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+ "github.com/ghodss/yaml"
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&ConfigSuite{})
+
+type ConfigSuite struct{}
+
+func (s *ConfigSuite) TestInstanceTypesAsArray(c *check.C) {
+ var cluster Cluster
+ yaml.Unmarshal([]byte("InstanceTypes:\n- Name: foo\n"), &cluster)
+ c.Check(len(cluster.InstanceTypes), check.Equals, 1)
+ c.Check(cluster.InstanceTypes["foo"].Name, check.Equals, "foo")
+}
+
+func (s *ConfigSuite) TestInstanceTypesAsHash(c *check.C) {
+ var cluster Cluster
+ yaml.Unmarshal([]byte("InstanceTypes:\n foo:\n ProviderType: bar\n"), &cluster)
+ c.Check(len(cluster.InstanceTypes), check.Equals, 1)
+ c.Check(cluster.InstanceTypes["foo"].Name, check.Equals, "foo")
+ c.Check(cluster.InstanceTypes["foo"].ProviderType, check.Equals, "bar")
+}
+
+func (s *ConfigSuite) TestInstanceTypeSize(c *check.C) {
+ var it InstanceType
+ err := yaml.Unmarshal([]byte("Name: foo\nScratch: 4GB\nRAM: 4GiB\n"), &it)
+ c.Check(err, check.IsNil)
+ c.Check(int64(it.Scratch), check.Equals, int64(4000000000))
+ c.Check(int64(it.RAM), check.Equals, int64(4294967296))
+}
// such as Partitions
type SchedulingParameters struct {
Partitions []string `json:"partitions"`
- Preemptable bool `json:"preemptable"`
+ Preemptible bool `json:"preemptible"`
+ MaxRunTime int `json:"max_run_time"`
}
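The new MaxRunTime field carries the cwltool TimeLimit hint (in seconds) into the container's scheduling parameters, as exercised by test_timelimit above. A hypothetical sketch of that mapping:

    def scheduling_parameters(hints):
        # map a cwltool#TimeLimit hint onto the container request's
        # scheduling_parameters (hypothetical helper, for illustration)
        sp = {}
        for h in hints or []:
            if h.get("class") == "http://commonwl.org/cwltool#TimeLimit":
                sp["max_run_time"] = h["timelimit"]
        return sp

    hints = [{"class": "http://commonwl.org/cwltool#TimeLimit", "timelimit": 42}]
    assert scheduling_parameters(hints) == {"max_run_time": 42}
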
// ContainerList is an arvados#containerList resource.
// exposes problems that can't be expressed in Checks, like
// "service S is needed, but isn't configured to run
// anywhere."
- Services map[string]ServiceHealth `json:"services"`
+ Services map[arvados.ServiceName]ServiceHealth `json:"services"`
}
type CheckResult struct {
resp := ClusterHealthResponse{
Health: "OK",
Checks: make(map[string]CheckResult),
- Services: make(map[string]ServiceHealth),
+ Services: make(map[arvados.ServiceName]ServiceHealth),
}
mtx := sync.Mutex{}
wg := sync.WaitGroup{}
- for node, nodeConfig := range cluster.SystemNodes {
- for svc, addr := range nodeConfig.ServicePorts() {
+ for profileName, profile := range cluster.NodeProfiles {
+ for svc, addr := range profile.ServicePorts() {
// Ensure svc is listed in resp.Services.
mtx.Lock()
if _, ok := resp.Services[svc]; !ok {
}
wg.Add(1)
- go func(node, svc, addr string) {
+ go func(profileName string, svc arvados.ServiceName, addr string) {
defer wg.Done()
var result CheckResult
- url, err := agg.pingURL(node, addr)
+ url, err := agg.pingURL(profileName, addr)
if err != nil {
result = CheckResult{
Health: "ERROR",
mtx.Lock()
defer mtx.Unlock()
- resp.Checks[svc+"+"+url] = result
+ resp.Checks[fmt.Sprintf("%s+%s", svc, url)] = result
if result.Health == "OK" {
h := resp.Services[svc]
h.N++
} else {
resp.Health = "ERROR"
}
- }(node, svc, addr)
+ }(profileName, svc, addr)
}
}
wg.Wait()
Clusters: map[string]arvados.Cluster{
"zzzzz": {
ManagementToken: arvadostest.ManagementToken,
- SystemNodes: map[string]arvados.SystemNode{},
+ NodeProfiles: map[string]arvados.NodeProfile{},
},
},
}}
func (s *AggregatorSuite) TestUnhealthy(c *check.C) {
srv, listen := s.stubServer(&unhealthyHandler{})
defer srv.Close()
- s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
+ s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{
Keepstore: arvados.SystemServiceInstance{Listen: listen},
}
s.handler.ServeHTTP(s.resp, s.req)
func (s *AggregatorSuite) TestHealthy(c *check.C) {
srv, listen := s.stubServer(&healthyHandler{})
defer srv.Close()
- s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
+ s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{
+ Controller: arvados.SystemServiceInstance{Listen: listen},
Keepproxy: arvados.SystemServiceInstance{Listen: listen},
Keepstore: arvados.SystemServiceInstance{Listen: listen},
Keepweb: arvados.SystemServiceInstance{Listen: listen},
defer srvH.Close()
srvU, listenU := s.stubServer(&unhealthyHandler{})
defer srvU.Close()
- s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
+ s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{
+ Controller: arvados.SystemServiceInstance{Listen: listenH},
Keepproxy: arvados.SystemServiceInstance{Listen: listenH},
Keepstore: arvados.SystemServiceInstance{Listen: listenH},
Keepweb: arvados.SystemServiceInstance{Listen: listenH},
Websocket: arvados.SystemServiceInstance{Listen: listenH},
Workbench: arvados.SystemServiceInstance{Listen: listenH},
}
- s.handler.Config.Clusters["zzzzz"].SystemNodes["127.0.0.1"] = arvados.SystemNode{
+ s.handler.Config.Clusters["zzzzz"].NodeProfiles["127.0.0.1"] = arvados.NodeProfile{
Keepstore: arvados.SystemServiceInstance{Listen: listenU},
}
s.handler.ServeHTTP(s.resp, s.req)
s.handler.timeout = arvados.Duration(100 * time.Millisecond)
srv, listen := s.stubServer(&slowHandler{})
defer srv.Close()
- s.handler.Config.Clusters["zzzzz"].SystemNodes["localhost"] = arvados.SystemNode{
+ s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{
Keepstore: arvados.SystemServiceInstance{Listen: listen},
}
s.handler.ServeHTTP(s.resp, s.req)
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package httpserver
+
+import (
+ "encoding/json"
+ "net/http"
+)
+
+type ErrorResponse struct {
+ Errors []string `json:"errors"`
+}
+
+func Error(w http.ResponseWriter, error string, code int) {
+ w.Header().Set("Content-Type", "application/json")
+ w.Header().Set("X-Content-Type-Options", "nosniff")
+ w.WriteHeader(code)
+ json.NewEncoder(w).Encode(ErrorResponse{Errors: []string{error}})
+}
name string
}
-var requestTimeContextKey = contextKey{"requestTime"}
-
-var Logger logrus.FieldLogger = logrus.StandardLogger()
+var (
+ requestTimeContextKey = contextKey{"requestTime"}
+ loggerContextKey = contextKey{"logger"}
+)
// LogRequests wraps an http.Handler, logging each request and
-// response via logrus.
-func LogRequests(h http.Handler) http.Handler {
+// response via logger.
+func LogRequests(logger logrus.FieldLogger, h http.Handler) http.Handler {
+ if logger == nil {
+ logger = logrus.StandardLogger()
+ }
return http.HandlerFunc(func(wrapped http.ResponseWriter, req *http.Request) {
w := &responseTimer{ResponseWriter: WrapResponseWriter(wrapped)}
- req = req.WithContext(context.WithValue(req.Context(), &requestTimeContextKey, time.Now()))
- lgr := Logger.WithFields(logrus.Fields{
+ lgr := logger.WithFields(logrus.Fields{
"RequestID": req.Header.Get("X-Request-Id"),
"remoteAddr": req.RemoteAddr,
"reqForwardedFor": req.Header.Get("X-Forwarded-For"),
"reqQuery": req.URL.RawQuery,
"reqBytes": req.ContentLength,
})
+ ctx := req.Context()
+ ctx = context.WithValue(ctx, &requestTimeContextKey, time.Now())
+ ctx = context.WithValue(ctx, &loggerContextKey, lgr)
+ req = req.WithContext(ctx)
+
logRequest(w, req, lgr)
defer logResponse(w, req, lgr)
h.ServeHTTP(w, req)
})
}
+func Logger(req *http.Request) logrus.FieldLogger {
+ if lgr, ok := req.Context().Value(&loggerContextKey).(logrus.FieldLogger); ok {
+ return lgr
+ } else {
+ return logrus.StandardLogger()
+ }
+}
+
func logRequest(w *responseTimer, req *http.Request, lgr *logrus.Entry) {
lgr.Info("request")
}
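LogRequests now stashes the per-request logger (carrying RequestID, addresses, and request sizes) in the request context, and Logger(req) retrieves it, falling back to the standard logger. The same pattern in Python, using contextvars as a hypothetical analogue of Go's request context:

    import contextvars
    import logging

    _request_logger = contextvars.ContextVar("request_logger", default=None)

    def log_requests(handler):
        def wrapped(request_id):
            lgr = logging.getLogger("httpserver").getChild(request_id)
            token = _request_logger.set(lgr)  # like context.WithValue
            try:
                lgr.info("request")
                return handler(request_id)
            finally:
                lgr.info("response")
                _request_logger.reset(token)
        return wrapped

    def logger():
        # like Logger(req): use the request-scoped logger if present
        return _request_logger.get() or logging.getLogger()
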
"encoding/json"
"net/http"
"net/http/httptest"
- "os"
"testing"
"time"
- log "github.com/Sirupsen/logrus"
+ "github.com/Sirupsen/logrus"
check "gopkg.in/check.v1"
)
type Suite struct{}
func (s *Suite) TestLogRequests(c *check.C) {
- defer log.SetOutput(os.Stdout)
captured := &bytes.Buffer{}
- log.SetOutput(captured)
- log.SetFormatter(&log.JSONFormatter{
+ log := logrus.New()
+ log.Out = captured
+ log.Formatter = &logrus.JSONFormatter{
TimestampFormat: time.RFC3339Nano,
- })
+ }
+
h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("hello world"))
})
req.Header.Set("X-Forwarded-For", "1.2.3.4:12345")
c.Assert(err, check.IsNil)
resp := httptest.NewRecorder()
- AddRequestIDs(LogRequests(h)).ServeHTTP(resp, req)
+ AddRequestIDs(LogRequests(log, h)).ServeHTTP(resp, req)
dec := json.NewDecoder(captured)
def __init__(self, root, user_agent_pool=queue.LifoQueue(),
upload_counter=None,
download_counter=None,
- headers={}):
+ headers={},
+ insecure=False):
self.root = root
self._user_agent_pool = user_agent_pool
self._result = {'error': None}
self.put_headers = headers
self.upload_counter = upload_counter
self.download_counter = download_counter
+ self.insecure = insecure
def usable(self):
"""Is it worth attempting a request?"""
'{}: {}'.format(k,v) for k,v in self.get_headers.items()])
curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+ if self.insecure:
+ curl.setopt(pycurl.SSL_VERIFYPEER, 0)
if method == "HEAD":
curl.setopt(pycurl.NOBODY, True)
self._setcurltimeouts(curl, timeout)
'{}: {}'.format(k,v) for k,v in self.put_headers.items()])
curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+ if self.insecure:
+ curl.setopt(pycurl.SSL_VERIFYPEER, 0)
self._setcurltimeouts(curl, timeout)
try:
curl.perform()
if local_store is None:
local_store = os.environ.get('KEEP_LOCAL_STORE')
+ if api_client is None:
+ self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
+ else:
+ self.insecure = api_client.insecure
+
self.block_cache = block_cache if block_cache else KeepBlockCache()
self.timeout = timeout
self.proxy_timeout = proxy_timeout
root, self._user_agent_pool,
upload_counter=self.upload_counter,
download_counter=self.download_counter,
- headers=headers)
+ headers=headers,
+ insecure=self.insecure)
return local_roots
@staticmethod
root: self.KeepService(root, self._user_agent_pool,
upload_counter=self.upload_counter,
download_counter=self.download_counter,
- headers=headers)
+ headers=headers,
+ insecure=self.insecure)
for root in hint_roots
}
# SPDX-License-Identifier: Apache-2.0
daemon off;
-error_log stderr info; # Yes, must be specified here _and_ cmdline
+error_log "{{ERRORLOG}}" info; # Yes, must be specified here _and_ cmdline
events {
}
http {
- access_log {{ACCESSLOG}} combined;
+ access_log "{{ACCESSLOG}}" combined;
+ client_body_temp_path "{{TMPDIR}}";
upstream arv-git-http {
server localhost:{{GITPORT}};
}
server {
listen *:{{GITSSLPORT}} ssl default_server;
server_name _;
- ssl_certificate {{SSLCERT}};
- ssl_certificate_key {{SSLKEY}};
+ ssl_certificate "{{SSLCERT}}";
+ ssl_certificate_key "{{SSLKEY}}";
location / {
proxy_pass http://arv-git-http;
}
server {
listen *:{{KEEPPROXYSSLPORT}} ssl default_server;
server_name _;
- ssl_certificate {{SSLCERT}};
- ssl_certificate_key {{SSLKEY}};
+ ssl_certificate "{{SSLCERT}}";
+ ssl_certificate_key "{{SSLKEY}}";
location / {
proxy_pass http://keepproxy;
}
server {
listen *:{{KEEPWEBSSLPORT}} ssl default_server;
server_name ~^(?<request_host>.*)$;
- ssl_certificate {{SSLCERT}};
- ssl_certificate_key {{SSLKEY}};
+ ssl_certificate "{{SSLCERT}}";
+ ssl_certificate_key "{{SSLKEY}}";
location / {
proxy_pass http://keep-web;
proxy_set_header Host $request_host:{{KEEPWEBPORT}};
server {
listen *:{{KEEPWEBDLSSLPORT}} ssl default_server;
server_name ~.*;
- ssl_certificate {{SSLCERT}};
- ssl_certificate_key {{SSLKEY}};
+ ssl_certificate "{{SSLCERT}}";
+ ssl_certificate_key "{{SSLKEY}}";
location / {
proxy_pass http://keep-web;
proxy_set_header Host download:{{KEEPWEBPORT}};
server {
listen *:{{WSSPORT}} ssl default_server;
server_name ~^(?<request_host>.*)$;
- ssl_certificate {{SSLCERT}};
- ssl_certificate_key {{SSLKEY}};
+ ssl_certificate "{{SSLCERT}}";
+ ssl_certificate_key "{{SSLKEY}}";
location / {
proxy_pass http://ws;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}
+ upstream controller {
+ server localhost:{{CONTROLLERPORT}};
+ }
+ server {
+ listen *:{{CONTROLLERSSLPORT}} ssl default_server;
+ server_name _;
+ ssl_certificate "{{SSLCERT}}";
+ ssl_certificate_key "{{SSLKEY}}";
+ location / {
+ proxy_pass http://controller;
+ proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+ }
+ }
}
format(port, timeout),
file=sys.stderr)
-def _fifo2stderr(label):
- """Create a fifo, and copy it to stderr, prepending label to each line.
+def _logfilename(label):
+ """Set up a labelled log file, and return a path to write logs to.
- Return value is the path to the new FIFO.
+ Normally, the returned path is {tmpdir}/{label}.log.
+
+ In debug mode, logs are also written to stderr, with [label]
+ prepended to each line. The returned path is a FIFO.
    +label+ should contain only alphanumerics: it is also used as part
of the FIFO filename.
+
"""
+ logfilename = os.path.join(TEST_TMPDIR, label+'.log')
+ if not os.environ.get('ARVADOS_DEBUG', ''):
+ return logfilename
fifo = os.path.join(TEST_TMPDIR, label+'.fifo')
try:
os.remove(fifo)
if error.errno != errno.ENOENT:
raise
os.mkfifo(fifo, 0o700)
+ stdbuf = ['stdbuf', '-i0', '-oL', '-eL']
+ # open(fifo, 'r') would block waiting for someone to open the fifo
+ # for writing, so we need a separate cat process to open it for
+ # us.
+ cat = subprocess.Popen(
+ stdbuf+['cat', fifo],
+ stdin=open('/dev/null'),
+ stdout=subprocess.PIPE)
+ tee = subprocess.Popen(
+ stdbuf+['tee', '-a', logfilename],
+ stdin=cat.stdout,
+ stdout=subprocess.PIPE)
subprocess.Popen(
- ['stdbuf', '-i0', '-oL', '-eL', 'sed', '-e', 's/^/['+label+'] /', fifo],
+ stdbuf+['sed', '-e', 's/^/['+label+'] /'],
+ stdin=tee.stdout,
stdout=sys.stderr)
return fifo
kill_server_pid(_pidfile('api'))
my_api_host = None
+def run_controller():
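+    # Start arvados-controller on a free port, proxying requests to
+    # the Rails API server, using a minimal cluster config written to
+    # {TEST_TMPDIR}/arvados.yml. Record the pid and register the port
+    # so run_nginx() and clients can find the service.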
+ if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+ return
+ stop_controller()
+    rails_api_port = int(os.environ.get('ARVADOS_TEST_API_HOST', my_api_host).split(':')[-1])
+ port = find_available_port()
+ conf = os.path.join(TEST_TMPDIR, 'arvados.yml')
+ with open(conf, 'w') as f:
+ f.write("""
+Clusters:
+ zzzzz:
+ NodeProfiles:
+ "*":
+ "arvados-controller":
+ Listen: ":{}"
+ "arvados-api-server":
+ Listen: ":{}"
+ TLS: true
+ """.format(port, rails_api_port))
+ logf = open(_logfilename('controller'), 'a')
+ controller = subprocess.Popen(
+ ["arvados-server", "controller", "-config", conf],
+ stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
+ with open(_pidfile('controller'), 'w') as f:
+ f.write(str(controller.pid))
+ _wait_until_port_listens(port)
+ _setport('controller', port)
+ return port
+
+def stop_controller():
+ if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+ return
+ kill_server_pid(_pidfile('controller'))
+
def run_ws():
if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
return
_dbconfig('database'),
_dbconfig('username'),
_dbconfig('password')))
- logf = open(_fifo2stderr('ws'), 'w')
+ logf = open(_logfilename('ws'), 'a')
ws = subprocess.Popen(
["ws", "-config", conf],
stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
for arg, val in keep_args.items():
keep_cmd.append("{}={}".format(arg, val))
- logf = open(_fifo2stderr('keep{}'.format(n)), 'w')
+ logf = open(_logfilename('keep{}'.format(n)), 'a')
kp0 = subprocess.Popen(
keep_cmd, stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
port = find_available_port()
env = os.environ.copy()
env['ARVADOS_API_TOKEN'] = auth_token('anonymous')
- logf = open(_fifo2stderr('keepproxy'), 'w')
+ logf = open(_logfilename('keepproxy'), 'a')
kp = subprocess.Popen(
['keepproxy',
'-pid='+_pidfile('keepproxy'),
gitport = find_available_port()
env = os.environ.copy()
env.pop('ARVADOS_API_TOKEN', None)
- logf = open(_fifo2stderr('arv-git-httpd'), 'w')
+ logf = open(_logfilename('arv-git-httpd'), 'a')
agh = subprocess.Popen(
['arv-git-httpd',
'-repo-root='+gitdir+'/test',
keepwebport = find_available_port()
env = os.environ.copy()
env['ARVADOS_API_TOKEN'] = auth_token('anonymous')
- logf = open(_fifo2stderr('keep-web'), 'w')
+ logf = open(_logfilename('keep-web'), 'a')
keepweb = subprocess.Popen(
['keep-web',
'-allow-anonymous',
return
stop_nginx()
nginxconf = {}
+ nginxconf['CONTROLLERPORT'] = _getport('controller')
+ nginxconf['CONTROLLERSSLPORT'] = find_available_port()
nginxconf['KEEPWEBPORT'] = _getport('keep-web')
nginxconf['KEEPWEBDLSSLPORT'] = find_available_port()
nginxconf['KEEPWEBSSLPORT'] = find_available_port()
nginxconf['WSSPORT'] = _getport('wss')
nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem')
nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key')
- nginxconf['ACCESSLOG'] = _fifo2stderr('nginx_access_log')
+ nginxconf['ACCESSLOG'] = _logfilename('nginx_access')
+ nginxconf['ERRORLOG'] = _logfilename('nginx_error')
+ nginxconf['TMPDIR'] = TEST_TMPDIR
conftemplatefile = os.path.join(MY_DIRNAME, 'nginx.conf')
conffile = os.path.join(TEST_TMPDIR, 'nginx.conf')
'-g', 'pid '+_pidfile('nginx')+';',
'-c', conffile],
env=env, stdin=open('/dev/null'), stdout=sys.stderr)
+ _setport('controller-ssl', nginxconf['CONTROLLERSSLPORT'])
_setport('keep-web-dl-ssl', nginxconf['KEEPWEBDLSSLPORT'])
_setport('keep-web-ssl', nginxconf['KEEPWEBSSLPORT'])
_setport('keepproxy-ssl', nginxconf['KEEPPROXYSSLPORT'])
actions = [
'start', 'stop',
'start_ws', 'stop_ws',
+ 'start_controller', 'stop_controller',
'start_keep', 'stop_keep',
'start_keep_proxy', 'stop_keep_proxy',
'start_keep-web', 'stop_keep-web',
run_ws()
elif args.action == 'stop_ws':
stop_ws()
+ elif args.action == 'start_controller':
+ run_controller()
+ elif args.action == 'stop_controller':
+ stop_controller()
elif args.action == 'start_keep':
run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers)
elif args.action == 'stop_keep':
stop_keep_web()
elif args.action == 'start_nginx':
run_nginx()
+ print("export ARVADOS_API_HOST=0.0.0.0:{}".format(_getport('controller-ssl')))
elif args.action == 'stop_nginx':
stop_nginx()
else:
self.assertEqual('100::1', service.hostname)
self.assertEqual(10, service.port)
+ def test_insecure_disables_tls_verify(self):
+ api_client = self.mock_keep_services(count=1)
+
+ api_client.insecure = True
+ with tutil.mock_keep_responses(b'foo', 200) as mock:
+ keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
+ 0)
+
+ api_client.insecure = False
+ with tutil.mock_keep_responses(b'foo', 200) as mock:
+ keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
+ # getopt()==None here means we didn't change the
+ # default. If we were using real pycurl instead of a mock,
+ # it would return the default value 1.
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
+ None)
+
# test_*_timeout verify that KeepClient instructs pycurl to use
# the appropriate connection and read timeouts. They don't care
# whether pycurl actually exhibits the expected timeout behavior
def __getattr__(self, r):
if r == "api_token":
return "abc"
+ elif r == "insecure":
+ return False
else:
raise arvados.errors.KeepReadError()
keep_client = arvados.KeepClient(api_client=ApiMock(),
end
end
- if Rails.configuration.new_users_are_active
+ if Rails.configuration.new_users_are_active ||
+ Rails.configuration.auto_activate_users_from.include?(remote_user['uuid'][0..4])
# Update is_active to whatever it is at the remote end
user.is_active = remote_user['is_active']
elsif !remote_user['is_active']
before_validation :fill_field_defaults, :if => :new_record?
before_validation :validate_runtime_constraints
+ before_validation :set_default_preemptible_scheduling_parameter
before_validation :set_container
- before_validation :set_default_preemptable_scheduling_parameter
validates :command, :container_image, :output_path, :cwd, :presence => true
validates :output_ttl, numericality: { only_integer: true, greater_than_or_equal_to: 0 }
validates :priority, numericality: { only_integer: true, greater_than_or_equal_to: 0, less_than_or_equal_to: 1000 }
end
end
- def set_default_preemptable_scheduling_parameter
+ def set_default_preemptible_scheduling_parameter
+ c = get_requesting_container()
if self.state == Committed
- # If preemptable instances (eg: AWS Spot Instances) are allowed,
+ # If preemptible instances (eg: AWS Spot Instances) are allowed,
# ask them on child containers by default.
- if Rails.configuration.preemptable_instances and
- !self.requesting_container_uuid.nil? and
- self.scheduling_parameters['preemptable'].nil?
- self.scheduling_parameters['preemptable'] = true
+ if Rails.configuration.preemptible_instances and !c.nil? and
+ self.scheduling_parameters['preemptible'].nil?
+ self.scheduling_parameters['preemptible'] = true
end
end
end
scheduling_parameters['partitions'].size)
errors.add :scheduling_parameters, "partitions must be an array of strings"
end
- if !Rails.configuration.preemptable_instances and scheduling_parameters['preemptable']
- errors.add :scheduling_parameters, "preemptable instances are not allowed"
+ if !Rails.configuration.preemptible_instances and scheduling_parameters['preemptible']
+ errors.add :scheduling_parameters, "preemptible instances are not allowed"
+ end
+ if scheduling_parameters.include? 'max_run_time' and
+ (!scheduling_parameters['max_run_time'].is_a?(Integer) ||
+ scheduling_parameters['max_run_time'] < 0)
+      errors.add :scheduling_parameters, "max_run_time must be a positive integer"
end
end
end
end
def set_requesting_container_uuid
- return if !current_api_client_authorization
- if (c = Container.where('auth_uuid=?', current_api_client_authorization.uuid).select([:uuid, :priority]).first)
+ c = get_requesting_container()
+ if !c.nil?
self.requesting_container_uuid = c.uuid
self.priority = c.priority>0 ? 1 : 0
end
end
+
+  def get_requesting_container
+    # Return the requesting container (with uuid and priority
+    # loaded), or nil if this request does not come from a container.
+    if !self.requesting_container_uuid.nil?
+      return Container.where(uuid: self.requesting_container_uuid).select([:uuid, :priority]).first
+    end
+    return if !current_api_client_authorization
+    Container.where('auth_uuid=?', current_api_client_authorization.uuid).select([:uuid, :priority]).first
+  end
end
before_create :set_initial_username, :if => Proc.new { |user|
user.username.nil? and user.email
}
+ after_create :setup_on_activate
after_create :add_system_group_permission_link
after_create :invalidate_permissions_cache
after_create :auto_setup_new_user, :if => Proc.new { |user|
if !oid_login_perms.any?
# create openid login permission
- oid_login_perm = Link.create(link_class: 'permission',
+ oid_login_perm = Link.create!(link_class: 'permission',
name: 'can_login',
tail_uuid: self.email,
head_uuid: self.uuid,
### New user and & email settings
###
- # Config parameters to automatically setup new users.
+  # Config parameters to automatically set up new users. If enabled,
+  # these users will be able to self-activate. Enable this if you want
+  # to run an open instance where anyone can create an account and use
+  # the system without requiring manual approval.
+ #
# The params auto_setup_new_users_with_* are meaningful only when auto_setup_new_users is turned on.
# auto_setup_name_blacklist is a list of usernames to be blacklisted for auto setup.
auto_setup_new_users: false
auto_setup_new_users_with_repository: false
auto_setup_name_blacklist: [arvados, git, gitolite, gitolite-admin, root, syslog]
- # When new_users_are_active is set to true, the user agreement check is skipped.
+ # When new_users_are_active is set to true, new users will be active
+ # immediately. This skips the "self-activate" step which enforces
+ # user agreements. Should only be enabled for development.
new_users_are_active: false
# The e-mail address of the user you would like to become marked as an admin
### Crunch, DNS & compute node management
###
- # Preemptable instance support (e.g. AWS Spot Instances)
- # When true, child containers will get created with the preemptable
+ # Preemptible instance support (e.g. AWS Spot Instances)
+  # When true, child containers will get created with the preemptible
+  # scheduling parameter set.
- preemptable_instances: false
+ preemptible_instances: false
# Docker image to be used when none found in runtime_constraints of a job
default_docker_image_for_jobs: false
# remote_hosts above.
remote_hosts_via_dns: false
+  # List of cluster prefixes. These are "trusted" clusters: users
+  # from the clusters listed here will be automatically set up and
+  # activated. This is separate from the auto_setup_new_users and
+  # new_users_are_active settings.
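+  # For example (hypothetical value): auto_activate_users_from: ["zbbbb"]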
+ auto_activate_users_from: []
+
###
### Remaining assorted configuration options.
###
@fetched_commits[sha1] = ($? == 0)
end
- def tag_commit(commit_hash, tag_name)
+ def tag_commit(job, commit_hash, tag_name)
# @git_tags[T]==V if we know commit V has been tagged T in the
# arvados_internal repository.
if not @git_tags[tag_name]
next
end
ready &&= get_commit repo.server_path, job.script_version
- ready &&= tag_commit job.script_version, job.uuid
+ ready &&= tag_commit job, job.script_version, job.uuid
end
# This should be unnecessary, because API server does it during
# job create/update, but it's still not a bad idea to verify the
# tag is correct before starting the job:
- ready &&= tag_commit job.script_version, job.uuid
+ ready &&= tag_commit job, job.script_version, job.uuid
# The arvados_sdk_version doesn't support use of arbitrary
# remote URLs, so the requested version isn't necessarily copied
# into the internal repository yet.
if job.arvados_sdk_version
ready &&= get_commit @arvados_repo_path, job.arvados_sdk_version
- ready &&= tag_commit job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"
+ ready &&= tag_commit job, job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"
end
if not ready
# SPDX-License-Identifier: AGPL-3.0
module UpdatePriority
- # Clean up after races: if container priority>0 but there are no
- # committed container requests for it, reset priority to 0.
+ extend CurrentApiClient
+
+ # Clean up after races.
+ #
+ # If container priority>0 but there are no committed container
+ # requests for it, reset priority to 0.
+ #
+ # If container priority=0 but there are committed container requests
+ # for it with priority>0, update priority.
def self.update_priority
if !File.owned?(Rails.root.join('tmp'))
Rails.logger.warn("UpdatePriority: not owner of #{Rails.root}/tmp, skipping")
lockfile = Rails.root.join('tmp', 'update_priority.lock')
File.open(lockfile, File::RDWR|File::CREAT, 0600) do |f|
return unless f.flock(File::LOCK_NB|File::LOCK_EX)
- ActiveRecord::Base.connection.execute("UPDATE containers AS c SET priority=0 WHERE state='Queued' AND priority>0 AND uuid NOT IN (SELECT container_uuid FROM container_requests WHERE priority>0);")
+
+ # priority>0 but should be 0:
+ ActiveRecord::Base.connection.
+ exec_query("UPDATE containers AS c SET priority=0 WHERE state IN ('Queued', 'Locked', 'Running') AND priority>0 AND uuid NOT IN (SELECT container_uuid FROM container_requests WHERE priority>0 AND state='Committed');", 'UpdatePriority')
+
+ # priority==0 but should be >0:
+ act_as_system_user do
+ Container.
+ joins("JOIN container_requests ON container_requests.container_uuid=containers.uuid AND container_requests.state=#{Container.sanitize(ContainerRequest::Committed)} AND container_requests.priority>0").
+ where('containers.state IN (?) AND containers.priority=0 AND container_requests.uuid IS NOT NULL',
+ [Container::Queued, Container::Locked, Container::Running]).
+ map(&:update_priority!)
+ end
end
end
def check_update_whitelist permitted_fields
attribute_names.each do |field|
if !permitted_fields.include?(field.to_sym) && really_changed(field)
- errors.add field, "cannot be modified in this state (#{send(field+"_was").inspect}, #{send(field).inspect})"
+ errors.add field, "cannot be modified in state '#{self.state}' (#{send(field+"_was").inspect}, #{send(field).inspect})"
end
end
end
assert_response :success
assert_equal 'zbbbb-tpzed-000000000000000', json_response['uuid']
assert_equal false, json_response['is_admin']
+ assert_equal false, json_response['is_active']
assert_equal 'foo@example.com', json_response['email']
assert_equal 'barney', json_response['username']
refute_includes(group_uuids, groups(:trashed_project).uuid)
refute_includes(group_uuids, groups(:testusergroup_admins).uuid)
end
+
+ test 'auto-activate user from trusted cluster' do
+ Rails.configuration.auto_activate_users_from = ['zbbbb']
+ get '/arvados/v1/users/current', {format: 'json'}, auth(remote: 'zbbbb')
+ assert_response :success
+ assert_equal 'zbbbb-tpzed-000000000000000', json_response['uuid']
+ assert_equal false, json_response['is_admin']
+ assert_equal true, json_response['is_active']
+ assert_equal 'foo@example.com', json_response['email']
+ assert_equal 'barney', json_response['username']
+ end
+
+ test 'pre-activate remote user' do
+ post '/arvados/v1/users', {
+ "user" => {
+ "uuid" => "zbbbb-tpzed-000000000000000",
+ "email" => 'foo@example.com',
+ "username" => 'barney',
+ "is_active" => true
+ }
+ }, {'HTTP_AUTHORIZATION' => "OAuth2 #{api_token(:admin)}"}
+ assert_response :success
+
+ get '/arvados/v1/users/current', {format: 'json'}, auth(remote: 'zbbbb')
+ assert_response :success
+ assert_equal 'zbbbb-tpzed-000000000000000', json_response['uuid']
+    assert_nil json_response['is_admin']
+ assert_equal true, json_response['is_active']
+ assert_equal 'foo@example.com', json_response['email']
+ assert_equal 'barney', json_response['username']
+ end
+
end
[
[false, ActiveRecord::RecordInvalid],
[true, nil],
- ].each do |preemptable_conf, expected|
- test "having Rails.configuration.preemptable_instances=#{preemptable_conf}, create preemptable container request and verify #{expected}" do
- sp = {"preemptable" => true}
+ ].each do |preemptible_conf, expected|
+ test "having Rails.configuration.preemptible_instances=#{preemptible_conf}, create preemptible container request and verify #{expected}" do
+ sp = {"preemptible" => true}
common_attrs = {cwd: "test",
priority: 1,
command: ["echo", "hello"],
output_path: "test",
scheduling_parameters: sp,
mounts: {"test" => {"kind" => "json"}}}
- Rails.configuration.preemptable_instances = preemptable_conf
+ Rails.configuration.preemptible_instances = preemptible_conf
set_user_from_auth :active
cr = create_minimal_req!(common_attrs)
'zzzzz-dz642-runningcontainr',
nil,
].each do |requesting_c|
- test "having preemptable instances active on the API server, a committed #{requesting_c.nil? ? 'non-':''}child CR should not ask for preemptable instance if parameter already set to false" do
+ test "having preemptible instances active on the API server, a committed #{requesting_c.nil? ? 'non-':''}child CR should not ask for preemptible instance if parameter already set to false" do
common_attrs = {cwd: "test",
priority: 1,
command: ["echo", "hello"],
output_path: "test",
- scheduling_parameters: {"preemptable" => false},
+ scheduling_parameters: {"preemptible" => false},
mounts: {"test" => {"kind" => "json"}}}
- Rails.configuration.preemptable_instances = true
+ Rails.configuration.preemptible_instances = true
set_user_from_auth :active
if requesting_c
cr.state = ContainerRequest::Committed
cr.save!
- assert_equal false, cr.scheduling_parameters['preemptable']
+ assert_equal false, cr.scheduling_parameters['preemptible']
end
end
[true, nil, nil],
[false, 'zzzzz-dz642-runningcontainr', nil],
[false, nil, nil],
- ].each do |preemptable_conf, requesting_c, schedule_preemptable|
- test "having Rails.configuration.preemptable_instances=#{preemptable_conf}, #{requesting_c.nil? ? 'non-':''}child CR should #{schedule_preemptable ? '':'not'} ask for preemptable instance by default" do
+ ].each do |preemptible_conf, requesting_c, schedule_preemptible|
+ test "having Rails.configuration.preemptible_instances=#{preemptible_conf}, #{requesting_c.nil? ? 'non-':''}child CR should #{schedule_preemptible ? '':'not'} ask for preemptible instance by default" do
common_attrs = {cwd: "test",
priority: 1,
command: ["echo", "hello"],
output_path: "test",
mounts: {"test" => {"kind" => "json"}}}
- Rails.configuration.preemptable_instances = preemptable_conf
+ Rails.configuration.preemptible_instances = preemptible_conf
set_user_from_auth :active
if requesting_c
cr.state = ContainerRequest::Committed
cr.save!
- assert_equal schedule_preemptable, cr.scheduling_parameters['preemptable']
+ assert_equal schedule_preemptible, cr.scheduling_parameters['preemptible']
end
end
[{"partitions" => "fastcpu"}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
[{"partitions" => "fastcpu"}, ContainerRequest::Uncommitted],
[{"partitions" => ["fastcpu","vfastcpu"]}, ContainerRequest::Committed],
+ [{"max_run_time" => "one day"}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
+ [{"max_run_time" => "one day"}, ContainerRequest::Uncommitted],
+ [{"max_run_time" => -1}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
+ [{"max_run_time" => -1}, ContainerRequest::Uncommitted],
+ [{"max_run_time" => 86400}, ContainerRequest::Committed],
].each do |sp, state, expected|
test "create container request with scheduling_parameters #{sp} in state #{state} and verify #{expected}" do
common_attrs = {cwd: "test",
end
end
+  test "Having preemptible_instances=true, create a committed child container request and verify the scheduling parameter of its container" do
+ common_attrs = {cwd: "test",
+ priority: 1,
+ command: ["echo", "hello"],
+ output_path: "test",
+ state: ContainerRequest::Committed,
+ mounts: {"test" => {"kind" => "json"}}}
+ set_user_from_auth :active
+ Rails.configuration.preemptible_instances = true
+
+ cr = with_container_auth(Container.find_by_uuid 'zzzzz-dz642-runningcontainr') do
+ create_minimal_req!(common_attrs)
+ end
+ assert_equal 'zzzzz-dz642-runningcontainr', cr.requesting_container_uuid
+ assert_equal true, cr.scheduling_parameters["preemptible"]
+
+ c = Container.find_by_uuid(cr.container_uuid)
+ assert_equal true, c.scheduling_parameters["preemptible"]
+ end
+
[['Committed', true, {name: "foobar", priority: 123}],
['Committed', false, {container_count: 2}],
['Committed', false, {container_count: 0}],
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+require 'test_helper'
+require 'update_priority'
+
+class UpdatePriorityTest < ActiveSupport::TestCase
+ test 'priority 0 but should be >0' do
+ uuid = containers(:running).uuid
+ ActiveRecord::Base.connection.exec_query('UPDATE containers SET priority=0 WHERE uuid=$1', 'test-setup', [[nil, uuid]])
+ assert_equal 0, Container.find_by_uuid(uuid).priority
+ UpdatePriority.update_priority
+ assert_operator 0, :<, Container.find_by_uuid(uuid).priority
+
+ uuid = containers(:queued).uuid
+ ActiveRecord::Base.connection.exec_query('UPDATE containers SET priority=0 WHERE uuid=$1', 'test-setup', [[nil, uuid]])
+ assert_equal 0, Container.find_by_uuid(uuid).priority
+ UpdatePriority.update_priority
+ assert_operator 0, :<, Container.find_by_uuid(uuid).priority
+ end
+
+ test 'priority>0 but should be 0' do
+ uuid = containers(:running).uuid
+ ActiveRecord::Base.connection.exec_query('DELETE FROM container_requests WHERE container_uuid=$1', 'test-setup', [[nil, uuid]])
+ assert_operator 0, :<, Container.find_by_uuid(uuid).priority
+ UpdatePriority.update_priority
+ assert_equal 0, Container.find_by_uuid(uuid).priority
+ end
+end
assert_equal(expect_username, user.username)
# check user setup
- verify_link_exists(Rails.configuration.auto_setup_new_users,
+ verify_link_exists(Rails.configuration.auto_setup_new_users || active,
groups(:all_users).uuid, user.uuid,
"permission", "can_read")
# Check for OID login link.
- verify_link_exists(Rails.configuration.auto_setup_new_users,
+ verify_link_exists(Rails.configuration.auto_setup_new_users || active,
user.uuid, user.email, "permission", "can_login")
# Check for repository.
if named_repo = (prior_repo or
}
for _, trial := range []struct {
- types []arvados.InstanceType
+ types map[string]arvados.InstanceType
sbatchArgs []string
err error
}{
// Choose node type => use --constraint arg
{
- types: []arvados.InstanceType{
- {Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
- {Name: "a1.small", Price: 0.04, RAM: 256000000, VCPUs: 2},
- {Name: "a1.medium", Price: 0.08, RAM: 512000000, VCPUs: 4},
- {Name: "a1.large", Price: 0.16, RAM: 1024000000, VCPUs: 8},
+ types: map[string]arvados.InstanceType{
+ "a1.tiny": {Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
+ "a1.small": {Name: "a1.small", Price: 0.04, RAM: 256000000, VCPUs: 2},
+ "a1.medium": {Name: "a1.medium", Price: 0.08, RAM: 512000000, VCPUs: 4},
+ "a1.large": {Name: "a1.large", Price: 0.16, RAM: 1024000000, VCPUs: 8},
},
sbatchArgs: []string{"--constraint=instancetype=a1.medium"},
},
},
// No node type is big enough => error
{
- types: []arvados.InstanceType{
- {Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
+ types: map[string]arvados.InstanceType{
+ "a1.tiny": {Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
},
err: dispatchcloud.ConstraintsNotSatisfiableError{},
},
// WaitFinish waits for the container to terminate, capture the exit code, and
// close the stdout/stderr logging.
func (runner *ContainerRunner) WaitFinish() error {
+ var runTimeExceeded <-chan time.Time
runner.CrunchLog.Print("Waiting for container to finish")
waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
arvMountExit := runner.ArvMountExit
+ if timeout := runner.Container.SchedulingParameters.MaxRunTime; timeout > 0 {
+ runTimeExceeded = time.After(time.Duration(timeout) * time.Second)
+ }
for {
select {
case waitBody := <-waitOk:
// arvMountExit will always be ready now that
// it's closed, but that doesn't interest us.
arvMountExit = nil
+
+ case <-runTimeExceeded:
+ runner.CrunchLog.Printf("maximum run time exceeded. Stopping container.")
+ runner.stop(nil)
+ runTimeExceeded = nil
}
}
}
"mounts": {"/tmp": {"kind": "tmp"} },
"output_path": "/tmp",
"priority": 1,
- "runtime_constraints": {}
+ "runtime_constraints": {}
}`, nil, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, "hello world\n"))
t.logWriter.Close()
}
+func (s *TestSuite) TestRunTimeExceeded(c *C) {
+ api, _, _ := s.fullRunHelper(c, `{
+ "command": ["sleep", "3"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {},
+ "scheduling_parameters":{"max_run_time": 1}
+}`, nil, 0, func(t *TestDockerClient) {
+ time.Sleep(3 * time.Second)
+ t.logWriter.Close()
+ })
+
+ c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*maximum run time exceeded.*")
+}
+
func (s *TestSuite) TestCrunchstat(c *C) {
api, _, _ := s.fullRunHelper(c, `{
"command": ["sleep", "1"],
if err != nil {
log.Fatal(err)
}
- nodeCfg, err := clusterCfg.GetThisSystemNode()
+ nodeCfg, err := clusterCfg.GetNodeProfile("")
if err != nil {
log.Fatal(err)
}
}
func (srv *server) Start() error {
- srv.Handler = httpserver.AddRequestIDs(httpserver.LogRequests(&handler{Config: srv.Config}))
+ srv.Handler = httpserver.AddRequestIDs(httpserver.LogRequests(nil, &handler{Config: srv.Config}))
srv.Addr = srv.Config.Listen
return srv.Server.Start()
}
// Start serving requests.
router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout), cfg.ManagementToken)
- http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router)))
+ http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(nil, router)))
log.Println("shutting down")
}
systemAuthToken string
debugLogf func(string, ...interface{})
- ManagementToken string
+ ManagementToken string `doc: The secret key that must be provided by monitoring services
+wishing to access the health check endpoint (/_health).`
metrics
}
mux := http.NewServeMux()
mux.Handle("/", theConfig.metrics.Instrument(
- httpserver.AddRequestIDs(httpserver.LogRequests(rtr.limiter))))
+ httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter))))
mux.HandleFunc("/metrics.json", theConfig.metrics.exportJSON)
mux.Handle("/metrics", theConfig.metrics.exportProm)
"VolumeSize": volsize,
"VolumeType": "gp2"
}}]
- if size.preemptable:
+ if size.preemptible:
# Request a Spot instance for this node
kw['ex_spot_market'] = True
return kw
section_types = {
'instance_type': str,
'price': float,
- 'preemptable': bool,
+ 'preemptible': bool,
}
for sec_name in self.sections():
sec_words = sec_name.split(None, 2)
if sec_words[0] != 'Size':
continue
size_spec = self.get_section(sec_name, section_types, int)
- if 'preemptable' not in size_spec:
- size_spec['preemptable'] = False
+ if 'preemptible' not in size_spec:
+ size_spec['preemptible'] = False
if 'instance_type' not in size_spec:
# Assume instance type is Size name if missing
size_spec['instance_type'] = sec_words[1]
self.cores = 0
self.bandwidth = 0
self.price = 9999999
- self.preemptable = False
+ self.preemptible = False
self.extra = {}
def meets_constraints(self, **kwargs):
self.disk = 0
self.scratch = self.disk * 1000
self.ram = int(self.ram * node_mem_scaling)
- self.preemptable = False
+ self.preemptible = False
for name, override in kwargs.iteritems():
if name == 'instance_type': continue
if not hasattr(self, name):
# You may also want to define the amount of scratch space (expressed
# in GB) for Crunch jobs. You can also override Amazon's provided
# data fields (such as price per hour) by setting them here.
+#
+# Additionally, you can ask for a preemptible instance (e.g. an AWS Spot
+# Instance) by setting "preemptible = true" in a Size section. To offer
+# both spot and on-demand versions of the same size, give the spot Size
+# section a distinct name and specify the real instance type inside it,
+# as in the m4.large.spot example below.
[Size m4.large]
cores = 2
price = 0.126
scratch = 100
+[Size m4.large.spot]
+instance_type = m4.large
+preemptible = true
+cores = 2
+price = 0.126
+scratch = 100
+
[Size m4.xlarge]
cores = 4
price = 0.252
create_method.call_args[1].get('ex_metadata', {'arg': 'missing'}).items()
)
- def test_create_preemptable_instance(self):
+ def test_create_preemptible_instance(self):
arv_node = testutil.arvados_node_mock()
driver = self.new_driver()
- driver.create_node(testutil.MockSize(1, preemptable=True), arv_node)
+ driver.create_node(testutil.MockSize(1, preemptible=True), arv_node)
create_method = self.driver_mock().create_node
self.assertTrue(create_method.called)
self.assertEqual(
cores = 1
price = 0.8
-[Size 1.preemptable]
+[Size 1.preemptible]
instance_type = 1
-preemptable = true
+preemptible = true
cores = 1
price = 0.8
self.assertEqual('Small', size.name)
self.assertEqual(1, kwargs['cores'])
self.assertEqual(0.8, kwargs['price'])
- # preemptable is False by default
- self.assertEqual(False, kwargs['preemptable'])
+ # preemptible is False by default
+ self.assertEqual(False, kwargs['preemptible'])
# instance_type == arvados node size id by default
self.assertEqual(kwargs['id'], kwargs['instance_type'])
- # Now retrieve the preemptable version
+ # Now retrieve the preemptible version
size, kwargs = sizes[1]
self.assertEqual('Small', size.name)
- self.assertEqual('1.preemptable', kwargs['id'])
+ self.assertEqual('1.preemptible', kwargs['id'])
self.assertEqual(1, kwargs['cores'])
self.assertEqual(0.8, kwargs['price'])
- self.assertEqual(True, kwargs['preemptable'])
+ self.assertEqual(True, kwargs['preemptible'])
self.assertEqual('1', kwargs['instance_type'])
class MockSize(object):
- def __init__(self, factor, preemptable=False):
+ def __init__(self, factor, preemptible=False):
self.id = 'z{}.test'.format(factor)
self.name = 'test size '+self.id
self.ram = 128 * factor
self.price = float(factor)
self.extra = {}
self.real = self
- self.preemptable = preemptable
+ self.preemptible = preemptible
def __eq__(self, other):
return self.id == other.id