# unit tests.
stdout = None
- if arvargs.submit and (arvargs.workflow.startswith("arvwf:") or workflow_uuid_pattern.match(arvargs.workflow)):
+ if arvargs.workflow.startswith("arvwf:") or workflow_uuid_pattern.match(arvargs.workflow):
executor.loadingContext.do_validate = False
executor.fast_submit = True
}
elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
workflowpath = "/var/lib/cwl/workflow.json#main"
- record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
- packed = yaml.safe_load(record["definition"])
+ #record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
+ #packed = yaml.safe_load(record["definition"])
+ packed = self.loadingContext.loader.idx[self.embedded_tool.tool["id"]]
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
"content": packed
}
container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
else:
- packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
+ main = self.loadingContext.loader.idx["_:main"]
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
- "content": packed
+ "content": main
}
container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
logger = logging.getLogger('arvados.cwl-runner')
-cached_lookups = {}
-cached_lookups_lock = threading.Lock()
-
def determine_image_id(dockerImageId):
for line in (
subprocess.check_output( # nosec
return None
-def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid,
- force_pull, tmp_outdir_prefix, match_local_docker, copy_deps):
+def arv_docker_get_image(api_client, dockerRequirement, pull_image, runtimeContext):
"""Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
+    project_uuid = runtimeContext.project_uuid
+    force_pull = runtimeContext.force_docker_pull
+    tmp_outdir_prefix = runtimeContext.tmp_outdir_prefix
+    match_local_docker = runtimeContext.match_local_docker
+ copy_deps = runtimeContext.copy_deps
+ cached_lookups = runtimeContext.cached_docker_lookups
+
if "http://arvados.org/cwl#dockerCollectionPDH" in dockerRequirement:
return dockerRequirement["http://arvados.org/cwl#dockerCollectionPDH"]
if hasattr(dockerRequirement, 'lc'):
dockerRequirement.lc.data["dockerImageId"] = dockerRequirement.lc.data["dockerPull"]
- global cached_lookups
- global cached_lookups_lock
- with cached_lookups_lock:
- if dockerRequirement["dockerImageId"] in cached_lookups:
- return cached_lookups[dockerRequirement["dockerImageId"]]
+ if dockerRequirement["dockerImageId"] in cached_lookups:
+ return cached_lookups[dockerRequirement["dockerImageId"]]
with SourceLine(dockerRequirement, "dockerImageId", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
sp = dockerRequirement["dockerImageId"].split(":")
pdh = api_client.collections().get(uuid=images[0][0]).execute()["portable_data_hash"]
- with cached_lookups_lock:
- cached_lookups[dockerRequirement["dockerImageId"]] = pdh
+ cached_lookups[dockerRequirement["dockerImageId"]] = pdh
return pdh
-
-def arv_docker_clear_cache():
- global cached_lookups
- global cached_lookups_lock
- with cached_lookups_lock:
- cached_lookups = {}
def rel_ref(s, baseuri, urlexpander, merged_map):
uri = urlexpander(s, baseuri)
- print("DDD", baseuri, merged_map)
fileuri = urllib.parse.urldefrag(baseuri)[0]
if fileuri in merged_map:
replacements = merged_map[fileuri].resolved
r = os.path.relpath(p2, p1)
if r == ".":
r = ""
- print("AAA", uri, s)
- print("BBBB", p1, p2, p3, r)
return os.path.join(r, p3)
-def update_refs(d, baseuri, urlexpander, merged_map, set_block_style):
+def update_refs(d, baseuri, urlexpander, merged_map, set_block_style, runtimeContext):
if set_block_style and (isinstance(d, CommentedSeq) or isinstance(d, CommentedMap)):
d.fa.set_block_style()
if isinstance(d, MutableSequence):
for s in d:
- update_refs(s, baseuri, urlexpander, merged_map, set_block_style)
+ update_refs(s, baseuri, urlexpander, merged_map, set_block_style, runtimeContext)
elif isinstance(d, MutableMapping):
if "id" in d:
baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
+            if d.get("class") == "DockerRequirement":
+                dockerImageId = d.get("dockerImageId") or d.get("dockerPull")
+                if dockerImageId in runtimeContext.cached_docker_lookups:
+                    d["http://arvados.org/cwl#dockerCollectionPDH"] = runtimeContext.cached_docker_lookups[dockerImageId]
+
for s in d:
for field in ("$include", "$import", "location", "run"):
if field in d and isinstance(d[field], str):
for n, s in enumerate(d["$schemas"]):
d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map)
- update_refs(d[s], baseuri, urlexpander, merged_map, set_block_style)
+ update_refs(d[s], baseuri, urlexpander, merged_map, set_block_style, runtimeContext)
def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
- runtimeContext, uuid=None,
- submit_runner_ram=0, name=None, merged_map=None,
- submit_runner_image=None,
- git_info=None):
+ runtimeContext,
+ uuid=None,
+ submit_runner_ram=0, name=None, merged_map=None,
+ submit_runner_image=None,
+ git_info=None,
+ set_defaults=False):
firstfile = None
workflow_files = set()
col = arvados.collection.Collection()
- #print(merged_map.keys())
-
for w in workflow_files | import_files:
# 1. load YAML
# 2. find $import, $include, $schema, run, location
# 3. update field value
- update_refs(result, w, tool.doc_loader.expand_url, merged_map, set_block_style)
+ update_refs(result, w, tool.doc_loader.expand_url, merged_map, set_block_style, runtimeContext)
with col.open(w[n+1:], "wt") as f:
yamlloader.dump(result, stream=f)
if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))
- col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True)
-
toolfile = tool.tool["id"][n+1:]
+ properties = {
+ "type": "workflow",
+ "arv:workflowMain": toolfile,
+ }
+
+ col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)
+
+ adjustDirObjs(job_order, trim_listing)
+ adjustFileObjs(job_order, trim_anonymous_location)
+ adjustDirObjs(job_order, trim_anonymous_location)
+
# now construct the wrapper
step = {
main = tool.tool
+ wf_runner_resources = None
+
+ hints = main.get("hints", [])
+ found = False
+ for h in hints:
+ if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
+ wf_runner_resources = h
+ found = True
+ break
+ if not found:
+ wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
+ hints.append(wf_runner_resources)
+
+    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
+                                                                  submit_runner_image or "arvados/jobs:"+__version__,
+                                                                  runtimeContext)
+
+ if submit_runner_ram:
+ wf_runner_resources["ramMin"] = submit_runner_ram
+
newinputs = []
for i in main["inputs"]:
inp = {}
"loadListing", "default"):
if f in i:
inp[f] = i[f]
+
+ if set_defaults:
+ sn = shortname(i["id"])
+ if sn in job_order:
+ inp["default"] = job_order[sn]
+
inp["id"] = "#main/%s" % shortname(i["id"])
newinputs.append(inp)
if main.get("requirements"):
wrapper["requirements"].extend(main["requirements"])
- if main.get("hints"):
+    if hints:
-        wrapper["hints"] = main["hints"]
+        wrapper["hints"] = hints
doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
for g in git_info:
doc[g] = git_info[g]
- update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False)
+ update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False, runtimeContext)
+
+ return doc
+
+
+def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
wrappertext = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
if project_uuid:
body["workflow"]["owner_uuid"] = project_uuid
- if uuid:
- call = arvRunner.api.workflows().update(uuid=uuid, body=body)
+ if update_uuid:
+ call = arvRunner.api.workflows().update(uuid=update_uuid, body=body)
else:
call = arvRunner.api.workflows().create(body=body)
return call.execute(num_retries=arvRunner.num_retries)["uuid"]
self.defer_downloads = False
self.varying_url_params = ""
self.prefer_cached_downloads = False
+ self.cached_docker_lookups = {}
super(ArvRuntimeContext, self).__init__(kwargs)
#with Perf(metrics, "load_tool"):
# tool = load_tool(tool.tool, loadingContext)
- if runtimeContext.update_workflow or runtimeContext.create_workflow:
- # Create a pipeline template or workflow record and exit.
- if self.work_api == "containers":
- uuid = new_upload_workflow(self, tool, job_order,
- runtimeContext.project_uuid,
- runtimeContext,
- uuid=runtimeContext.update_workflow,
- submit_runner_ram=runtimeContext.submit_runner_ram,
- name=runtimeContext.name,
- merged_map=merged_map,
- submit_runner_image=runtimeContext.submit_runner_image,
- git_info=git_info)
+ if runtimeContext.update_workflow or runtimeContext.create_workflow or (runtimeContext.submit and not self.fast_submit):
+ # upload workflow and get back the workflow wrapper
+
+ workflow_wrapper = new_upload_workflow(self, tool, job_order,
+ runtimeContext.project_uuid,
+ runtimeContext,
+ uuid=runtimeContext.update_workflow,
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
+ merged_map=merged_map,
+ submit_runner_image=runtimeContext.submit_runner_image,
+ git_info=git_info,
+ set_defaults=(runtimeContext.update_workflow or runtimeContext.create_workflow))
+
+ if runtimeContext.update_workflow or runtimeContext.create_workflow:
+ # Now create a workflow record and exit.
+ uuid = make_workflow_record(self, workflow_wrapper, runtimeContext.name, tool,
+ runtimeContext.project_uuid, runtimeContext.update_workflow)
self.stdout.write(uuid + "\n")
return (None, "success")
+ loadingContext.loader.idx["_:main"] = workflow_wrapper
+
+ # Reload just the wrapper workflow.
+ self.fast_submit = True
+ tool = load_tool(workflow_wrapper, loadingContext)
+
+
self.apply_reqs(job_order, tool)
self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
import threading
from collections import OrderedDict
-import ruamel.yaml as yaml
+import ruamel.yaml
import cwltool.stdfsaccess
from cwltool.pathmapper import abspath
return f.read()
if url.startswith("arvwf:"):
record = self.api_client.workflows().get(uuid=url[6:]).execute(num_retries=self.num_retries)
- definition = yaml.round_trip_load(record["definition"])
+ yaml = ruamel.yaml.YAML(typ='rt', pure=True)
+ definition = yaml.load(record["definition"])
definition["label"] = record["name"]
-            return yaml.round_trip_dump(definition)
+            import io
+            stream = io.StringIO()
+            yaml.dump(definition, stream=stream)
+            return stream.getvalue()
return super(CollectionFetcher, self).fetch_text(url)
single_collection=True,
optional_deps=optional_deps)
- print("MMM", mapper._pathmap)
-
keeprefs = set()
def addkeepref(k):
if k.startswith("keep:"):
keeprefs.add(collection_pdh_pattern.match(k).group(1))
- def setloc(p):
+
+ def collectloc(p):
loc = p.get("location")
if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
- p["location"] = mapper.mapper(p["location"]).resolved
addkeepref(p["location"])
return
if uuid not in uuid_map:
raise SourceLine(p, "location", validate.ValidationException).makeError(
"Collection uuid %s not found" % uuid)
- p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
- p[collectionUUID] = uuid
- #with Perf(metrics, "setloc"):
- # visit_class(workflowobj, ("File", "Directory"), setloc)
- # visit_class(discovered, ("File", "Directory"), setloc)
+ with Perf(metrics, "collectloc"):
+ visit_class(workflowobj, ("File", "Directory"), collectloc)
+ visit_class(discovered, ("File", "Directory"), collectloc)
if discovered_secondaryfiles is not None:
for d in discovered:
continue
col = col["items"][0]
col["name"] = arvados.util.trim_name(col["name"])
- print("CCC name", col["name"])
try:
arvrunner.api.collections().create(body={"collection": {
"owner_uuid": runtimeContext.project_uuid,
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
- arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
- runtimeContext.project_uuid,
- runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker,
- runtimeContext.copy_deps)
+ arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
else:
arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
- True,
- runtimeContext.project_uuid,
- runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker,
- runtimeContext.copy_deps)
+ True, runtimeContext)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
upload_docker(arvrunner, s.embedded_tool, runtimeContext)
else:
packed["http://schema.org/version"] = githash
+def setloc(mapper, p):
+ loc = p.get("location")
+ if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
+ p["location"] = mapper.mapper(p["location"]).resolved
+ return
+
+ if not loc:
+ return
+
+ if collectionUUID in p:
+ uuid = p[collectionUUID]
+ if uuid not in uuid_map:
+ raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
+ "Collection uuid %s not found" % uuid)
+ gp = collection_pdh_pattern.match(loc)
+ if gp and uuid_map[uuid] != gp.groups()[0]:
+ # This file entry has both collectionUUID and a PDH
+ # location. If the PDH doesn't match the one returned
+ # the API server, raise an error.
+ raise SourceLine(p, "location", validate.ValidationException).makeError(
+ "Expected collection uuid %s to be %s but API server reported %s" % (
+ uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
+
+ gp = collection_uuid_pattern.match(loc)
+ if not gp:
+ # Not a uuid pattern (must be a pdh pattern)
+ return
+
+ uuid = gp.groups()[0]
+ if uuid not in uuid_map:
+ raise SourceLine(p, "location", validate.ValidationException).makeError(
+ "Collection uuid %s not found" % uuid)
+ p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
+ p[collectionUUID] = uuid
+
+
+def update_from_mapper(workflowobj, mapper):
+ with Perf(metrics, "setloc"):
+ visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
"""Upload local files referenced in the input object and return updated input
if "job_order" in job_order:
del job_order["job_order"]
+ update_from_mapper(job_order, jobmapper)
+
return job_order
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
discovered_secondaryfiles=discovered_secondaryfiles,
cache=tool_dep_cache)
- print("PM", pm.items())
document_loader.idx[deptool["id"]] = deptool
toolmap = {}
for k,v in pm.items():
try:
return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
- True,
- runtimeContext.project_uuid,
- runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker,
- runtimeContext.copy_deps)
+ True, runtimeContext)
except Exception as e:
raise Exception("Docker image %s is not available\n%s" % (img, e) )
collection_cache_is_default=True,
git_info=None):
- loadingContext = loadingContext.copy()
- loadingContext.metadata = updated_tool.metadata.copy()
+ self.loadingContext = loadingContext.copy()
+ self.loadingContext.metadata = updated_tool.metadata.copy()
super(Runner, self).__init__(updated_tool.tool, loadingContext)
self.intermediate_output_ttl = intermediate_output_ttl
self.priority = priority
self.secret_store = secret_store
- self.enable_dev = loadingContext.enable_dev
+ self.enable_dev = self.loadingContext.enable_dev
self.git_info = git_info
- self.fast_parser = loadingContext.fast_parser
+ self.fast_parser = self.loadingContext.fast_parser
self.submit_runner_cores = 1
self.submit_runner_ram = 1024 # defaut 1 GiB
import arvados_cwl
import arvados_cwl.context
import arvados_cwl.util
-from arvados_cwl.arvdocker import arv_docker_clear_cache
+#from arvados_cwl.arvdocker import arv_docker_clear_cache
import copy
import arvados.config
import logging
def setUp(self):
cwltool.process._names = set()
- arv_docker_clear_cache()
+ #arv_docker_clear_cache()
def tearDown(self):
root_logger = logging.getLogger('')
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
def test_run(self, keepdocker):
for enable_reuse in (True, False):
- arv_docker_clear_cache()
+ #arv_docker_clear_cache()
runner = mock.MagicMock()
runner.ignore_docker_for_reuse = False
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
def test_setting_storage_class(self, keepdocker):
- arv_docker_clear_cache()
+ #arv_docker_clear_cache()
runner = mock.MagicMock()
runner.ignore_docker_for_reuse = False
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
def test_setting_process_properties(self, keepdocker):
- arv_docker_clear_cache()
+ #arv_docker_clear_cache()
runner = mock.MagicMock()
runner.ignore_docker_for_reuse = False
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
def test_cuda_requirement(self, keepdocker):
arvados_cwl.add_arv_hints()
- arv_docker_clear_cache()
+ #arv_docker_clear_cache()
runner = mock.MagicMock()
runner.ignore_docker_for_reuse = False
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
def test_match_local_docker(self, keepdocker, determine_image_id):
arvados_cwl.add_arv_hints()
- arv_docker_clear_cache()
+ #arv_docker_clear_cache()
runner = mock.MagicMock()
runner.ignore_docker_for_reuse = False
runner.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(container_request))
- arv_docker_clear_cache()
+ #arv_docker_clear_cache()
runtimeContext.match_local_docker = True
container_request['container_image'] = '99999999999999999999999999999993+99'
container_request['name'] = 'test_run_True_2'
arvados_cwl.add_arv_hints()
for enable_preemptible in (None, True, False):
for preemptible_hint in (None, True, False):
- arv_docker_clear_cache()
+ #arv_docker_clear_cache()
runner = mock.MagicMock()
runner.ignore_docker_for_reuse = False
class TestWorkflow(unittest.TestCase):
def setUp(self):
cwltool.process._names = set()
- arv_docker_clear_cache()
+ #arv_docker_clear_cache()
def helper(self, runner, enable_reuse=True):
document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
from .matcher import JsonDiffMatcher, StripYAMLComments
from .mock_discovery import get_rootDesc
-import ruamel.yaml as yaml
+import ruamel.yaml
_rootDesc = None
stubs.api.collections().create.side_effect = functools.partial(collection_createstub, created_collections)
stubs.api.collections().get.side_effect = functools.partial(collection_getstub, created_collections)
- stubs.expect_job_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- stubs.api.jobs().create().execute.return_value = {
- "uuid": stubs.expect_job_uuid,
- "state": "Queued",
- }
-
stubs.expect_container_request_uuid = "zzzzz-xvhdp-zzzzzzzzzzzzzzz"
stubs.api.container_requests().create().execute.return_value = {
"uuid": stubs.expect_container_request_uuid,
"state": "Queued"
}
- stubs.expect_pipeline_template_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
- stubs.api.pipeline_templates().create().execute.return_value = {
- "uuid": stubs.expect_pipeline_template_uuid,
- }
- stubs.expect_job_spec = {
- 'runtime_constraints': {
- 'docker_image': '999999999999999999999999999999d3+99',
- 'min_ram_mb_per_node': 1024
- },
- 'script_parameters': {
- 'x': {
- 'basename': 'blorp.txt',
- 'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
- 'class': 'File'
- },
- 'y': {
- 'basename': '99999999999999999999999999999998+99',
- 'location': 'keep:99999999999999999999999999999998+99',
- 'class': 'Directory'
- },
- 'z': {
- 'basename': 'anonymous',
- "listing": [{
- "basename": "renamed.txt",
- "class": "File",
- "location": "keep:99999999999999999999999999999998+99/file1.txt",
- "size": 0
- }],
- 'class': 'Directory'
- },
- 'cwl:tool': '57ad063d64c60dbddc027791f0649211+60/workflow.cwl#main'
- },
- 'repository': 'arvados',
- 'script_version': 'master',
- 'minimum_script_version': '570509ab4d2ef93d870fd2b1f2eab178afb1bad9',
- 'script': 'cwl-runner'
- }
- stubs.pipeline_component = stubs.expect_job_spec.copy()
- stubs.expect_pipeline_instance = {
- 'name': 'submit_wf.cwl',
- 'state': 'RunningOnServer',
- 'owner_uuid': None,
- "components": {
- "cwl-runner": {
- 'runtime_constraints': {'docker_image': '999999999999999999999999999999d3+99', 'min_ram_mb_per_node': 1024},
- 'script_parameters': {
- 'y': {"value": {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}},
- 'x': {"value": {
- 'basename': 'blorp.txt',
- 'class': 'File',
- 'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
- "size": 16
- }},
- 'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
- 'listing': [
- {
- 'basename': 'renamed.txt',
- 'class': 'File', 'location':
- 'keep:99999999999999999999999999999998+99/file1.txt',
- 'size': 0
- }
- ]}},
- 'cwl:tool': '57ad063d64c60dbddc027791f0649211+60/workflow.cwl#main',
- 'arv:debug': True,
- 'arv:enable_reuse': True,
- 'arv:on_error': 'continue'
- },
- 'repository': 'arvados',
- 'script_version': 'master',
- 'minimum_script_version': '570509ab4d2ef93d870fd2b1f2eab178afb1bad9',
- 'script': 'cwl-runner',
- 'job': {'state': 'Queued', 'uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}
- }
- }
- }
- stubs.pipeline_create = copy.deepcopy(stubs.expect_pipeline_instance)
- stubs.expect_pipeline_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
- stubs.pipeline_create["uuid"] = stubs.expect_pipeline_uuid
- stubs.pipeline_with_job = copy.deepcopy(stubs.pipeline_create)
- stubs.pipeline_with_job["components"]["cwl-runner"]["job"] = {
- "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
- "state": "Queued"
- }
- stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
- stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
-
cwd = os.getcwd()
- filepath = os.path.join(cwd, "tests/wf/submit_wf_packed.cwl")
+ filepath = os.path.join(cwd, "tests/wf/submit_wf_wrapper.cwl")
with open(filepath) as f:
- expect_packed_workflow = yaml.round_trip_load(f)
+ yaml = ruamel.yaml.YAML(typ='rt', pure=True)
+ expect_packed_workflow = yaml.load(f)
if wfpath is None:
wfpath = wfname
mocktool = mock.NonCallableMock(tool=gitinfo_workflow["$graph"][0], metadata=gitinfo_workflow)
stubs.git_info = arvados_cwl.executor.ArvCwlExecutor.get_git_info(mocktool)
- expect_packed_workflow.update(stubs.git_info)
+ #expect_packed_workflow.update(stubs.git_info)
stubs.git_props = {"arv:"+k.split("#", 1)[1]: v for k,v in stubs.git_info.items()}
- if wfname == wfpath:
- container_name = "%s (%s)" % (wfpath, stubs.git_props["arv:gitDescribe"])
- else:
- container_name = wfname
+ #if wfname == wfpath:
+ # container_name = "%s (%s)" % (wfpath, stubs.git_props["arv:gitDescribe"])
+ #else:
+ container_name = wfname
stubs.expect_container_spec = {
'priority': 500,
'vcpus': 1,
'ram': (1024+256)*1024*1024
},
+ 'properties': {},
'use_existing': False,
- 'properties': stubs.git_props,
'secret_mounts': {}
}
+ #'properties': stubs.git_props,
stubs.expect_workflow_uuid = "zzzzz-7fd4e-zzzzzzzzzzzzzzz"
stubs.api.workflows().create().execute.return_value = {
def setUp(self):
cwltool.process._names = set()
- arvados_cwl.arvdocker.arv_docker_clear_cache()
+ #arvados_cwl.arvdocker.arv_docker_clear_cache()
def tearDown(self):
root_logger = logging.getLogger('')
@stubs()
def test_submit_container(self, stubs):
exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--api=containers", "--debug",
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--disable-git",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'replication_desired': None,
- 'name': 'submit_wf.cwl ('+ stubs.git_props["arv:gitDescribe"] +') input (169f39d466a5438ac4a90e779bf750c7+53)',
+ 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
}), ensure_unique_name=False),
mock.call(body=JsonDiffMatcher({
'manifest_text':
@mock.patch("cwltool.docker.DockerCommandLineJob.get_image")
@mock.patch("arvados.api")
def test_arvados_jobs_image(self, api, get_image, find_one_image_hash):
- arvados_cwl.arvdocker.arv_docker_clear_cache()
+ #arvados_cwl.arvdocker.arv_docker_clear_cache()
arvrunner = mock.MagicMock()
arvrunner.project_uuid = ""
def setUp(self):
cwltool.process._names = set()
- arvados_cwl.arvdocker.arv_docker_clear_cache()
+ #arvados_cwl.arvdocker.arv_docker_clear_cache()
def tearDown(self):
root_logger = logging.getLogger('')