import cwltool.main
import cwltool.workflow
import cwltool.process
-import schema_salad
from schema_salad.sourceline import SourceLine
import arvados
kwargs["fetcher_constructor"] = partial(CollectionFetcher,
api_client=self.api,
fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
- num_retries=self.num_retries,
- overrides=kwargs.get("override_tools"))
+ num_retries=self.num_retries)
+ kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
return ArvadosCommandTool(self, toolpath_object, **kwargs)
elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
def check_features(self, obj):
    """Recursively reject CWL features the selected work API cannot run.

    Walks ``obj`` depth-first.  Dicts are inspected and then every value is
    visited; lists have every element visited; any other value is ignored.

    Checks applied to each dict:

    * a truthy ``writable`` entry is only accepted when ``self.work_api``
      is ``"containers"``;
    * when ``class`` is ``"DockerRequirement"`` and ``dockerOutputDirectory``
      is set, ``self.work_api`` must be ``"containers"`` and the value must
      start with ``/``.

    Args:
        obj: A parsed document fragment (dict, list, or scalar).

    Raises:
        The object produced by ``SourceLine(...).makeError(...)``, built from
        ``UnsupportedRequirement`` for API limitations and from
        ``validate.ValidationException`` for a relative
        ``dockerOutputDirectory``.
    """
    if isinstance(obj, dict):
        if obj.get("writable") and self.work_api != "containers":
            raise SourceLine(obj, "writable", UnsupportedRequirement).makeError(
                "InitialWorkDir feature 'writable: true' not supported with --api=jobs")
        if obj.get("class") == "DockerRequirement":
            # Look the option up once instead of on every check.
            output_dir = obj.get("dockerOutputDirectory")
            if output_dir:
                if self.work_api != "containers":
                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                        "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                if not output_dir.startswith('/'):
                    raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                        "Option 'dockerOutputDirectory' must be an absolute path.")
        # values() iterates the same items as itervalues() and also exists on
        # Python 3 mappings.
        for value in obj.values():
            self.check_features(value)
    elif isinstance(obj, list):
        for index, item in enumerate(obj):
            # NOTE(review): the fourth SourceLine argument presumably toggles
            # traceback inclusion when debug logging is on -- confirm against
            # the schema_salad version in use.
            with SourceLine(obj, index, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                self.check_features(item)
def make_output_collection(self, name, tagsString, outputObj):
def rewrite(fileobj):
fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
- for k in ("basename", "listing", "contents"):
+ for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
if k in fileobj:
del fileobj[k]
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
- override_tools = {}
- upload_workflow_deps(self, tool, override_tools)
+ merged_map = upload_workflow_deps(self, tool)
# Reload tool object which may have been updated by
# upload_workflow_deps
makeTool=self.arv_make_tool,
loader=tool.doc_loader,
avsc_names=tool.doc_schema,
- metadata=tool.metadata,
- override_tools=override_tools)
+ metadata=tool.metadata)
# Upload local file references in the job order.
job_order = upload_job_order(self, "%s input" % kwargs["name"],
kwargs.get("enable_reuse"),
uuid=existing_uuid,
submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs["name"])
+ name=kwargs["name"],
+ merged_map=merged_map)
tmpl.save()
# cwltool.main will write our return value to stdout.
return (tmpl.uuid, "success")
self.project_uuid,
uuid=existing_uuid,
submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs["name"]),
+ name=kwargs["name"],
+ merged_map=merged_map),
"success")
self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
if kwargs.get("submit"):
# Submit a runner job to run the workflow for us.
if self.work_api == "containers":
- if tool.tool["class"] == "CommandLineTool":
+ if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
kwargs["runnerjob"] = tool.tool["id"]
- upload_dependencies(self,
- kwargs["name"],
- tool.doc_loader,
- tool.tool,
- tool.tool["id"],
- False)
runnerjob = tool.job(job_order,
self.output_callback,
**kwargs).next()
name=kwargs.get("name"),
on_error=kwargs.get("on_error"),
submit_runner_image=kwargs.get("submit_runner_image"),
- intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
+ intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
+ merged_map=merged_map)
elif self.work_api == "jobs":
runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
self.output_name,
submit_runner_ram=kwargs.get("submit_runner_ram"),
name=kwargs.get("name"),
on_error=kwargs.get("on_error"),
- submit_runner_image=kwargs.get("submit_runner_image"))
+ submit_runner_image=kwargs.get("submit_runner_image"),
+ merged_map=merged_map)
elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
# Create pipeline for local run
self.pipeline = self.api.pipeline_instances().create(
arvpkg = pkg_resources.require("arvados-python-client")
cwlpkg = pkg_resources.require("cwltool")
- return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
+ return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
"arvados-python-client", arvpkg[0].version,
"cwltool", cwlpkg[0].version)
arvargs.conformance_test = None
arvargs.use_container = True
arvargs.relax_path_checks = True
- arvargs.validate = None
+ arvargs.print_supported_versions = False
make_fs_access = partial(CollectionFsAccess,
collection_cache=runner.collection_cache)