. "$VENVDIR/bin/activate"
# Needed for run_test_server.py which is used by certain (non-Python) tests.
- pip install --no-cache-dir PyYAML \
+ pip install --no-cache-dir PyYAML future \
|| fatal "pip install PyYAML failed"
# Preinstall libcloud if using a fork; otherwise nodemanager "pip
}
check_arvados_config() {
+ if [[ "$1" = "env" ]] ; then
+ return
+ fi
if [[ -z "$ARVADOS_CONFIG" ]] ; then
# Create config file. The run_test_server script requires PyYAML,
# so virtualenv needs to be active. Downstream steps like
# workbench install which require a valid config.yml.
- if [[ (! -s "$VENVDIR/bin/activate") && "$1" != "env" ]] ; then
+ if [[ ! -s "$VENVDIR/bin/activate" ]] ; then
install_env
fi
. "$VENVDIR/bin/activate"
&& git --git-dir internal.git init \
|| return 1
- cd "$WORKSPACE/services/api" \
- && RAILS_ENV=test bundle exec rails db:environment:set \
- && RAILS_ENV=test bundle exec rake db:drop \
- && RAILS_ENV=test bundle exec rake db:setup \
- && RAILS_ENV=test bundle exec rake db:fixtures:load
+
+ (cd "$WORKSPACE/services/api"
+ export RAILS_ENV=test
+ if bundle exec rails db:environment:set ; then
+ bundle exec rake db:drop
+ fi
+ bundle exec rake db:setup \
+ && bundle exec rake db:fixtures:load
+ )
}
declare -a pythonstuff
</code></pre>
</notextile>
-h2. Create a dispatcher token
+h2. Configure the dispatcher (optional)
-Create an Arvados superuser token for use by the dispatcher. If you have multiple dispatch processes, you should give each one a different token.
+Crunch-dispatch-slurm reads the common configuration file at @/etc/arvados/config.yml@. The essential configuration parameters will already be set by previous install steps, so no additional configuration is required. The following sections describe optional configuration parameters.
-{% include 'create_superuser_token' %}
+h3(#PollPeriod). Containers.PollInterval
-h2. Configure the dispatcher
-
-Set up crunch-dispatch-slurm's configuration directory:
+crunch-dispatch-slurm polls the API server periodically for new containers to run. The @PollInterval@ option controls how often this poll happens. Set this to a string of numbers suffixed with one of the time units @ns@, @us@, @ms@, @s@, @m@, or @h@. For example:
<notextile>
-<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/arvados</span>
-~$ <span class="userinput">sudo install -d -o root -g <b>crunch</b> -m 0750 /etc/arvados/crunch-dispatch-slurm</span>
+<pre>
+Clusters:
+ zzzzz:
+ Containers:
+ <code class="userinput">PollInterval: <b>3m30s</b>
</code></pre>
</notextile>
-Edit @/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml@ to authenticate to your Arvados API server, using the token you generated in the previous step. Follow this YAML format:
+h3(#ReserveExtraRAM). Containers.ReserveExtraRAM: Extra RAM for jobs
+
+Extra RAM to reserve (in bytes) on each SLURM job submitted by Arvados, which is added to the amount specified in the container's @runtime_constraints@. If not provided, the default value is zero. Helpful when using @-cgroup-parent-subsystem@, where @crunch-run@ and @arv-mount@ share the control group memory limit with the user process. In this situation, at least 256MiB is recommended to accommodate each container's @crunch-run@ and @arv-mount@ processes.
+
+Supports suffixes @KB@, @KiB@, @MB@, @MiB@, @GB@, @GiB@, @TB@, @TiB@, @PB@, @PiB@, @EB@, @EiB@ (where @KB@ is 10[^3^], @KiB@ is 2[^10^], @MB@ is 10[^6^], @MiB@ is 2[^20^] and so forth).
<notextile>
-<pre><code class="userinput">Client:
- APIHost: <b>zzzzz.arvadosapi.com</b>
- AuthToken: <b>zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz</b>
-</code></pre>
+<pre>
+Clusters:
+ zzzzz:
+ Containers:
+ <code class="userinput">ReserveExtraRAM: <b>256MiB</b></code>
+</pre>
</notextile>
-This is the only configuration required by crunch-dispatch-slurm. The subsections below describe optional configuration flags you can set inside the main configuration object.
-
-h3(#KeepServiceURIs). Client::KeepServiceURIs
+h3(#MinRetryPeriod). Containers.MinRetryPeriod: Rate-limit repeated attempts to start containers
-Override Keep service discovery with a predefined list of Keep URIs. This can be useful if the compute nodes run a local keepstore that should handle all Keep traffic. Example:
+If SLURM is unable to run a container, the dispatcher will submit it again after the next PollInterval. If PollInterval is very short, this can be excessive. If MinRetryPeriod is set, the dispatcher will avoid submitting the same container to SLURM more than once in the given time span.
<notextile>
-<pre><code class="userinput">Client:
- APIHost: zzzzz.arvadosapi.com
- AuthToken: zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
- KeepServiceURIs:
- - <b>http://127.0.0.1:25107</b>
-</code></pre>
+<pre>
+Clusters:
+ zzzzz:
+ Containers:
+ <code class="userinput">MinRetryPeriod: <b>30s</b></code>
+</pre>
</notextile>
-h3(#PollPeriod). PollPeriod
+h3(#KeepServiceURIs). Containers.SLURM.SbatchEnvironmentVariables
-crunch-dispatch-slurm polls the API server periodically for new containers to run. The @PollPeriod@ option controls how often this poll happens. Set this to a string of numbers suffixed with one of the time units @ns@, @us@, @ms@, @s@, @m@, or @h@. For example:
+Some Arvados installations run a local keepstore on each compute node to handle all Keep traffic. To override Keep service discovery and access the local keep server instead of the global servers, set ARVADOS_KEEP_SERVICES in SbatchEnvironmentVariables:
<notextile>
-<pre><code class="userinput">PollPeriod: <b>3m30s</b>
+<pre>
+Clusters:
+ zzzzz:
+ Containers:
+ SLURM:
+ <span class="userinput">SbatchEnvironmentVariables:
+ ARVADOS_KEEP_SERVICES: "http://127.0.0.1:25107"</span>
</code></pre>
</notextile>
-h3(#PrioritySpread). PrioritySpread
+h3(#PrioritySpread). Containers.SLURM.PrioritySpread
crunch-dispatch-slurm adjusts the "nice" values of its SLURM jobs to ensure containers are prioritized correctly relative to one another. This option tunes the adjustment mechanism.
* If non-Arvados jobs run on your SLURM cluster, and your Arvados containers are waiting too long in the SLURM queue because their "nice" values are too high for them to compete with other SLURM jobs, you should use a smaller PrioritySpread value.
The smallest usable value is @1@. The default value of @10@ is used if this option is zero or negative. Example:
<notextile>
-<pre><code class="userinput">PrioritySpread: <b>1000</b>
-</code></pre>
+<pre>
+Clusters:
+ zzzzz:
+ Containers:
+ SLURM:
+ <code class="userinput">PrioritySpread: <b>1000</b></code></pre>
</notextile>
-h3(#SbatchArguments). SbatchArguments
+h3(#SbatchArguments). Containers.SLURM.SbatchArgumentsList
When crunch-dispatch-slurm invokes @sbatch@, you can add arguments to the command by specifying @SbatchArguments@. You can use this to send the jobs to specific cluster partitions or add resource requests. Set @SbatchArguments@ to an array of strings. For example:
<notextile>
-<pre><code class="userinput">SbatchArguments:
-- <b>"--partition=PartitionName"</b>
-</code></pre>
+<pre>
+Clusters:
+ zzzzz:
+ Containers:
+ SLURM:
+ <code class="userinput">SbatchArgumentsList:
+ - <b>"--partition=PartitionName"</b></code>
+</pre>
</notextile>
Note: If an argument is supplied multiple times, @slurm@ uses the value of the last occurrence of the argument on the command line. Arguments specified through Arvados are added after the arguments listed in SbatchArguments. This means, for example, an Arvados container that specifies @partitions@ in @scheduling_parameter@ will override an occurrence of @--partition@ in SbatchArguments. As a result, for container parameters that can be specified through Arvados, SbatchArguments can be used to specify defaults but not enforce specific policy.
-h3(#CrunchRunCommand-cgroups). CrunchRunCommand: Dispatch to SLURM cgroups
+h3(#CrunchRunCommand-cgroups). Containers.CrunchRunArgumentList: Dispatch to SLURM cgroups
If your SLURM cluster uses the @task/cgroup@ TaskPlugin, you can configure Crunch's Docker containers to be dispatched inside SLURM's cgroups. This provides consistent enforcement of resource constraints. To do this, use a crunch-dispatch-slurm configuration like the following:
<notextile>
-<pre><code class="userinput">CrunchRunCommand:
-- <b>crunch-run</b>
-- <b>"-cgroup-parent-subsystem=memory"</b>
-</code></pre>
+<pre>
+Clusters:
+ zzzzz:
+ Containers:
+ <code class="userinput">CrunchRunArgumentsList:
+ - <b>"-cgroup-parent-subsystem=memory"</b></code>
+</pre>
</notextile>
The choice of subsystem ("memory" in this example) must correspond to one of the resource types enabled in SLURM's @cgroup.conf@. Limits for other resource types will also be respected. The specified subsystem is singled out only to let Crunch determine the name of the cgroup provided by SLURM. When doing this, you should also set "ReserveExtraRAM":#ReserveExtraRAM .
{% include 'notebox_end' %}
-h3(#CrunchRunCommand-network). CrunchRunCommand: Using host networking for containers
+h3(#CrunchRunCommand-network). Containers.CrunchRunArgumentList: Using host networking for containers
Older Linux kernels (prior to 3.18) have bugs in network namespace handling which can lead to compute node lockups. This is indicated by blocked kernel tasks in "Workqueue: netns cleanup_net". If you are experiencing this problem, as a workaround you can disable use of network namespaces by Docker across the cluster. Be aware this reduces container isolation, which may be a security risk.
<notextile>
-<pre><code class="userinput">CrunchRunCommand:
-- <b>crunch-run</b>
-- <b>"-container-enable-networking=always"</b>
-- <b>"-container-network-mode=host"</b>
-</code></pre>
-</notextile>
-
-h3(#MinRetryPeriod). MinRetryPeriod: Rate-limit repeated attempts to start containers
-
-If SLURM is unable to run a container, the dispatcher will submit it again after the next PollPeriod. If PollPeriod is very short, this can be excessive. If MinRetryPeriod is set, the dispatcher will avoid submitting the same container to SLURM more than once in the given time span.
-
-<notextile>
-<pre><code class="userinput">MinRetryPeriod: <b>30s</b>
-</code></pre>
-</notextile>
-
-h3(#ReserveExtraRAM). ReserveExtraRAM: Extra RAM for jobs
-
-Extra RAM to reserve (in bytes) on each SLURM job submitted by Arvados, which is added to the amount specified in the container's @runtime_constraints@. If not provided, the default value is zero. Helpful when using @-cgroup-parent-subsystem@, where @crunch-run@ and @arv-mount@ share the control group memory limit with the user process. In this situation, at least 256MiB is recommended to accomodate each container's @crunch-run@ and @arv-mount@ processes.
-
-<notextile>
-<pre><code class="userinput">ReserveExtraRAM: <b>268435456</b>
-</code></pre>
+<pre>
+Clusters:
+ zzzzz:
+ Containers:
+ <code class="userinput">CrunchRunArgumentsList:
+ - <b>"-container-enable-networking=always"</b>
+ - <b>"-container-network-mode=host"</b></code>
+</pre>
</notextile>
h2. Restart the dispatcher
<pre><code>~$ <span class="userinput">arvados-ws -h</span>
Usage of arvados-ws:
-config path
- path to config file (default "/etc/arvados/ws/ws.yml")
+ path to config file (default "/etc/arvados/config.yml")
-dump-config
show current configuration and exit
</code></pre>
</notextile>
-h3. Create a configuration file
+h3. Update cluster config
-Create @/etc/arvados/ws/ws.yml@ using the following template. Replace @xxxxxxxx@ with the "password you generated during database setup":install-postgresql.html#api.
+Edit the cluster config at @/etc/arvados/config.yml@ and set @Services.Websocket.ExternalURL@ and @Services.Websocket.InternalURLs@. Replace @zzzzz@ with your cluster id.
<notextile>
-<pre><code>Client:
- APIHost: <span class="userinput">uuid_prefix.your.domain</span>:443
-Listen: ":<span class="userinput">9003</span>"
-Postgres:
- dbname: arvados_production
- host: localhost
- password: <span class="userinput">xxxxxxxx</span>
- user: arvados
-</code></pre>
+<pre><code>Clusters:
+ zzzzz:
+ Services:
+ <span class="userinput">Websocket:
+ ExternalURL: wss://ws.uuid_prefix.your.domain/websocket
+ InternalURLs:
+ "http://localhost:9003": {}
+</span></code></pre>
</notextile>
h3. Start the service (option 1: systemd)
h3. Update API server configuration
-Ensure the websocket server address is correct in the API server configuration file @/etc/arvados/api/application.yml@.
-
-<notextile>
-<pre><code>websocket_address: wss://ws.<span class="userinput">uuid_prefix.your.domain</span>/websocket
-</code></pre>
-</notextile>
-
Restart Nginx to reload the API server configuration.
<notextile>
h3. Verify DNS and proxy setup
-Use a host elsewhere on the Internet to confirm that your DNS, proxy, and SSL are configured correctly.
+Use a host elsewhere on the Internet to confirm that your DNS, proxy, and SSL are configured correctly. For @Authorization: Bearer xxxx@ replace @xxxx@ with the value from @ManagementToken@ in @config.yml@.
<notextile>
-<pre><code>$ <span class="userinput">curl https://ws.<b>uuid_prefix.your.domain</b>/status.json</span>
-{"Clients":1}
+<pre><code>$ <span class="userinput">curl -H "Authorization: Bearer xxxx" https://ws.<b>uuid_prefix.your.domain</b>/_health/ping</span>
+{"health":"OK"}
</code></pre>
</notextile>
|==--debug==| Print even more logging|
|==--metrics==| Print timing metrics|
|==--tool-help==| Print command line help for tool|
-|==--enable-reuse==| Enable job or container reuse (default)|
-|==--disable-reuse==| Disable job or container reuse|
-|==--project-uuid UUID==| Project that will own the workflow jobs, if not provided, will go to home project.|
+|==--enable-reuse==| Enable container reuse (default)|
+|==--disable-reuse==| Disable container reuse|
+|==--project-uuid UUID==| Project that will own the workflow containers, if not provided, will go to home project.|
|==--output-name OUTPUT_NAME==|Name to use for collection that stores the final output.|
|==--output-tags OUTPUT_TAGS==|Tags for the final output collection separated by commas, e.g., =='--output-tags tag0,tag1,tag2'==.|
-|==--ignore-docker-for-reuse==|Ignore Docker image version when deciding whether to reuse past jobs.|
+|==--ignore-docker-for-reuse==|Ignore Docker image version when deciding whether to reuse past containers.|
|==--submit==| Submit workflow runner to Arvados to manage the workflow (default).|
-|==--local==| Run workflow on local host (still submits jobs to Arvados).|
+|==--local==| Run workflow on local host (still submits containers to Arvados).|
|==--create-template==| (Deprecated) synonym for --create-workflow.|
-|==--create-workflow==| Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.|
+|==--create-workflow==| Register an Arvados workflow that can be run from Workbench|
|==--update-workflow== UUID|Update an existing Arvados workflow or pipeline template with the given UUID.|
-|==--wait==| After submitting workflow runner job, wait for completion.|
-|==--no-wait==| Submit workflow runner job and exit.|
+|==--wait==| After submitting workflow runner, wait for completion.|
+|==--no-wait==| Submit workflow runner and exit.|
|==--log-timestamps==| Prefix logging lines with timestamp|
|==--no-log-timestamps==| No timestamp on logging lines|
-|==--api== {jobs,containers}|Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.|
+|==--api== {containers}|Select work submission API. Only supports 'containers'|
|==--compute-checksum==| Compute checksum of contents while collecting outputs|
-|==--submit-runner-ram== SUBMIT_RUNNER_RAM|RAM (in MiB) required for the workflow runner job (default 1024)|
-|==--submit-runner-image== SUBMIT_RUNNER_IMAGE|Docker image for workflow runner job|
+|==--submit-runner-ram== SUBMIT_RUNNER_RAM|RAM (in MiB) required for the workflow runner (default 1024)|
+|==--submit-runner-image== SUBMIT_RUNNER_IMAGE|Docker image for workflow runner|
|==--always-submit-runner==|When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool|
|==--submit-request-uuid== UUID|Update and commit to supplied container request instead of creating a new one (containers API only).|
|==--submit-runner-cluster== CLUSTER_ID|Submit workflow runner to a remote cluster (containers API only)|
|==--storage-classes== STORAGE_CLASSES|Specify comma separated list of storage classes to be used when saving workflow output to Keep.|
|==--intermediate-output-ttl== N|If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).|
|==--priority== PRIORITY|Workflow priority (range 1..1000, higher has precedence over lower, containers api only)|
-|==--thread-count== THREAD_COUNT|Number of threads to use for job submit and output collection.|
+|==--thread-count== THREAD_COUNT|Number of threads to use for container submit and output collection.|
|==--http-timeout== HTTP_TIMEOUT|API request timeout in seconds. Default is 300 seconds (5 minutes).|
|==--trash-intermediate==|Immediately trash intermediate outputs on workflow success.|
|==--no-trash-intermediate==|Do not trash intermediate outputs (default).|
h3(#local). Control a workflow locally
-To run a workflow with local control, use @--local@. This means that the host where you run @arvados-cwl-runner@ will be responsible for submitting jobs, however, the jobs themselves will still run on the Arvados cluster. With @--local@, if you interrupt @arvados-cwl-runner@ or log out, the workflow will be terminated.
+To run a workflow with local control, use @--local@. This means that the host where you run @arvados-cwl-runner@ will be responsible for submitting containers, however, the containers themselves will still run on the Arvados cluster. With @--local@, if you interrupt @arvados-cwl-runner@ or log out, the workflow will be terminated.
<notextile>
<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --local bwa-mem.cwl bwa-mem-input.yml</span>
To prevent misbehaving steps from running forever and wasting resources, you can fail the step if it exceeds a certain running time with "ToolTimeLimit":https://www.commonwl.org/v1.1/CommandLineTool.html#ToolTimeLimit instead of the deprecated @cwltool:TimeLimit@ .
To control if an individual step can be reused, use "WorkReuse":https://www.commonwl.org/v1.1/CommandLineTool.html#WorkReuse instead of the deprecated @arv:ReuseRequirement@.
-
-h2(#migrate). Differences in running CWL on the legacy jobs API vs containers API
-
-Most users can ignore this section.
-
-When migrating your Arvados cluster from using the jobs API (--api=jobs) (sometimes referred to as "crunch v1") to the containers API (--api=containers) ("crunch v2") there are a few differences in behavior:
-
-A tool may fail to find an input file that could be found when run under the jobs API. This is because tools are limited to accessing collections explicitly listed in the input, and further limited to those individual files or subdirectories that are listed. For example, given an explicit file input @/dir/subdir/file1.txt@, a tool will not be allowed to implicitly access a file in the parent directory @/dir/file2.txt@. Use @secondaryFiles@ or a @Directory@ for files that need to be grouped together.
-
-A tool may fail when attempting to rename or delete a file in the output directory. This may happen because files listed in @InitialWorkDirRequirement@ appear in the output directory as normal files (not symlinks) but cannot be moved, renamed or deleted unless marked as "writable" in CWL. These files will be added to the output collection but without any additional copies of the underlying data.
-
-A tool may fail when attempting to access the network. This may happen because, unlike the jobs API, under the containers API network access is disabled by default. Tools which require network access should add "arv:APIRequirement: {}":cwl-extensions.html#APIRequirement to the @requirements@ section.
-
-CWL v1.1 is not supported by the Jobs API.
SLURM:
PrioritySpread: 0
SbatchArgumentsList: []
+ SbatchEnvironmentVariables:
+ SAMPLE: ""
Managed:
# Path to dns server configuration directory
# (e.g. /etc/unbound.d/conf.d). If false, do not write any config
cluster.SystemRootToken = client.AuthToken
}
cluster.TLS.Insecure = client.Insecure
+ ks := ""
+ for i, u := range client.KeepServiceURIs {
+ if i > 0 {
+ ks += " "
+ }
+ ks += u
+ }
+ cluster.Containers.SLURM.SbatchEnvironmentVariables = map[string]string{"ARVADOS_KEEP_SERVICES": ks}
}
// update config using values from a crunch-dispatch-slurm config file.
SLURM:
PrioritySpread: 0
SbatchArgumentsList: []
+ SbatchEnvironmentVariables:
+ SAMPLE: ""
Managed:
# Path to dns server configuration directory
# (e.g. /etc/unbound.d/conf.d). If false, do not write any config
# SPDX-License-Identifier: Apache-2.0
# Implement cwl-runner interface for submitting and running work on Arvados, using
-# either the Crunch jobs API or Crunch containers API.
+# the Crunch containers API.
from future.utils import viewitems
from builtins import str
# These aren't used directly in this file but
# other code expects to import them from here
from .arvcontainer import ArvadosContainer
-from .arvjob import ArvadosJob
from .arvtool import ArvadosCommandTool
from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
from .util import get_current_container
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--enable-reuse", action="store_true",
default=True, dest="enable_reuse",
- help="Enable job or container reuse (default)")
+ help="Enable container reuse (default)")
exgroup.add_argument("--disable-reuse", action="store_false",
default=True, dest="enable_reuse",
- help="Disable job or container reuse")
+ help="Disable container reuse")
- parser.add_argument("--project-uuid", metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
+ parser.add_argument("--project-uuid", metavar="UUID", help="Project that will own the workflow containers, if not provided, will go to home project.")
parser.add_argument("--output-name", help="Name to use for collection that stores the final output.", default=None)
parser.add_argument("--output-tags", help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
parser.add_argument("--ignore-docker-for-reuse", action="store_true",
- help="Ignore Docker image version when deciding whether to reuse past jobs.",
+ help="Ignore Docker image version when deciding whether to reuse past containers.",
default=False)
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
default=True, dest="submit")
- exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
+ exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits containers to Arvados).",
default=True, dest="submit")
exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
dest="create_workflow")
- exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
- exgroup.add_argument("--update-workflow", metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
+ exgroup.add_argument("--create-workflow", action="store_true", help="Register an Arvados workflow that can be run from Workbench")
+ exgroup.add_argument("--update-workflow", metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")
exgroup = parser.add_mutually_exclusive_group()
- exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
+ exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner, wait for completion.",
default=True, dest="wait")
- exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
+ exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner and exit.",
default=True, dest="wait")
exgroup = parser.add_mutually_exclusive_group()
parser.add_argument("--api",
default=None, dest="work_api",
- choices=("jobs", "containers"),
- help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")
+ choices=("containers",),
+ help="Select work submission API. Only supports 'containers'")
parser.add_argument("--compute-checksum", action="store_true", default=False,
help="Compute checksum of contents while collecting outputs",
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--submit-request-uuid",
default=None,
- help="Update and commit to supplied container request instead of creating a new one (containers API only).",
+ help="Update and commit to supplied container request instead of creating a new one.",
metavar="UUID")
exgroup.add_argument("--submit-runner-cluster",
- help="Submit workflow runner to a remote cluster (containers API only)",
+ help="Submit workflow runner to a remote cluster",
default=None,
metavar="CLUSTER_ID")
default=0)
parser.add_argument("--priority", type=int,
- help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
+ help="Workflow priority (range 1..1000, higher has precedence over lower)",
default=DEFAULT_PRIORITY)
parser.add_argument("--disable-validate", dest="do_validate",
if arvargs.update_workflow:
if arvargs.update_workflow.find('-7fd4e-') == 5:
want_api = 'containers'
- elif arvargs.update_workflow.find('-p5p6p-') == 5:
- want_api = 'jobs'
else:
want_api = None
if want_api and arvargs.work_api and want_api != arvargs.work_api:
return 1
# Note that unless in debug mode, some stack traces related to user
- # workflow errors may be suppressed. See ArvadosJob.done().
+ # workflow errors may be suppressed.
if arvargs.debug:
logger.setLevel(logging.DEBUG)
logging.getLogger('arvados').setLevel(logging.DEBUG)
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-from past.builtins import basestring
-from builtins import object
-from future.utils import viewitems
-
-import logging
-import re
-import copy
-import json
-import time
-
-from cwltool.process import shortname, UnsupportedRequirement
-from cwltool.errors import WorkflowException
-from cwltool.command_line_tool import revmap_file, CommandLineTool
-from cwltool.load_tool import fetch_document
-from cwltool.builder import Builder
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
-from cwltool.job import JobBase
-
-from schema_salad.sourceline import SourceLine
-
-import arvados_cwl.util
-import ruamel.yaml as yaml
-
-import arvados.collection
-from arvados.errors import ApiError
-
-from .arvdocker import arv_docker_get_image
-from .runner import Runner, arvados_jobs_image, packed_workflow, upload_workflow_collection, trim_anonymous_location, remove_redundant_fields
-from .pathmapper import VwdPathMapper, trim_listing
-from .perf import Perf
-from . import done
-from ._version import __version__
-from .util import get_intermediate_collection_info
-
-logger = logging.getLogger('arvados.cwl-runner')
-metrics = logging.getLogger('arvados.cwl-runner.metrics')
-
-crunchrunner_re = re.compile(r"^.*crunchrunner: \$\(task\.(tmpdir|outdir|keep)\)=(.*)$")
-
-crunchrunner_git_commit = 'a3f2cb186e437bfce0031b024b2157b73ed2717d'
-
-class ArvadosJob(JobBase):
- """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
-
- def __init__(self, runner,
- builder, # type: Builder
- joborder, # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
- make_path_mapper, # type: Callable[..., PathMapper]
- requirements, # type: List[Dict[Text, Text]]
- hints, # type: List[Dict[Text, Text]]
- name # type: Text
- ):
- super(ArvadosJob, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
- self.arvrunner = runner
- self.running = False
- self.uuid = None
-
- def run(self, runtimeContext):
- script_parameters = {
- "command": self.command_line
- }
- runtime_constraints = {}
-
- with Perf(metrics, "generatefiles %s" % self.name):
- if self.generatefiles["listing"]:
- vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
- keep_client=self.arvrunner.keep_client,
- num_retries=self.arvrunner.num_retries)
- script_parameters["task.vwd"] = {}
- generatemapper = VwdPathMapper(self.generatefiles["listing"], "", "",
- separateDirs=False)
-
- with Perf(metrics, "createfiles %s" % self.name):
- for f, p in generatemapper.items():
- if p.type == "CreateFile":
- with vwd.open(p.target, "w") as n:
- n.write(p.resolved.encode("utf-8"))
-
- if vwd:
- with Perf(metrics, "generatefiles.save_new %s" % self.name):
- info = get_intermediate_collection_info(self.name, None, runtimeContext.intermediate_output_ttl)
- vwd.save_new(name=info["name"],
- owner_uuid=self.arvrunner.project_uuid,
- ensure_unique_name=True,
- trash_at=info["trash_at"],
- properties=info["properties"])
-
- for f, p in generatemapper.items():
- if p.type == "File":
- script_parameters["task.vwd"][p.target] = p.resolved
- if p.type == "CreateFile":
- script_parameters["task.vwd"][p.target] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), p.target)
-
- script_parameters["task.env"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
- if self.environment:
- script_parameters["task.env"].update(self.environment)
-
- if self.stdin:
- script_parameters["task.stdin"] = self.stdin
-
- if self.stdout:
- script_parameters["task.stdout"] = self.stdout
-
- if self.stderr:
- script_parameters["task.stderr"] = self.stderr
-
- if self.successCodes:
- script_parameters["task.successCodes"] = self.successCodes
- if self.temporaryFailCodes:
- script_parameters["task.temporaryFailCodes"] = self.temporaryFailCodes
- if self.permanentFailCodes:
- script_parameters["task.permanentFailCodes"] = self.permanentFailCodes
-
- with Perf(metrics, "arv_docker_get_image %s" % self.name):
- (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
- if docker_req and runtimeContext.use_container is not False:
- if docker_req.get("dockerOutputDirectory"):
- raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
- "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
- runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api,
- docker_req,
- runtimeContext.pull_image,
- self.arvrunner.project_uuid)
- else:
- runtime_constraints["docker_image"] = "arvados/jobs"
-
- resources = self.builder.resources
- if resources is not None:
- runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
- runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
- runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
-
- runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
- if runtime_req:
- if "keep_cache" in runtime_req:
- runtime_constraints["keep_cache_mb_per_task"] = runtime_req["keep_cache"]
- runtime_constraints["min_ram_mb_per_node"] += runtime_req["keep_cache"]
- if "outputDirType" in runtime_req:
- if runtime_req["outputDirType"] == "local_output_dir":
- script_parameters["task.keepTmpOutput"] = False
- elif runtime_req["outputDirType"] == "keep_output_dir":
- script_parameters["task.keepTmpOutput"] = True
-
- filters = [["repository", "=", "arvados"],
- ["script", "=", "crunchrunner"],
- ["script_version", "in git", crunchrunner_git_commit]]
- if not self.arvrunner.ignore_docker_for_reuse:
- filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
-
- enable_reuse = runtimeContext.enable_reuse
- if enable_reuse:
- reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
- if reuse_req:
- enable_reuse = reuse_req["enableReuse"]
-
- self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
-
- try:
- with Perf(metrics, "create %s" % self.name):
- response = self.arvrunner.api.jobs().create(
- body={
- "owner_uuid": self.arvrunner.project_uuid,
- "script": "crunchrunner",
- "repository": "arvados",
- "script_version": "master",
- "minimum_script_version": crunchrunner_git_commit,
- "script_parameters": {"tasks": [script_parameters]},
- "runtime_constraints": runtime_constraints
- },
- filters=filters,
- find_or_create=enable_reuse
- ).execute(num_retries=self.arvrunner.num_retries)
-
- self.uuid = response["uuid"]
- self.arvrunner.process_submitted(self)
-
- self.update_pipeline_component(response)
-
- if response["state"] == "Complete":
- logger.info("%s reused job %s", self.arvrunner.label(self), response["uuid"])
- # Give read permission to the desired project on reused jobs
- if response["owner_uuid"] != self.arvrunner.project_uuid:
- try:
- self.arvrunner.api.links().create(body={
- 'link_class': 'permission',
- 'name': 'can_read',
- 'tail_uuid': self.arvrunner.project_uuid,
- 'head_uuid': response["uuid"],
- }).execute(num_retries=self.arvrunner.num_retries)
- except ApiError as e:
- # The user might not have "manage" access on the job: log
- # a message and continue.
- logger.info("Creating read permission on job %s: %s",
- response["uuid"],
- e)
- else:
- logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
- except Exception:
- logger.exception("%s error" % (self.arvrunner.label(self)))
- self.output_callback({}, "permanentFail")
-
- def update_pipeline_component(self, record):
- with self.arvrunner.workflow_eval_lock:
- if self.arvrunner.pipeline:
- self.arvrunner.pipeline["components"][self.name] = {"job": record}
- with Perf(metrics, "update_pipeline_component %s" % self.name):
- self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
- uuid=self.arvrunner.pipeline["uuid"],
- body={
- "components": self.arvrunner.pipeline["components"]
- }).execute(num_retries=self.arvrunner.num_retries)
- if self.arvrunner.uuid:
- try:
- job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
- if job:
- components = job["components"]
- components[self.name] = record["uuid"]
- self.arvrunner.api.jobs().update(
- uuid=self.arvrunner.uuid,
- body={
- "components": components
- }).execute(num_retries=self.arvrunner.num_retries)
- except Exception:
- logger.exception("Error adding to components")
-
- def done(self, record):
- try:
- self.update_pipeline_component(record)
- except:
- pass
-
- try:
- if record["state"] == "Complete":
- processStatus = "success"
- # we don't have the real exit code so fake it.
- record["exit_code"] = 0
- else:
- processStatus = "permanentFail"
- record["exit_code"] = 1
-
- outputs = {}
- try:
- if record["output"]:
- with Perf(metrics, "inspect log %s" % self.name):
- logc = arvados.collection.CollectionReader(record["log"],
- api_client=self.arvrunner.api,
- keep_client=self.arvrunner.keep_client,
- num_retries=self.arvrunner.num_retries)
- log = logc.open(list(logc.keys())[0])
- dirs = {
- "tmpdir": "/tmpdir",
- "outdir": "/outdir",
- "keep": "/keep"
- }
- for l in log:
- # Determine the tmpdir, outdir and keep paths from
- # the job run. Unfortunately, we can't take the first
- # values we find (which are expected to be near the
- # top) and stop scanning because if the node fails and
- # the job restarts on a different node these values
- # will different runs, and we need to know about the
- # final run that actually produced output.
- g = crunchrunner_re.match(l)
- if g:
- dirs[g.group(1)] = g.group(2)
-
- if processStatus == "permanentFail":
- done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
-
- with Perf(metrics, "output collection %s" % self.name):
- outputs = done.done(self, record, dirs["tmpdir"],
- dirs["outdir"], dirs["keep"])
- except WorkflowException as e:
- # Only include a stack trace if in debug mode.
- # This is most likely a user workflow error and a stack trace may obfuscate more useful output.
- logger.error("%s unable to collect output from %s:\n%s",
- self.arvrunner.label(self), record["output"], e, exc_info=(e if self.arvrunner.debug else False))
- processStatus = "permanentFail"
- except Exception:
- logger.exception("Got unknown exception while collecting output for job %s:", self.name)
- processStatus = "permanentFail"
-
- # Note: Currently, on error output_callback is expecting an empty dict,
- # anything else will fail.
- if not isinstance(outputs, dict):
- logger.error("Unexpected output type %s '%s'", type(outputs), outputs)
- outputs = {}
- processStatus = "permanentFail"
- finally:
- self.output_callback(outputs, processStatus)
-
-
-class RunnerJob(Runner):
- """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
-
- def arvados_job_spec(self, debug=False):
- """Create an Arvados job specification for this workflow.
-
- The returned dict can be used to create a job (i.e., passed as
- the +body+ argument to jobs().create()), or as a component in
- a pipeline template or pipeline instance.
- """
-
- if self.embedded_tool.tool["id"].startswith("keep:"):
- self.job_order["cwl:tool"] = self.embedded_tool.tool["id"][5:]
- else:
- packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
- wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed)
- self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
-
- adjustDirObjs(self.job_order, trim_listing)
- visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
- visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)
-
- if self.output_name:
- self.job_order["arv:output_name"] = self.output_name
-
- if self.output_tags:
- self.job_order["arv:output_tags"] = self.output_tags
-
- self.job_order["arv:enable_reuse"] = self.enable_reuse
-
- if self.on_error:
- self.job_order["arv:on_error"] = self.on_error
-
- if debug:
- self.job_order["arv:debug"] = True
-
- return {
- "script": "cwl-runner",
- "script_version": "master",
- "minimum_script_version": "570509ab4d2ef93d870fd2b1f2eab178afb1bad9",
- "repository": "arvados",
- "script_parameters": self.job_order,
- "runtime_constraints": {
- "docker_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
- "min_ram_mb_per_node": self.submit_runner_ram
- }
- }
-
- def run(self, runtimeContext):
- job_spec = self.arvados_job_spec(runtimeContext.debug)
-
- job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
-
- job = self.arvrunner.api.jobs().create(
- body=job_spec,
- find_or_create=self.enable_reuse
- ).execute(num_retries=self.arvrunner.num_retries)
-
- for k,v in viewitems(job_spec["script_parameters"]):
- if v is False or v is None or isinstance(v, dict):
- job_spec["script_parameters"][k] = {"value": v}
-
- del job_spec["owner_uuid"]
- job_spec["job"] = job
-
- instance_spec = {
- "owner_uuid": self.arvrunner.project_uuid,
- "name": self.name,
- "components": {
- "cwl-runner": job_spec,
- },
- "state": "RunningOnServer",
- }
- if not self.enable_reuse:
- instance_spec["properties"] = {"run_options": {"enable_job_reuse": False}}
-
- self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
- body=instance_spec).execute(num_retries=self.arvrunner.num_retries)
- logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"])
-
- if runtimeContext.wait is False:
- self.uuid = self.arvrunner.pipeline["uuid"]
- return
-
- self.uuid = job["uuid"]
- self.arvrunner.process_submitted(self)
-
-
-class RunnerTemplate(object):
- """An Arvados pipeline template that invokes a CWL workflow."""
-
- type_to_dataclass = {
- 'boolean': 'boolean',
- 'File': 'File',
- 'Directory': 'Collection',
- 'float': 'number',
- 'int': 'number',
- 'string': 'text',
- }
-
- def __init__(self, runner, tool, job_order, enable_reuse, uuid,
- submit_runner_ram=0, name=None, merged_map=None,
- loadingContext=None):
- self.runner = runner
- self.embedded_tool = tool
- self.job = RunnerJob(
- runner=runner,
- tool=tool,
- enable_reuse=enable_reuse,
- output_name=None,
- output_tags=None,
- submit_runner_ram=submit_runner_ram,
- name=name,
- merged_map=merged_map,
- loadingContext=loadingContext)
- self.job.job_order = job_order
- self.uuid = uuid
-
- def pipeline_component_spec(self):
- """Return a component that Workbench and a-r-p-i will understand.
-
- Specifically, translate CWL input specs to Arvados pipeline
- format, like {"dataclass":"File","value":"xyz"}.
- """
-
- spec = self.job.arvados_job_spec()
-
- # Most of the component spec is exactly the same as the job
- # spec (script, script_version, etc.).
- # spec['script_parameters'] isn't right, though. A component
- # spec's script_parameters hash is a translation of
- # self.tool.tool['inputs'] with defaults/overrides taken from
- # the job order. So we move the job parameters out of the way
- # and build a new spec['script_parameters'].
- job_params = spec['script_parameters']
- spec['script_parameters'] = {}
-
- for param in self.embedded_tool.tool['inputs']:
- param = copy.deepcopy(param)
-
- # Data type and "required" flag...
- types = param['type']
- if not isinstance(types, list):
- types = [types]
- param['required'] = 'null' not in types
- non_null_types = [t for t in types if t != "null"]
- if len(non_null_types) == 1:
- the_type = [c for c in non_null_types][0]
- dataclass = None
- if isinstance(the_type, basestring):
- dataclass = self.type_to_dataclass.get(the_type)
- if dataclass:
- param['dataclass'] = dataclass
- # Note: If we didn't figure out a single appropriate
- # dataclass, we just left that attribute out. We leave
- # the "type" attribute there in any case, which might help
- # downstream.
-
- # Title and description...
- title = param.pop('label', '')
- descr = param.pop('doc', '').rstrip('\n')
- if title:
- param['title'] = title
- if descr:
- param['description'] = descr
-
- # Fill in the value from the current job order, if any.
- param_id = shortname(param.pop('id'))
- value = job_params.get(param_id)
- if value is None:
- pass
- elif not isinstance(value, dict):
- param['value'] = value
- elif param.get('dataclass') in ('File', 'Collection') and value.get('location'):
- param['value'] = value['location'][5:]
-
- spec['script_parameters'][param_id] = param
- spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
- return spec
-
- def save(self):
- body = {
- "components": {
- self.job.name: self.pipeline_component_spec(),
- },
- "name": self.job.name,
- }
- if self.runner.project_uuid:
- body["owner_uuid"] = self.runner.project_uuid
- if self.uuid:
- self.runner.api.pipeline_templates().update(
- uuid=self.uuid, body=body).execute(
- num_retries=self.runner.num_retries)
- logger.info("Updated template %s", self.uuid)
- else:
- self.uuid = self.runner.api.pipeline_templates().create(
- body=body, ensure_unique_name=True).execute(
- num_retries=self.runner.num_retries)['uuid']
- logger.info("Created template %s", self.uuid)
# SPDX-License-Identifier: Apache-2.0
from cwltool.command_line_tool import CommandLineTool, ExpressionTool
-from .arvjob import ArvadosJob
from .arvcontainer import ArvadosContainer
from .pathmapper import ArvPathMapper
from .runner import make_builder
def make_job_runner(self, runtimeContext):
if runtimeContext.work_api == "containers":
return partial(ArvadosContainer, self.arvrunner, runtimeContext)
- elif runtimeContext.work_api == "jobs":
- return partial(ArvadosJob, self.arvrunner)
else:
raise Exception("Unsupported work_api %s", runtimeContext.work_api)
return ArvPathMapper(self.arvrunner, reffiles+runtimeContext.extra_reffiles, runtimeContext.basedir,
"/keep/%s",
"/keep/%s/%s")
- elif runtimeContext.work_api == "jobs":
- return ArvPathMapper(self.arvrunner, reffiles, runtimeContext.basedir,
- "$(task.keep)/%s",
- "$(task.keep)/%s/%s")
def job(self, joborder, output_callback, runtimeContext):
builder = make_builder(joborder, self.hints, self.requirements, runtimeContext)
else:
runtimeContext.outdir = "/var/spool/cwl"
runtimeContext.docker_outdir = "/var/spool/cwl"
- elif runtimeContext.work_api == "jobs":
- runtimeContext.outdir = "$(task.outdir)"
- runtimeContext.docker_outdir = "$(task.outdir)"
- runtimeContext.tmpdir = "$(task.tmpdir)"
- runtimeContext.docker_tmpdir = "$(task.tmpdir)"
return super(ArvadosCommandTool, self).job(joborder, output_callback, runtimeContext)
class ArvadosExpressionTool(ExpressionTool):
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-# Crunch script integration for running arvados-cwl-runner (importing
-# arvados_cwl module) inside a crunch job.
-#
-# This gets the job record, transforms the script parameters into a valid CWL
-# input object, then executes the CWL runner to run the underlying workflow or
-# tool. When the workflow completes, record the output object in an output
-# collection for this runner job.
-
-from past.builtins import basestring
-from future.utils import viewitems
-
-import arvados
-import arvados_cwl
-import arvados.collection
-import arvados.util
-import cwltool.main
-import logging
-import os
-import json
-import argparse
-import re
-import functools
-
-from arvados.api import OrderedJsonModel
-from cwltool.process import shortname
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, normalizeFilesDirs
-from cwltool.load_tool import load_tool
-from cwltool.errors import WorkflowException
-from arvados_cwl.context import ArvRuntimeContext
-
-from .fsaccess import CollectionFetcher, CollectionFsAccess
-
-logger = logging.getLogger('arvados.cwl-runner')
-
-def run():
- # Timestamps are added by crunch-job, so don't print redundant timestamps.
- arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
-
- # Print package versions
- logger.info(arvados_cwl.versionstring())
-
- api = arvados.api("v1")
-
- arvados_cwl.add_arv_hints()
-
- runner = None
- try:
- job_order_object = arvados.current_job()['script_parameters']
- toolpath = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object.pop("cwl:tool"))
-
- pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')
-
- def keeppath(v):
- if pdh_path.match(v):
- return "keep:%s" % v
- else:
- return v
-
- def keeppathObj(v):
- if "location" in v:
- v["location"] = keeppath(v["location"])
-
- for k,v in viewitems(job_order_object):
- if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
- job_order_object[k] = {
- "class": "File",
- "location": "keep:%s" % v
- }
-
- adjustFileObjs(job_order_object, keeppathObj)
- adjustDirObjs(job_order_object, keeppathObj)
- normalizeFilesDirs(job_order_object)
-
- output_name = None
- output_tags = None
- enable_reuse = True
- on_error = "continue"
- debug = False
-
- if "arv:output_name" in job_order_object:
- output_name = job_order_object["arv:output_name"]
- del job_order_object["arv:output_name"]
-
- if "arv:output_tags" in job_order_object:
- output_tags = job_order_object["arv:output_tags"]
- del job_order_object["arv:output_tags"]
-
- if "arv:enable_reuse" in job_order_object:
- enable_reuse = job_order_object["arv:enable_reuse"]
- del job_order_object["arv:enable_reuse"]
-
- if "arv:on_error" in job_order_object:
- on_error = job_order_object["arv:on_error"]
- del job_order_object["arv:on_error"]
-
- if "arv:debug" in job_order_object:
- debug = job_order_object["arv:debug"]
- del job_order_object["arv:debug"]
-
- arvargs = argparse.Namespace()
- arvargs.work_api = "jobs"
- arvargs.output_name = output_name
- arvargs.output_tags = output_tags
- arvargs.thread_count = 1
- arvargs.collection_cache_size = None
-
- runner = arvados_cwl.ArvCwlExecutor(api_client=arvados.safeapi.ThreadSafeApiCache(
- api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4}),
- arvargs=arvargs)
-
- make_fs_access = functools.partial(CollectionFsAccess,
- collection_cache=runner.collection_cache)
-
- t = load_tool(toolpath, runner.loadingContext)
-
- if debug:
- logger.setLevel(logging.DEBUG)
- logging.getLogger('arvados').setLevel(logging.DEBUG)
- logging.getLogger("cwltool").setLevel(logging.DEBUG)
-
- args = ArvRuntimeContext(vars(arvargs))
- args.project_uuid = arvados.current_job()["owner_uuid"]
- args.enable_reuse = enable_reuse
- args.on_error = on_error
- args.submit = False
- args.debug = debug
- args.quiet = False
- args.ignore_docker_for_reuse = False
- args.basedir = os.getcwd()
- args.name = None
- args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
- args.make_fs_access = make_fs_access
- args.trash_intermediate = False
- args.intermediate_output_ttl = 0
- args.priority = arvados_cwl.DEFAULT_PRIORITY
- args.do_validate = True
- args.disable_js_validation = False
- args.tmp_outdir_prefix = "tmp"
-
- runner.arv_executor(t, job_order_object, args, logger=logger)
- except Exception as e:
- if isinstance(e, WorkflowException):
- logging.info("Workflow error %s", e)
- else:
- logging.exception("Unhandled exception")
- if runner and runner.final_output_collection:
- outputCollection = runner.final_output_collection.portable_data_hash()
- else:
- outputCollection = None
- api.job_tasks().update(uuid=arvados.current_task()['uuid'],
- body={
- 'output': outputCollection,
- 'success': False,
- 'progress':1.0
- }).execute()
import arvados_cwl.util
from .arvcontainer import RunnerContainer
-from .arvjob import RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
class ArvCwlExecutor(object):
- """Execute a CWL tool or workflow, submit work (using either jobs or
- containers API), wait for them to complete, and report output.
+ """Execute a CWL tool or workflow, submit work (using containers API),
+ wait for them to complete, and report output.
"""
num_retries=self.num_retries)
self.work_api = None
- expected_api = ["containers", "jobs"]
+ expected_api = ["containers"]
for api in expected_api:
try:
methods = self.api._rootDesc.get('resources')[api]['methods']
raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
if self.work_api == "jobs":
- logger.warning("""
+ logger.error("""
*******************************
-Using the deprecated 'jobs' API.
-
-To get rid of this warning:
-
-Users: read about migrating at
-http://doc.arvados.org/user/cwl/cwl-style.html#migrate
-and use the option --api=containers
-
-Admins: configure the cluster to disable the 'jobs' API as described at:
-http://doc.arvados.org/install/install-api-server.html#disable_api_methods
+The 'jobs' API is no longer supported.
*******************************""")
+ exit(1)
self.loadingContext = ArvLoadingContext(vars(arvargs))
self.loadingContext.fetcher_constructor = self.fetcher_constructor
return "[%s %s]" % (self.work_api[0:-1], obj.name)
def poll_states(self):
- """Poll status of jobs or containers listed in the processes dict.
+ """Poll status of containers listed in the processes dict.
Runs in a separate thread.
"""
begin_poll = time.time()
if self.work_api == "containers":
table = self.poll_api.container_requests()
- elif self.work_api == "jobs":
- table = self.poll_api.jobs()
pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
def check_features(self, obj, parentfield=""):
if isinstance(obj, dict):
- if obj.get("writable") and self.work_api != "containers":
- raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
if obj.get("class") == "DockerRequirement":
if obj.get("dockerOutputDirectory"):
- if self.work_api != "containers":
- raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
- "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
if not obj.get("dockerOutputDirectory").startswith('/'):
raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
"Option 'dockerOutputDirectory' must be an absolute path.")
- if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
- raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
if obj.get("class") == "InplaceUpdateRequirement":
if obj["inplaceUpdate"] and parentfield == "requirements":
raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
except Exception:
logger.exception("Setting container output")
return
- elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
- self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
- body={
- 'output': self.final_output_collection.portable_data_hash(),
- 'success': self.final_status == "success",
- 'progress':1.0
- }).execute(num_retries=self.num_retries)
def apply_reqs(self, job_order_object, tool):
if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
existing_uuid = runtimeContext.update_workflow
if existing_uuid or runtimeContext.create_workflow:
# Create a pipeline template or workflow record and exit.
- if self.work_api == "jobs":
- tmpl = RunnerTemplate(self, tool, job_order,
- runtimeContext.enable_reuse,
- uuid=existing_uuid,
- submit_runner_ram=runtimeContext.submit_runner_ram,
- name=runtimeContext.name,
- merged_map=merged_map,
- loadingContext=loadingContext)
- tmpl.save()
- # cwltool.main will write our return value to stdout.
- return (tmpl.uuid, "success")
- elif self.work_api == "containers":
+ if self.work_api == "containers":
return (upload_workflow(self, tool, job_order,
self.project_uuid,
uuid=existing_uuid,
runtimeContext.docker_outdir = "/var/spool/cwl"
runtimeContext.tmpdir = "/tmp"
runtimeContext.docker_tmpdir = "/tmp"
- elif self.work_api == "jobs":
- if runtimeContext.priority != DEFAULT_PRIORITY:
- raise Exception("--priority not implemented for jobs API.")
- runtimeContext.outdir = "$(task.outdir)"
- runtimeContext.docker_outdir = "$(task.outdir)"
- runtimeContext.tmpdir = "$(task.tmpdir)"
if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
raise Exception("--priority must be in the range 1..1000.")
secret_store=self.secret_store,
collection_cache_size=runtimeContext.collection_cache_size,
collection_cache_is_default=self.should_estimate_cache_size)
- elif self.work_api == "jobs":
- tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
- self.output_name,
- self.output_tags,
- submit_runner_ram=runtimeContext.submit_runner_ram,
- name=runtimeContext.name,
- on_error=runtimeContext.on_error,
- submit_runner_image=runtimeContext.submit_runner_image,
- merged_map=merged_map)
- elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
- # Create pipeline for local run
- self.pipeline = self.api.pipeline_instances().create(
- body={
- "owner_uuid": self.project_uuid,
- "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
- "components": {},
- "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
- logger.info("Pipeline instance %s", self.pipeline["uuid"])
if runtimeContext.cwl_runner_job is not None:
self.uuid = runtimeContext.cwl_runner_job.get('uuid')
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-from future import standard_library
-standard_library.install_aliases()
-from builtins import str
-from builtins import next
-
-import functools
-import json
-import logging
-import mock
-import os
-import unittest
-import copy
-import io
-import argparse
-
-import arvados
-import arvados_cwl
-import arvados_cwl.executor
-import cwltool.process
-from arvados.errors import ApiError
-from schema_salad.ref_resolver import Loader
-from schema_salad.sourceline import cmap
-from .mock_discovery import get_rootDesc
-from .matcher import JsonDiffMatcher, StripYAMLComments
-from .test_container import CollectionMock
-from arvados_cwl.arvdocker import arv_docker_clear_cache
-
-if not os.getenv('ARVADOS_DEBUG'):
- logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
- logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
-
-class TestJob(unittest.TestCase):
-
- def setUp(self):
- cwltool.process._names = set()
-
- def helper(self, runner, enable_reuse=True):
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
-
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- loadingContext = arvados_cwl.context.ArvLoadingContext(
- {"avsc_names": avsc_names,
- "basedir": "",
- "make_fs_access": make_fs_access,
- "loader": Loader({}),
- "metadata": {"cwlVersion": "v1.1", "http://commonwl.org/cwltool#original_cwlVersion": "v1.0"},
- "makeTool": runner.arv_make_tool})
- runtimeContext = arvados_cwl.context.ArvRuntimeContext(
- {"work_api": "jobs",
- "basedir": "",
- "name": "test_run_job_"+str(enable_reuse),
- "make_fs_access": make_fs_access,
- "enable_reuse": enable_reuse,
- "priority": 500})
-
- return loadingContext, runtimeContext
-
- # The test passes no builder.resources
- # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
- @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
- def test_run(self, list_images_in_arv):
- for enable_reuse in (True, False):
- arv_docker_clear_cache()
- runner = mock.MagicMock()
- runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- runner.ignore_docker_for_reuse = False
- runner.num_retries = 0
-
- list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
- runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
- # Simulate reused job from another project so that we can check is a can_read
- # link is added.
- runner.api.jobs().create().execute.return_value = {
- 'state': 'Complete' if enable_reuse else 'Queued',
- 'owner_uuid': 'zzzzz-tpzed-yyyyyyyyyyyyyyy' if enable_reuse else 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
- 'uuid': 'zzzzz-819sb-yyyyyyyyyyyyyyy',
- 'output': None,
- }
-
- tool = cmap({
- "inputs": [],
- "outputs": [],
- "baseCommand": "ls",
- "arguments": [{"valueFrom": "$(runtime.outdir)"}],
- "id": "#",
- "class": "CommandLineTool"
- })
-
- loadingContext, runtimeContext = self.helper(runner, enable_reuse)
-
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
- arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
- j.run(runtimeContext)
- runner.api.jobs().create.assert_called_with(
- body=JsonDiffMatcher({
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
- 'runtime_constraints': {},
- 'script_parameters': {
- 'tasks': [{
- 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
- 'command': ['ls', '$(task.outdir)']
- }],
- },
- 'script_version': 'master',
- 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
- 'repository': 'arvados',
- 'script': 'crunchrunner',
- 'runtime_constraints': {
- 'docker_image': 'arvados/jobs',
- 'min_cores_per_node': 1,
- 'min_ram_mb_per_node': 1024,
- 'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
- }
- }),
- find_or_create=enable_reuse,
- filters=[['repository', '=', 'arvados'],
- ['script', '=', 'crunchrunner'],
- ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
- ['docker_image_locator', 'in docker', 'arvados/jobs']]
- )
- if enable_reuse:
- runner.api.links().create.assert_called_with(
- body=JsonDiffMatcher({
- 'link_class': 'permission',
- 'name': 'can_read',
- "tail_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
- "head_uuid": "zzzzz-819sb-yyyyyyyyyyyyyyy",
- })
- )
- # Simulate an API excepction when trying to create a
- # sharing link on the job
- runner.api.links().create.side_effect = ApiError(
- mock.MagicMock(return_value={'status': 403}),
- bytes(b'Permission denied'))
- j.run(runtimeContext)
- else:
- assert not runner.api.links().create.called
-
- # The test passes some fields in builder.resources
- # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
- @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
- def test_resource_requirements(self, list_images_in_arv):
- runner = mock.MagicMock()
- runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- runner.ignore_docker_for_reuse = False
- runner.num_retries = 0
- arvados_cwl.add_arv_hints()
-
- list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
- runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
-
- tool = {
- "inputs": [],
- "outputs": [],
- "hints": [{
- "class": "ResourceRequirement",
- "coresMin": 3,
- "ramMin": 3000,
- "tmpdirMin": 4000
- }, {
- "class": "http://arvados.org/cwl#RuntimeConstraints",
- "keep_cache": 512,
- "outputDirType": "keep_output_dir"
- }, {
- "class": "http://arvados.org/cwl#APIRequirement",
- },
- {
- "class": "http://arvados.org/cwl#ReuseRequirement",
- "enableReuse": False
- }],
- "baseCommand": "ls",
- "id": "#",
- "class": "CommandLineTool"
- }
-
- loadingContext, runtimeContext = self.helper(runner)
-
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
- arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
- j.run(runtimeContext)
- runner.api.jobs().create.assert_called_with(
- body=JsonDiffMatcher({
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
- 'runtime_constraints': {},
- 'script_parameters': {
- 'tasks': [{
- 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
- 'task.keepTmpOutput': True,
- 'command': ['ls']
- }]
- },
- 'script_version': 'master',
- 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
- 'repository': 'arvados',
- 'script': 'crunchrunner',
- 'runtime_constraints': {
- 'docker_image': 'arvados/jobs',
- 'min_cores_per_node': 3,
- 'min_ram_mb_per_node': 3512, # ramMin + keep_cache
- 'min_scratch_mb_per_node': 5024, # tmpdirSize + outdirSize
- 'keep_cache_mb_per_task': 512
- }
- }),
- find_or_create=False,
- filters=[['repository', '=', 'arvados'],
- ['script', '=', 'crunchrunner'],
- ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
- ['docker_image_locator', 'in docker', 'arvados/jobs']])
-
- @mock.patch("arvados.collection.CollectionReader")
- def test_done(self, reader):
- api = mock.MagicMock()
-
- runner = mock.MagicMock()
- runner.api = api
- runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- runner.num_retries = 0
- runner.ignore_docker_for_reuse = False
-
- reader().keys.return_value = "log.txt"
- reader().open.return_value = io.StringIO(
- str(u"""2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
-2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
-2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
- """))
- api.collections().list().execute.side_effect = ({"items": []},
- {"items": [{"manifest_text": "XYZ"}]},
- {"items": []},
- {"items": [{"manifest_text": "ABC"}]})
-
- arvjob = arvados_cwl.ArvadosJob(runner,
- mock.MagicMock(),
- {},
- None,
- [],
- [],
- "testjob")
- arvjob.output_callback = mock.MagicMock()
- arvjob.collect_outputs = mock.MagicMock()
- arvjob.collect_outputs.return_value = {"out": "stuff"}
-
- arvjob.done({
- "state": "Complete",
- "output": "99999999999999999999999999999993+99",
- "log": "99999999999999999999999999999994+99",
- "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- })
-
- api.collections().list.assert_has_calls([
- mock.call(),
- # Output collection check
- mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
- ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
- ['name', '=', 'Output 9999999 of testjob']]),
- mock.call().execute(num_retries=0),
- mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999993+99']],
- select=['manifest_text']),
- mock.call().execute(num_retries=0),
- # Log collection's turn
- mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
- ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
- ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
- mock.call().execute(num_retries=0),
- mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999994+99']],
- select=['manifest_text']),
- mock.call().execute(num_retries=0)])
-
- api.collections().create.assert_has_calls([
- mock.call(ensure_unique_name=True,
- body={'portable_data_hash': '99999999999999999999999999999993+99',
- 'manifest_text': 'XYZ',
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
- 'name': 'Output 9999999 of testjob'}),
- mock.call().execute(num_retries=0),
- mock.call(ensure_unique_name=True,
- body={'portable_data_hash': '99999999999999999999999999999994+99',
- 'manifest_text': 'ABC',
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
- 'name': 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
- mock.call().execute(num_retries=0),
- ])
-
- arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
-
- @mock.patch("arvados.collection.CollectionReader")
- def test_done_use_existing_collection(self, reader):
- api = mock.MagicMock()
-
- runner = mock.MagicMock()
- runner.api = api
- runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- runner.num_retries = 0
-
- reader().keys.return_value = "log.txt"
- reader().open.return_value = io.StringIO(
- str(u"""2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
-2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
-2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
- """))
-
- api.collections().list().execute.side_effect = (
- {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
- {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
- )
-
- arvjob = arvados_cwl.ArvadosJob(runner,
- mock.MagicMock(),
- {},
- None,
- [],
- [],
- "testjob")
- arvjob.output_callback = mock.MagicMock()
- arvjob.collect_outputs = mock.MagicMock()
- arvjob.collect_outputs.return_value = {"out": "stuff"}
-
- arvjob.done({
- "state": "Complete",
- "output": "99999999999999999999999999999993+99",
- "log": "99999999999999999999999999999994+99",
- "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- })
-
- api.collections().list.assert_has_calls([
- mock.call(),
- # Output collection
- mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
- ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
- ['name', '=', 'Output 9999999 of testjob']]),
- mock.call().execute(num_retries=0),
- # Log collection
- mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
- ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
- ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
- mock.call().execute(num_retries=0)
- ])
-
- self.assertFalse(api.collections().create.called)
-
- arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
-
-
-class TestWorkflow(unittest.TestCase):
-
- def setUp(self):
- cwltool.process._names = set()
-
- def helper(self, runner, enable_reuse=True):
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
-
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-
- document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=runner.api, fs_access=make_fs_access(""))
- document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
- document_loader.fetch_text = document_loader.fetcher.fetch_text
- document_loader.check_exists = document_loader.fetcher.check_exists
-
- loadingContext = arvados_cwl.context.ArvLoadingContext(
- {"avsc_names": avsc_names,
- "basedir": "",
- "make_fs_access": make_fs_access,
- "loader": document_loader,
- "metadata": {"cwlVersion": "v1.1", "http://commonwl.org/cwltool#original_cwlVersion": "v1.0"},
- "construct_tool_object": runner.arv_make_tool})
- runtimeContext = arvados_cwl.context.ArvRuntimeContext(
- {"work_api": "jobs",
- "basedir": "",
- "name": "test_run_wf_"+str(enable_reuse),
- "make_fs_access": make_fs_access,
- "enable_reuse": enable_reuse,
- "priority": 500})
-
- return loadingContext, runtimeContext
-
- # The test passes no builder.resources
- # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
- @mock.patch("arvados.collection.CollectionReader")
- @mock.patch("arvados.collection.Collection")
- @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
- def test_run(self, list_images_in_arv, mockcollection, mockcollectionreader):
- arv_docker_clear_cache()
- arvados_cwl.add_arv_hints()
-
- api = mock.MagicMock()
- api._rootDesc = get_rootDesc()
-
- runner = arvados_cwl.executor.ArvCwlExecutor(api, argparse.Namespace(work_api="jobs",
- output_name=None,
- output_tags=None,
- thread_count=1,
- collection_cache_size=None))
- self.assertEqual(runner.work_api, 'jobs')
-
- list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
- runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
- runner.api.collections().list().execute.return_value = {"items": [{
- "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
- "portable_data_hash": "99999999999999999999999999999993+99"}]}
-
- runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- runner.ignore_docker_for_reuse = False
- runner.num_retries = 0
-
- loadingContext, runtimeContext = self.helper(runner)
- runner.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
- tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
- metadata["cwlVersion"] = tool["cwlVersion"]
-
- mockc = mock.MagicMock()
- mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mockc, *args, **kwargs)
- mockcollectionreader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "token.txt")
-
- arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
- arvtool.formatgraph = None
- it = arvtool.job({}, mock.MagicMock(), runtimeContext)
-
- next(it).run(runtimeContext)
- next(it).run(runtimeContext)
-
- with open("tests/wf/scatter2_subwf.cwl") as f:
- subwf = StripYAMLComments(f.read().rstrip())
-
- runner.api.jobs().create.assert_called_with(
- body=JsonDiffMatcher({
- 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
- 'repository': 'arvados',
- 'script_version': 'master',
- 'script': 'crunchrunner',
- 'script_parameters': {
- 'tasks': [{'task.env': {
- 'HOME': '$(task.outdir)',
- 'TMPDIR': '$(task.tmpdir)'},
- 'task.vwd': {
- 'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
- 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
- },
- 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
- 'task.stdout': 'cwl.output.json'}]},
- 'runtime_constraints': {
- 'min_scratch_mb_per_node': 2048,
- 'min_cores_per_node': 1,
- 'docker_image': 'arvados/jobs',
- 'min_ram_mb_per_node': 1024
- },
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
- filters=[['repository', '=', 'arvados'],
- ['script', '=', 'crunchrunner'],
- ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
- ['docker_image_locator', 'in docker', 'arvados/jobs']],
- find_or_create=True)
-
- mockc.open().__enter__().write.assert_has_calls([mock.call(subwf)])
- mockc.open().__enter__().write.assert_has_calls([mock.call(
-bytes(b'''{
- "fileblub": {
- "basename": "token.txt",
- "class": "File",
- "location": "/keep/99999999999999999999999999999999+118/token.txt",
- "size": 0
- },
- "sleeptime": 5
-}'''))])
-
- # The test passes no builder.resources
- # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
- @mock.patch("arvados.collection.CollectionReader")
- @mock.patch("arvados.collection.Collection")
- @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
- def test_overall_resource_singlecontainer(self, list_images_in_arv, mockcollection, mockcollectionreader):
- arv_docker_clear_cache()
- arvados_cwl.add_arv_hints()
-
- api = mock.MagicMock()
- api._rootDesc = get_rootDesc()
-
- runner = arvados_cwl.executor.ArvCwlExecutor(api, argparse.Namespace(work_api="jobs",
- output_name=None,
- output_tags=None,
- thread_count=1,
- collection_cache_size=None))
- self.assertEqual(runner.work_api, 'jobs')
-
- list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
- runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
- runner.api.collections().list().execute.return_value = {"items": [{
- "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
- "portable_data_hash": "99999999999999999999999999999993+99"}]}
-
- runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- runner.ignore_docker_for_reuse = False
- runner.num_retries = 0
-
- loadingContext, runtimeContext = self.helper(runner)
- loadingContext.do_update = True
- runner.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
- tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
-
- mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mock.MagicMock(), *args, **kwargs)
-
- arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
- arvtool.formatgraph = None
- it = arvtool.job({}, mock.MagicMock(), runtimeContext)
-
- next(it).run(runtimeContext)
- next(it).run(runtimeContext)
-
- with open("tests/wf/echo-subwf.cwl") as f:
- subwf = StripYAMLComments(f.read())
-
- runner.api.jobs().create.assert_called_with(
- body=JsonDiffMatcher({
- 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
- 'repository': 'arvados',
- 'script_version': 'master',
- 'script': 'crunchrunner',
- 'script_parameters': {
- 'tasks': [{'task.env': {
- 'HOME': '$(task.outdir)',
- 'TMPDIR': '$(task.tmpdir)'},
- 'task.vwd': {
- 'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
- 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
- },
- 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
- 'task.stdout': 'cwl.output.json'}]},
- 'runtime_constraints': {
- 'min_scratch_mb_per_node': 4096,
- 'min_cores_per_node': 3,
- 'docker_image': 'arvados/jobs',
- 'min_ram_mb_per_node': 1024
- },
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
- filters=[['repository', '=', 'arvados'],
- ['script', '=', 'crunchrunner'],
- ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
- ['docker_image_locator', 'in docker', 'arvados/jobs']],
- find_or_create=True)
-
- def test_default_work_api(self):
- arvados_cwl.add_arv_hints()
-
- api = mock.MagicMock()
- api._rootDesc = copy.deepcopy(get_rootDesc())
- del api._rootDesc.get('resources')['jobs']['methods']['create']
- runner = arvados_cwl.executor.ArvCwlExecutor(api)
- self.assertEqual(runner.work_api, 'containers')
def setUp(self):
cwltool.process._names = set()
- @mock.patch("arvados_cwl.arvdocker.arv_docker_get_image")
- @mock.patch("time.sleep")
- @stubs
- def test_submit(self, stubs, tm, arvdock):
- def get_image(api_client, dockerRequirement, pull_image, project_uuid):
- if dockerRequirement["dockerPull"] == 'arvados/jobs:'+arvados_cwl.__version__:
- return '999999999999999999999999999999d3+99'
- elif dockerRequirement["dockerPull"] == "debian:8":
- return '999999999999999999999999999999d4+99'
- arvdock.side_effect = get_image
-
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--api=jobs", "--debug",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- stubs.api.collections().create.assert_has_calls([
- mock.call(body=JsonDiffMatcher({
- 'manifest_text':
- '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
- 'replication_desired': None,
- 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
- }), ensure_unique_name=False),
- mock.call(body=JsonDiffMatcher({
- 'manifest_text':
- '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
- 'replication_desired': None,
- 'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
- }), ensure_unique_name=False),
- mock.call(body=JsonDiffMatcher({
- 'manifest_text':
- ". 68089141fbf7e020ac90a9d6a575bc8f+1312 0:1312:workflow.cwl\n",
- 'replication_desired': None,
- 'name': 'submit_wf.cwl',
- }), ensure_unique_name=True) ])
-
- arvdock.assert_has_calls([
- mock.call(stubs.api, {"class": "DockerRequirement", "dockerPull": "debian:8"}, True, None),
- mock.call(stubs.api, {"class": "DockerRequirement", "dockerPull": "debian:8", 'http://arvados.org/cwl#dockerCollectionPDH': '999999999999999999999999999999d4+99'}, True, None),
- mock.call(stubs.api, {'dockerPull': 'arvados/jobs:'+arvados_cwl.__version__}, True, None)
- ])
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_uuid + '\n')
- self.assertEqual(exited, 0)
-
- @mock.patch("time.sleep")
- @stubs
- def test_submit_no_reuse(self, stubs, tm):
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--api=jobs", "--debug", "--disable-reuse",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:enable_reuse"] = {"value": False}
- expect_pipeline["properties"] = {"run_options": {"enable_job_reuse": False}}
-
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_uuid + '\n')
- self.assertEqual(exited, 0)
-
@stubs
def test_error_when_multiple_storage_classes_specified(self, stubs):
storage_classes = "foo,bar"
sys.stdin, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 1)
- @mock.patch("time.sleep")
- @stubs
- def test_submit_on_error(self, stubs, tm):
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--api=jobs", "--debug", "--on-error=stop",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:on_error"] = "stop"
-
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_uuid + '\n')
- self.assertEqual(exited, 0)
-
- @mock.patch("time.sleep")
- @stubs
- def test_submit_runner_ram(self, stubs, tm):
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--debug", "--submit-runner-ram=2048",
- "--api=jobs",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["components"]["cwl-runner"]["runtime_constraints"]["min_ram_mb_per_node"] = 2048
-
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_uuid + '\n')
- self.assertEqual(exited, 0)
-
@mock.patch("time.sleep")
@stubs
def test_submit_invalid_runner_ram(self, stubs, tm):
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 1)
- @mock.patch("time.sleep")
- @stubs
- def test_submit_output_name(self, stubs, tm):
- output_name = "test_output_name"
-
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--debug", "--output-name", output_name,
- "--api=jobs",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:output_name"] = output_name
-
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_uuid + '\n')
- self.assertEqual(exited, 0)
-
- @mock.patch("time.sleep")
- @stubs
- def test_submit_pipeline_name(self, stubs, tm):
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--debug", "--name=hello job 123",
- "--api=jobs",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
- self.assertEqual(exited, 0)
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["name"] = "hello job 123"
-
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_uuid + '\n')
-
- @mock.patch("time.sleep")
- @stubs
- def test_submit_output_tags(self, stubs, tm):
- output_tags = "tag0,tag1,tag2"
-
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--debug", "--output-tags", output_tags,
- "--api=jobs",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
- self.assertEqual(exited, 0)
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:output_tags"] = output_tags
-
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_uuid + '\n')
-
- @mock.patch("time.sleep")
- @stubs
- def test_submit_with_project_uuid(self, stubs, tm):
- project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
-
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--debug",
- "--project-uuid", project_uuid,
- "--api=jobs",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- sys.stdout, sys.stderr, api_client=stubs.api)
- self.assertEqual(exited, 0)
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["owner_uuid"] = project_uuid
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
@stubs
def test_submit_container(self, stubs):
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @mock.patch("arvados.collection.CollectionReader")
- @mock.patch("time.sleep")
- @stubs
- def test_submit_jobs_keepref(self, stubs, tm, reader):
- with open("tests/wf/expect_arvworkflow.cwl") as f:
- reader().open().__enter__().read.return_value = f.read()
-
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--api=jobs", "--debug",
- "keep:99999999999999999999999999999994+99/expect_arvworkflow.cwl#main", "-x", "XxX"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["components"]["cwl-runner"]["script_parameters"]["x"] = "XxX"
- del expect_pipeline["components"]["cwl-runner"]["script_parameters"]["y"]
- del expect_pipeline["components"]["cwl-runner"]["script_parameters"]["z"]
- expect_pipeline["components"]["cwl-runner"]["script_parameters"]["cwl:tool"] = "99999999999999999999999999999994+99/expect_arvworkflow.cwl#main"
- expect_pipeline["name"] = "expect_arvworkflow.cwl#main"
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
- self.assertEqual(exited, 0)
-
@mock.patch("time.sleep")
@stubs
def test_submit_arvworkflow(self, stubs, tm):
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
- def test_submit_job_runner_image(self, stubs):
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--api=jobs", "--debug", "--submit-runner-image=arvados/jobs:123",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
-
- stubs.expect_pipeline_instance["components"]["cwl-runner"]["runtime_constraints"]["docker_image"] = "999999999999999999999999999999d5+99"
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_uuid + '\n')
- self.assertEqual(exited, 0)
-
@stubs
def test_submit_container_runner_image(self, stubs):
exited = arvados_cwl.main(
cwltool_logger.removeHandler(stderr_logger)
-class TestCreateTemplate(unittest.TestCase):
- existing_template_uuid = "zzzzz-p5p6p-validworkfloyml"
-
- def _adjust_script_params(self, expect_component):
- expect_component['script_parameters']['x'] = {
- 'dataclass': 'File',
- 'required': True,
- 'type': 'File',
- 'value': '169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
- }
- expect_component['script_parameters']['y'] = {
- 'dataclass': 'Collection',
- 'required': True,
- 'type': 'Directory',
- 'value': '99999999999999999999999999999998+99',
- }
- expect_component['script_parameters']['z'] = {
- 'dataclass': 'Collection',
- 'required': True,
- 'type': 'Directory',
- }
-
- @stubs
- def test_create(self, stubs):
- project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
-
- exited = arvados_cwl.main(
- ["--create-workflow", "--debug",
- "--api=jobs",
- "--project-uuid", project_uuid,
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- stubs.api.pipeline_instances().create.refute_called()
- stubs.api.jobs().create.refute_called()
-
- expect_component = copy.deepcopy(stubs.expect_job_spec)
- self._adjust_script_params(expect_component)
- expect_template = {
- "components": {
- "submit_wf.cwl": expect_component,
- },
- "name": "submit_wf.cwl",
- "owner_uuid": project_uuid,
- }
- stubs.api.pipeline_templates().create.assert_called_with(
- body=JsonDiffMatcher(expect_template), ensure_unique_name=True)
-
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_template_uuid + '\n')
- self.assertEqual(exited, 0)
-
- @stubs
- def test_create_name(self, stubs):
- project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
-
- exited = arvados_cwl.main(
- ["--create-workflow", "--debug",
- "--project-uuid", project_uuid,
- "--api=jobs",
- "--name", "testing 123",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- stubs.api.pipeline_instances().create.refute_called()
- stubs.api.jobs().create.refute_called()
-
- expect_component = copy.deepcopy(stubs.expect_job_spec)
- self._adjust_script_params(expect_component)
- expect_template = {
- "components": {
- "testing 123": expect_component,
- },
- "name": "testing 123",
- "owner_uuid": project_uuid,
- }
- stubs.api.pipeline_templates().create.assert_called_with(
- body=JsonDiffMatcher(expect_template), ensure_unique_name=True)
-
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_template_uuid + '\n')
- self.assertEqual(exited, 0)
-
- @stubs
- def test_update_name(self, stubs):
- project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
-
- exited = arvados_cwl.main(
- ["--update-workflow", self.existing_template_uuid,
- "--debug",
- "--project-uuid", project_uuid,
- "--api=jobs",
- "--name", "testing 123",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- stubs.api.pipeline_instances().create.refute_called()
- stubs.api.jobs().create.refute_called()
-
- expect_component = copy.deepcopy(stubs.expect_job_spec)
- self._adjust_script_params(expect_component)
- expect_template = {
- "components": {
- "testing 123": expect_component,
- },
- "name": "testing 123",
- "owner_uuid": project_uuid,
- }
- stubs.api.pipeline_templates().create.refute_called()
- stubs.api.pipeline_templates().update.assert_called_with(
- body=JsonDiffMatcher(expect_template), uuid=self.existing_template_uuid)
-
- self.assertEqual(stubs.capture_stdout.getvalue(),
- self.existing_template_uuid + '\n')
- self.assertEqual(exited, 0)
-
-
class TestCreateWorkflow(unittest.TestCase):
existing_workflow_uuid = "zzzzz-7fd4e-validworkfloyml"
expect_workflow = StripYAMLComments(
stubs.expect_workflow_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
- def test_incompatible_api(self, stubs):
- capture_stderr = StringIO()
- acr_logger = logging.getLogger('arvados.cwl-runner')
- stderr_logger = logging.StreamHandler(capture_stderr)
- acr_logger.addHandler(stderr_logger)
-
- try:
- exited = arvados_cwl.main(
- ["--update-workflow", self.existing_workflow_uuid,
- "--api=jobs",
- "--debug",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- sys.stderr, sys.stderr, api_client=stubs.api)
- self.assertEqual(exited, 1)
- self.assertRegexpMatches(
- capture_stderr.getvalue(),
- "--update-workflow arg '{}' uses 'containers' API, but --api='jobs' specified".format(self.existing_workflow_uuid))
- finally:
- acr_logger.removeHandler(stderr_logger)
@stubs
def test_update(self, stubs):
self.assertEqual(stubs.capture_stdout.getvalue(),
stubs.expect_workflow_uuid + '\n')
self.assertEqual(exited, 0)
-
-class TestTemplateInputs(unittest.TestCase):
- expect_template = {
- "components": {
- "inputs_test.cwl": {
- 'runtime_constraints': {
- 'docker_image': '999999999999999999999999999999d3+99',
- 'min_ram_mb_per_node': 1024
- },
- 'script_parameters': {
- 'cwl:tool':
- 'a2de777156fb700f1363b1f2e370adca+60/workflow.cwl#main',
- 'optionalFloatInput': None,
- 'fileInput': {
- 'type': 'File',
- 'dataclass': 'File',
- 'required': True,
- 'title': "It's a file; we expect to find some characters in it.",
- 'description': 'If there were anything further to say, it would be said here,\nor here.'
- },
- 'floatInput': {
- 'type': 'float',
- 'dataclass': 'number',
- 'required': True,
- 'title': 'Floats like a duck',
- 'default': 0.1,
- 'value': 0.1,
- },
- 'optionalFloatInput': {
- 'type': ['null', 'float'],
- 'dataclass': 'number',
- 'required': False,
- },
- 'boolInput': {
- 'type': 'boolean',
- 'dataclass': 'boolean',
- 'required': True,
- 'title': 'True or false?',
- },
- },
- 'repository': 'arvados',
- 'script_version': 'master',
- 'minimum_script_version': '570509ab4d2ef93d870fd2b1f2eab178afb1bad9',
- 'script': 'cwl-runner',
- },
- },
- "name": "inputs_test.cwl",
- }
-
- @stubs
- def test_inputs_empty(self, stubs):
- exited = arvados_cwl.main(
- ["--debug", "--api=jobs", "--create-template",
- "tests/wf/inputs_test.cwl", "tests/order/empty_order.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- stubs.api.pipeline_templates().create.assert_called_with(
- body=JsonDiffMatcher(self.expect_template), ensure_unique_name=True)
-
- self.assertEqual(exited, 0)
-
- @stubs
- def test_inputs(self, stubs):
- exited = arvados_cwl.main(
- ["--api=jobs", "--create-template",
- "tests/wf/inputs_test.cwl", "tests/order/inputs_test_order.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- expect_template = copy.deepcopy(self.expect_template)
- params = expect_template[
- "components"]["inputs_test.cwl"]["script_parameters"]
- params["fileInput"]["value"] = '169f39d466a5438ac4a90e779bf750c7+53/blorp.txt'
- params["cwl:tool"] = 'a2de777156fb700f1363b1f2e370adca+60/workflow.cwl#main'
- params["floatInput"]["value"] = 1.234
- params["boolInput"]["value"] = True
-
- stubs.api.pipeline_templates().create.assert_called_with(
- body=JsonDiffMatcher(expect_template), ensure_unique_name=True)
- self.assertEqual(exited, 0)
hints:
- class: arv:RunInSingleContainer
- class: ResourceRequirement
- ramMin: $(inputs.count*128)
+ ramMin: $(inputs.count*32)
- class: arv:APIRequirement
scatter: count
run:
type: int
script: File
outputs: []
- arguments: [python, $(inputs.script), $(inputs.count * 128)]
+ arguments: [python, $(inputs.script), $(inputs.count * 32)]
outputs: []
hints:
- class: ResourceRequirement
- ramMin: $(inputs.count*128)
+ ramMin: $(inputs.count*32)
steps:
sleep1:
in:
type: int
script: File
outputs: []
- arguments: [python, $(inputs.script), $(inputs.count * 128)]
+ arguments: [python, $(inputs.script), $(inputs.count * 32)]
id: subtool
hints:
- class: ResourceRequirement
- ramMin: $(inputs.count*128)
+ ramMin: $(inputs.count*32)
inputs:
count:
type: int
script: File
outputs: []
- arguments: [python, $(inputs.script), $(inputs.count * 128)]
+ arguments: [python, $(inputs.script), $(inputs.count * 32)]
id: subtool
hints:
- class: ResourceRequirement
- ramMin: 128
+ ramMin: 32
inputs:
count:
type: int
script: File
outputs: []
- arguments: [python, $(inputs.script), "128"]
+ arguments: [python, $(inputs.script), "32"]
LogUpdateSize ByteSize
}
SLURM struct {
- PrioritySpread int64
- SbatchArgumentsList []string
- Managed struct {
+ PrioritySpread int64
+ SbatchArgumentsList []string
+ SbatchEnvironmentVariables map[string]string
+ Managed struct {
DNSServerConfDir string
DNSServerConfTemplate string
DNSServerReloadCommand string
if disp.Client.Insecure {
os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
}
- os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(disp.Client.KeepServiceURIs, " "))
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
+ for k, v := range disp.cluster.Containers.SLURM.SbatchEnvironmentVariables {
+ os.Setenv(k, v)
+ }
} else {
disp.logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
}
"fmt"
"io"
"io/ioutil"
- "log"
"net/http"
"net/http/httptest"
"os"
Client:
APIHost: example.com
AuthToken: abcdefg
+ KeepServiceURIs:
+ - https://example.com/keep1
+ - https://example.com/keep2
SbatchArguments: ["--foo", "bar"]
PollPeriod: 12s
PrioritySpread: 42
`)
tmpfile, err := ioutil.TempFile("", "example")
if err != nil {
- log.Fatal(err)
+ c.Error(err)
}
defer os.Remove(tmpfile.Name()) // clean up
if _, err := tmpfile.Write(content); err != nil {
- log.Fatal(err)
+ c.Error(err)
}
if err := tmpfile.Close(); err != nil {
- log.Fatal(err)
+ c.Error(err)
}
+ os.Setenv("ARVADOS_KEEP_SERVICES", "")
err = s.disp.configure("crunch-dispatch-slurm", []string{"-config", tmpfile.Name()})
c.Check(err, IsNil)
c.Check(s.disp.cluster.Containers.ReserveExtraRAM, Equals, arvados.ByteSize(12345))
c.Check(s.disp.cluster.Containers.MinRetryPeriod, Equals, arvados.Duration(13*time.Second))
c.Check(s.disp.cluster.API.MaxItemsPerResponse, Equals, 99)
+ c.Check(s.disp.cluster.Containers.SLURM.SbatchEnvironmentVariables, DeepEquals, map[string]string{
+ "ARVADOS_KEEP_SERVICES": "https://example.com/keep1 https://example.com/keep2",
+ })
+ c.Check(os.Getenv("ARVADOS_KEEP_SERVICES"), Equals, "https://example.com/keep1 https://example.com/keep2")
}
} else {
// /collections/ID/PATH...
collectionID = parseCollectionIDFromURL(pathParts[1])
- tokens = h.Config.AnonymousTokens
stripParts = 2
+ // This path is only meant to work for public
+ // data. Tokens provided with the request are
+ // ignored.
+ credentialsOK = false
}
}
forceReload = true
}
+ if credentialsOK {
+ reqTokens = auth.CredentialsFromRequest(r).Tokens
+ }
+
formToken := r.FormValue("api_token")
if formToken != "" && r.Header.Get("Origin") != "" && attachment && r.URL.Query().Get("api_token") == "" {
// The client provided an explicit token in the POST
//
// * The token isn't embedded in the URL, so we don't
// need to worry about bookmarks and copy/paste.
- tokens = append(tokens, formToken)
+ reqTokens = append(reqTokens, formToken)
} else if formToken != "" && browserMethod[r.Method] {
// The client provided an explicit token in the query
// string, or a form in POST body. We must put the
}
if useSiteFS {
- if tokens == nil {
- tokens = auth.CredentialsFromRequest(r).Tokens
- }
- h.serveSiteFS(w, r, tokens, credentialsOK, attachment)
+ h.serveSiteFS(w, r, reqTokens, credentialsOK, attachment)
return
}
}
if tokens == nil {
- if credentialsOK {
- reqTokens = auth.CredentialsFromRequest(r).Tokens
- }
tokens = append(reqTokens, h.Config.AnonymousTokens...)
}
return resp
}
-func (s *IntegrationSuite) TestDirectoryListing(c *check.C) {
+func (s *IntegrationSuite) TestDirectoryListingWithAnonymousToken(c *check.C) {
+ s.testServer.Config.AnonymousTokens = []string{arvadostest.AnonymousToken}
+ s.testDirectoryListing(c)
+}
+
+func (s *IntegrationSuite) TestDirectoryListingWithNoAnonymousToken(c *check.C) {
+ s.testServer.Config.AnonymousTokens = nil
+ s.testDirectoryListing(c)
+}
+
+func (s *IntegrationSuite) testDirectoryListing(c *check.C) {
s.testServer.Config.AttachmentOnlyHost = "download.example.com"
authHeader := http.Header{
"Authorization": {"OAuth2 " + arvadostest.ActiveToken},
cutDirs: 1,
},
{
- uri: "download.example.com/collections/" + arvadostest.FooAndBarFilesInDirUUID + "/",
- header: authHeader,
- expect: []string{"dir1/foo", "dir1/bar"},
- cutDirs: 2,
+ // URLs of this form ignore authHeader, and
+ // FooAndBarFilesInDirUUID isn't public, so
+ // this returns 404.
+ uri: "download.example.com/collections/" + arvadostest.FooAndBarFilesInDirUUID + "/",
+ header: authHeader,
+ expect: nil,
},
{
uri: "download.example.com/users/active/foo_file_in_dir/",
}
func (s *IntegrationSuite) TestMetrics(c *check.C) {
+ s.testServer.Config.AttachmentOnlyHost = s.testServer.Addr
origin := "http://" + s.testServer.Addr
req, _ := http.NewRequest("GET", origin+"/notfound", nil)
_, err := http.DefaultClient.Do(req)
set -u
-uuid_prefix=$(cat /var/lib/arvados/api_uuid_prefix)
-
-if ! test -s /var/lib/arvados/api_secret_token ; then
- ruby -e 'puts rand(2**400).to_s(36)' > /var/lib/arvados/api_secret_token
-fi
-secret_token=$(cat /var/lib/arvados/api_secret_token)
-
-if ! test -s /var/lib/arvados/blob_signing_key ; then
- ruby -e 'puts rand(2**400).to_s(36)' > /var/lib/arvados/blob_signing_key
-fi
-blob_signing_key=$(cat /var/lib/arvados/blob_signing_key)
-
-if ! test -s /var/lib/arvados/management_token ; then
- ruby -e 'puts rand(2**400).to_s(36)' > /var/lib/arvados/management_token
-fi
-management_token=$(cat /var/lib/arvados/management_token)
-
-sso_app_secret=$(cat /var/lib/arvados/sso_app_secret)
-
-if test -s /var/lib/arvados/vm-uuid ; then
- vm_uuid=$(cat /var/lib/arvados/vm-uuid)
-else
- vm_uuid=$uuid_prefix-2x53u-$(ruby -e 'puts rand(2**400).to_s(36)[0,15]')
- echo $vm_uuid > /var/lib/arvados/vm-uuid
-fi
-
-if ! test -f /var/lib/arvados/api_database_pw ; then
- ruby -e 'puts rand(2**128).to_s(36)' > /var/lib/arvados/api_database_pw
-fi
-database_pw=$(cat /var/lib/arvados/api_database_pw)
-
-if ! (psql postgres -c "\du" | grep "^ arvados ") >/dev/null ; then
- psql postgres -c "create user arvados with password '$database_pw'"
-fi
-psql postgres -c "ALTER USER arvados WITH SUPERUSER;"
+flock /var/lib/arvados/cluster_config.yml.lock /usr/local/lib/arvbox/cluster-config.sh
if test -a /usr/src/arvados/services/api/config/arvados_config.rb ; then
rm -f config/application.yml config/database.yml
- flock /var/lib/arvados/cluster_config.yml.lock /usr/local/lib/arvbox/cluster-config.sh
else
+ uuid_prefix=$(cat /var/lib/arvados/api_uuid_prefix)
+ secret_token=$(cat /var/lib/arvados/api_secret_token)
+ blob_signing_key=$(cat /var/lib/arvados/blob_signing_key)
+ management_token=$(cat /var/lib/arvados/management_token)
+ sso_app_secret=$(cat /var/lib/arvados/sso_app_secret)
+ database_pw=$(cat /var/lib/arvados/api_database_pw)
+ vm_uuid=$(cat /var/lib/arvados/vm-uuid)
+
cat >config/application.yml <<EOF
$RAILS_ENV:
uuid_prefix: $uuid_prefix
. /usr/local/lib/arvbox/common.sh
+set -u
+
+if ! test -s /var/lib/arvados/api_uuid_prefix ; then
+ ruby -e 'puts "x#{rand(2**64).to_s(36)[0,4]}"' > /var/lib/arvados/api_uuid_prefix
+fi
uuid_prefix=$(cat /var/lib/arvados/api_uuid_prefix)
+
+if ! test -s /var/lib/arvados/api_secret_token ; then
+ ruby -e 'puts rand(2**400).to_s(36)' > /var/lib/arvados/api_secret_token
+fi
secret_token=$(cat /var/lib/arvados/api_secret_token)
+
+if ! test -s /var/lib/arvados/blob_signing_key ; then
+ ruby -e 'puts rand(2**400).to_s(36)' > /var/lib/arvados/blob_signing_key
+fi
blob_signing_key=$(cat /var/lib/arvados/blob_signing_key)
+
+if ! test -s /var/lib/arvados/management_token ; then
+ ruby -e 'puts rand(2**400).to_s(36)' > /var/lib/arvados/management_token
+fi
management_token=$(cat /var/lib/arvados/management_token)
+
+if ! test -s /var/lib/arvados/sso_app_secret ; then
+ ruby -e 'puts rand(2**400).to_s(36)' > /var/lib/arvados/sso_app_secret
+fi
sso_app_secret=$(cat /var/lib/arvados/sso_app_secret)
+
+if ! test -s /var/lib/arvados/vm-uuid ; then
+ echo $uuid_prefix-2x53u-$(ruby -e 'puts rand(2**400).to_s(36)[0,15]') > /var/lib/arvados/vm-uuid
+fi
vm_uuid=$(cat /var/lib/arvados/vm-uuid)
+
+if ! test -f /var/lib/arvados/api_database_pw ; then
+ ruby -e 'puts rand(2**128).to_s(36)' > /var/lib/arvados/api_database_pw
+fi
database_pw=$(cat /var/lib/arvados/api_database_pw)
+if ! (psql postgres -c "\du" | grep "^ arvados ") >/dev/null ; then
+ psql postgres -c "create user arvados with password '$database_pw'"
+fi
+psql postgres -c "ALTER USER arvados WITH SUPERUSER;"
+
+if ! test -s /var/lib/arvados/workbench_secret_token ; then
+ ruby -e 'puts rand(2**400).to_s(36)' > /var/lib/arvados/workbench_secret_token
+fi
workbench_secret_key_base=$(cat /var/lib/arvados/workbench_secret_token)
if test -s /var/lib/arvados/api_rails_env ; then
. /usr/local/lib/arvbox/common.sh
+/usr/local/lib/arvbox/runsu.sh flock /var/lib/arvados/cluster_config.yml.lock /usr/local/lib/arvbox/cluster-config.sh
+
uuid_prefix=$(cat /var/lib/arvados/api_uuid_prefix)
if ! openssl verify -CAfile $root_cert $root_cert ; then
exit
fi
-flock /var/lib/arvados/cluster_config.yml.lock /usr/local/lib/arvbox/cluster-config.sh
+/usr/local/lib/arvbox/runsu.sh flock /var/lib/arvados/cluster_config.yml.lock /usr/local/lib/arvbox/cluster-config.sh
exec /usr/local/lib/arvbox/runsu.sh /usr/local/bin/arvados-controller
set -u
-if ! test -s /var/lib/arvados/api_uuid_prefix ; then
- ruby -e 'puts "x#{rand(2**64).to_s(36)[0,4]}"' > /var/lib/arvados/api_uuid_prefix
-fi
uuid_prefix=$(cat /var/lib/arvados/api_uuid_prefix)
if ! test -s /var/lib/arvados/sso_secret_token ; then
if ! test -f /var/lib/arvados/sso_database_setup ; then
bundle exec rake db:setup
- if ! test -s /var/lib/arvados/sso_app_secret ; then
- ruby -e 'puts rand(2**400).to_s(36)' > /var/lib/arvados/sso_app_secret
- fi
app_secret=$(cat /var/lib/arvados/sso_app_secret)
bundle exec rails console <<EOF
set -u
-if ! test -s /var/lib/arvados/workbench_secret_token ; then
- ruby -e 'puts rand(2**400).to_s(36)' > /var/lib/arvados/workbench_secret_token
-fi
secret_token=$(cat /var/lib/arvados/workbench_secret_token)
if test -a /usr/src/arvados/apps/workbench/config/arvados_config.rb ; then