19385: Work in progress checkpoint: submitting uses wrapper workflows
author    Peter Amstutz <peter.amstutz@curii.com>  Thu, 12 Jan 2023 20:53:20 +0000 (15:53 -0500)
committer Peter Amstutz <peter.amstutz@curii.com>  Thu, 12 Jan 2023 20:53:20 +0000 (15:53 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

Tested-by: Peter Amstutz <peter.amstutz@curii.com>
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvdocker.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/context.py
sdk/cwl/arvados_cwl/executor.py
sdk/cwl/arvados_cwl/fsaccess.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/tests/test_container.py
sdk/cwl/tests/test_submit.py

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 20eed989cc0f05c19e79e8e0ecd2df4dece977c6..e151dbf7bb428683d329f2109f1ab5e7ced2a1ab 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -381,7 +381,7 @@ def main(args=sys.argv[1:],
         # unit tests.
         stdout = None
 
-    if arvargs.submit and (arvargs.workflow.startswith("arvwf:") or workflow_uuid_pattern.match(arvargs.workflow)):
+    if arvargs.workflow and (arvargs.workflow.startswith("arvwf:") or workflow_uuid_pattern.match(arvargs.workflow)):
         executor.loadingContext.do_validate = False
         executor.fast_submit = True
 
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 30e0092e93753726601bc6786cc1b746d3882c70..e0ee7285a87ffcf99b04c8e757706dba6dede258 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -537,19 +537,20 @@ class RunnerContainer(Runner):
             }
         elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
             workflowpath = "/var/lib/cwl/workflow.json#main"
-            record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
-            packed = yaml.safe_load(record["definition"])
+            #record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
+            #packed = yaml.safe_load(record["definition"])
+            packed = self.loadingContext.loader.idx[self.embedded_tool.tool["id"]]
             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                 "kind": "json",
                 "content": packed
             }
             container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
         else:
-            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
+            main = self.loadingContext.loader.idx["_:main"]
             workflowpath = "/var/lib/cwl/workflow.json#main"
             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                 "kind": "json",
-                "content": packed
+                "content": main
             }
 
         container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
index ef8398df341e88861ff80d6152762b52cb00ae0a..9c20c0c5095a04b27cfd92f9bf79bfb3324824d5 100644
--- a/sdk/cwl/arvados_cwl/arvdocker.py
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -17,9 +17,6 @@ import arvados.commands.keepdocker
 
 logger = logging.getLogger('arvados.cwl-runner')
 
-cached_lookups = {}
-cached_lookups_lock = threading.Lock()
-
 def determine_image_id(dockerImageId):
     for line in (
         subprocess.check_output(  # nosec
@@ -56,10 +53,16 @@ def determine_image_id(dockerImageId):
     return None
 
 
-def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid,
-                         force_pull, tmp_outdir_prefix, match_local_docker, copy_deps):
+def arv_docker_get_image(api_client, dockerRequirement, pull_image, runtimeContext):
     """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
 
+    project_uuid = runtimeContext.project_uuid
+    force_pull = runtimeContext.force_docker_pull
+    tmp_outdir_prefix = runtimeContext.tmp_outdir_prefix
+    match_local_docker = runtimeContext.match_local_docker
+    copy_deps = runtimeContext.copy_deps
+    cached_lookups = runtimeContext.cached_docker_lookups
+
     if "http://arvados.org/cwl#dockerCollectionPDH" in dockerRequirement:
         return dockerRequirement["http://arvados.org/cwl#dockerCollectionPDH"]
 
@@ -69,11 +72,8 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
         if hasattr(dockerRequirement, 'lc'):
             dockerRequirement.lc.data["dockerImageId"] = dockerRequirement.lc.data["dockerPull"]
 
-    global cached_lookups
-    global cached_lookups_lock
-    with cached_lookups_lock:
-        if dockerRequirement["dockerImageId"] in cached_lookups:
-            return cached_lookups[dockerRequirement["dockerImageId"]]
+    if dockerRequirement["dockerImageId"] in cached_lookups:
+        return cached_lookups[dockerRequirement["dockerImageId"]]
 
     with SourceLine(dockerRequirement, "dockerImageId", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
         sp = dockerRequirement["dockerImageId"].split(":")
@@ -154,13 +154,6 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
 
         pdh = api_client.collections().get(uuid=images[0][0]).execute()["portable_data_hash"]
 
-        with cached_lookups_lock:
-            cached_lookups[dockerRequirement["dockerImageId"]] = pdh
+        cached_lookups[dockerRequirement["dockerImageId"]] = pdh
 
     return pdh
-
-def arv_docker_clear_cache():
-    global cached_lookups
-    global cached_lookups_lock
-    with cached_lookups_lock:
-        cached_lookups = {}
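
The hunks above drop the module-level lookup table and its lock in favor of a
per-run dictionary carried on the runtime context (the cached_docker_lookups
attribute added in context.py below), which is why arv_docker_clear_cache()
can be deleted. A minimal standalone sketch of that pattern, using stand-in
names rather than the real Arvados classes:

    # Sketch only: stand-ins illustrating the cache moving onto the context.
    class RuntimeContext:
        def __init__(self):
            # One cache per run: no globals, no lock, and nothing to clear
            # between test cases.
            self.cached_docker_lookups = {}   # image id -> portable data hash

    def lookup_image(image_id, runtime_context, resolve):
        # Memoize the expensive Keep/API lookup on the context object.
        if image_id in runtime_context.cached_docker_lookups:
            return runtime_context.cached_docker_lookups[image_id]
        pdh = resolve(image_id)               # stand-in for the real lookup
        runtime_context.cached_docker_lookups[image_id] = pdh
        return pdh

    ctx = RuntimeContext()
    lookup_image("arvados/jobs:2.5.0", ctx, lambda img: "9999+99")  # hypothetical values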
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 784e1bdb366d49f1a4d68c340987e5679f760e45..299ad5c0d55b2bfadce2b0f6524a6abbbe51521f 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -129,7 +129,6 @@ def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info,
 
 def rel_ref(s, baseuri, urlexpander, merged_map):
     uri = urlexpander(s, baseuri)
-    print("DDD", baseuri, merged_map)
     fileuri = urllib.parse.urldefrag(baseuri)[0]
     if fileuri in merged_map:
         replacements = merged_map[fileuri].resolved
@@ -145,22 +144,25 @@ def rel_ref(s, baseuri, urlexpander, merged_map):
     r = os.path.relpath(p2, p1)
     if r == ".":
         r = ""
-    print("AAA", uri, s)
-    print("BBBB", p1, p2, p3, r)
     return os.path.join(r, p3)
 
 
-def update_refs(d, baseuri, urlexpander, merged_map, set_block_style):
+def update_refs(d, baseuri, urlexpander, merged_map, set_block_style, runtimeContext):
     if set_block_style and (isinstance(d, CommentedSeq) or isinstance(d, CommentedMap)):
         d.fa.set_block_style()
 
     if isinstance(d, MutableSequence):
         for s in d:
-            update_refs(s, baseuri, urlexpander, merged_map, set_block_style)
+            update_refs(s, baseuri, urlexpander, merged_map, set_block_style, runtimeContext)
     elif isinstance(d, MutableMapping):
         if "id" in d:
             baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
 
+        if d.get("class") == "DockerRequirement":
+            dockerImageId = d.get("dockerImageId") or d.get("dockerPull")
+            if dockerImageId in runtimeContext.cached_docker_lookups:
+                d["http://arvados.org/cwl#dockerCollectionPDH"] = runtimeContext.cached_docker_lookups[dockerImageId]
+
         for s in d:
             for field in ("$include", "$import", "location", "run"):
                 if field in d and isinstance(d[field], str):
@@ -170,13 +171,15 @@ def update_refs(d, baseuri, urlexpander, merged_map, set_block_style):
                 for n, s in enumerate(d["$schemas"]):
                     d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map)
 
-            update_refs(d[s], baseuri, urlexpander, merged_map, set_block_style)
+            update_refs(d[s], baseuri, urlexpander, merged_map, set_block_style, runtimeContext)
 
 def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
-                    runtimeContext, uuid=None,
-                    submit_runner_ram=0, name=None, merged_map=None,
-                    submit_runner_image=None,
-                    git_info=None):
+                        runtimeContext,
+                        uuid=None,
+                        submit_runner_ram=0, name=None, merged_map=None,
+                        submit_runner_image=None,
+                        git_info=None,
+                        set_defaults=False):
 
     firstfile = None
     workflow_files = set()
@@ -215,8 +218,6 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
 
     col = arvados.collection.Collection()
 
-    #print(merged_map.keys())
-
     for w in workflow_files | import_files:
         # 1. load YAML
 
@@ -235,7 +236,7 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
 
         # 2. find $import, $include, $schema, run, location
         # 3. update field value
-        update_refs(result, w, tool.doc_loader.expand_url, merged_map, set_block_style)
+        update_refs(result, w, tool.doc_loader.expand_url, merged_map, set_block_style, runtimeContext)
 
         with col.open(w[n+1:], "wt") as f:
             yamlloader.dump(result, stream=f)
@@ -252,10 +253,19 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
     if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
         toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))
 
-    col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True)
-
     toolfile = tool.tool["id"][n+1:]
 
+    properties = {
+        "type": "workflow",
+        "arv:workflowMain": toolfile,
+    }
+
+    col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)
+
+    adjustDirObjs(job_order, trim_listing)
+    adjustFileObjs(job_order, trim_anonymous_location)
+    adjustDirObjs(job_order, trim_anonymous_location)
+
     # now construct the wrapper
 
     step = {
@@ -268,6 +278,26 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
 
     main = tool.tool
 
+    wf_runner_resources = None
+
+    hints = main.get("hints", [])
+    found = False
+    for h in hints:
+        if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
+            wf_runner_resources = h
+            found = True
+            break
+    if not found:
+        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
+        hints.append(wf_runner_resources)
+
+    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
+                                                                  submit_runner_image or "arvados/jobs:"+__version__,
+                                                                  runtimeContext)
+
+    if submit_runner_ram:
+        wf_runner_resources["ramMin"] = submit_runner_ram
+
     newinputs = []
     for i in main["inputs"]:
         inp = {}
@@ -282,6 +313,12 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
                   "loadListing", "default"):
             if f in i:
                 inp[f] = i[f]
+
+        if set_defaults:
+            sn = shortname(i["id"])
+            if sn in job_order:
+                inp["default"] = job_order[sn]
+
         inp["id"] = "#main/%s" % shortname(i["id"])
         newinputs.append(inp)
 
@@ -309,7 +346,7 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
 
     if main.get("requirements"):
         wrapper["requirements"].extend(main["requirements"])
-    if main.get("hints"):
+    if hints:
         wrapper["hints"] = main["hints"]
 
     doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
@@ -318,7 +355,12 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
         for g in git_info:
             doc[g] = git_info[g]
 
-    update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False)
+    update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False, runtimeContext)
+
+    return doc
+
+
+def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
 
     wrappertext = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
 
@@ -331,8 +373,8 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
     if project_uuid:
         body["workflow"]["owner_uuid"] = project_uuid
 
-    if uuid:
-        call = arvRunner.api.workflows().update(uuid=uuid, body=body)
+    if update_uuid:
+        call = arvRunner.api.workflows().update(uuid=update_uuid, body=body)
     else:
         call = arvRunner.api.workflows().create(body=body)
     return call.execute(num_retries=arvRunner.num_retries)["uuid"]
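
With this refactor, new_upload_workflow() saves the workflow files to a
collection and returns the wrapper document, while make_workflow_record()
performs the separate create-or-update API call. A hypothetical, heavily
simplified example of the wrapper shape being built (field names taken from
the hunks above; concrete values invented for illustration):

    wrapper_doc = {
        "cwlVersion": "v1.2",
        "$graph": [{
            "class": "Workflow",
            "id": "#main",
            # Inputs mirror the inner workflow; with set_defaults=True the
            # job order values are copied into "default" fields.
            "inputs": [{"id": "#main/x", "type": "File"}],
            "outputs": [{"id": "#main/out", "type": "File",
                         "outputSource": "#main/step/out"}],
            # The single step runs the real workflow out of the collection
            # saved above with "type": "workflow" properties.
            "steps": [{"id": "#main/step",
                       "in": [{"id": "#main/step/x", "source": "#main/x"}],
                       "out": [{"id": "#main/step/out"}],
                       "run": "keep:zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz+99/main.cwl"}],
            # WorkflowRunnerResources is appended to the hints when absent;
            # submit_runner_ram lands here as ramMin.
            "hints": [{"class": "http://arvados.org/cwl#WorkflowRunnerResources",
                       "ramMin": 2048}],
        }],
    }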
diff --git a/sdk/cwl/arvados_cwl/context.py b/sdk/cwl/arvados_cwl/context.py
index 3ce561f66d3404e03c4aab19470439af22bf83dd..125527f783f9faa49d66f5e2c09b46c1373eb353 100644
--- a/sdk/cwl/arvados_cwl/context.py
+++ b/sdk/cwl/arvados_cwl/context.py
@@ -42,6 +42,7 @@ class ArvRuntimeContext(RuntimeContext):
         self.defer_downloads = False
         self.varying_url_params = ""
         self.prefer_cached_downloads = False
+        self.cached_docker_lookups = {}
 
         super(ArvRuntimeContext, self).__init__(kwargs)
 
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 1e6344f5e9e3b6866a8e2bc8ffeb899fd82f1135..7e9840db6f4b007755703d3f0c84cc1d5f96a9d3 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -705,21 +705,33 @@ The 'jobs' API is no longer supported.
         #with Perf(metrics, "load_tool"):
         #    tool = load_tool(tool.tool, loadingContext)
 
-        if runtimeContext.update_workflow or runtimeContext.create_workflow:
-            # Create a pipeline template or workflow record and exit.
-            if self.work_api == "containers":
-                uuid = new_upload_workflow(self, tool, job_order,
-                                       runtimeContext.project_uuid,
-                                       runtimeContext,
-                                       uuid=runtimeContext.update_workflow,
-                                       submit_runner_ram=runtimeContext.submit_runner_ram,
-                                       name=runtimeContext.name,
-                                       merged_map=merged_map,
-                                       submit_runner_image=runtimeContext.submit_runner_image,
-                                       git_info=git_info)
+        if runtimeContext.update_workflow or runtimeContext.create_workflow or (runtimeContext.submit and not self.fast_submit):
+            # upload workflow and get back the workflow wrapper
+
+            workflow_wrapper = new_upload_workflow(self, tool, job_order,
+                                                   runtimeContext.project_uuid,
+                                                   runtimeContext,
+                                                   uuid=runtimeContext.update_workflow,
+                                                   submit_runner_ram=runtimeContext.submit_runner_ram,
+                                                   name=runtimeContext.name,
+                                                   merged_map=merged_map,
+                                                   submit_runner_image=runtimeContext.submit_runner_image,
+                                                   git_info=git_info,
+                                                   set_defaults=(runtimeContext.update_workflow or runtimeContext.create_workflow))
+
+            if runtimeContext.update_workflow or runtimeContext.create_workflow:
+                # Now create a workflow record and exit.
+                uuid = make_workflow_record(self, workflow_wrapper, runtimeContext.name, tool,
+                                            runtimeContext.project_uuid, runtimeContext.update_workflow)
                 self.stdout.write(uuid + "\n")
                 return (None, "success")
 
+            loadingContext.loader.idx["_:main"] = workflow_wrapper
+
+            # Reload just the wrapper workflow.
+            self.fast_submit = True
+            tool = load_tool(workflow_wrapper, loadingContext)
+
         self.apply_reqs(job_order, tool)
 
         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
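
The submit path above now hands the wrapper to the rest of the run through the
loader's document index: the wrapper is parked under the reserved key "_:main",
the tool is reloaded from it with fast_submit set, and RunnerContainer
(arvcontainer.py above) fetches the same object to mount as
/var/lib/cwl/workflow.json. A toy sketch of that hand-off with dict-based
stand-ins for the cwltool loader and the container request:

    class Loader:
        def __init__(self):
            self.idx = {}    # document index keyed by URI / reserved ids

    loader = Loader()
    workflow_wrapper = {"cwlVersion": "v1.2", "$graph": []}  # see wrapper example above

    # executor.py: park the wrapper so later stages need not re-pack it.
    loader.idx["_:main"] = workflow_wrapper

    # arvcontainer.py: RunnerContainer pulls it back out and mounts it verbatim.
    main = loader.idx["_:main"]
    mount = {"kind": "json", "content": main}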
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index 5c09e671fa21eac1952c417e10580d332e3612be..716cda335d1c6d49fcc2d4fe24149b00fde7ff3d 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -17,7 +17,8 @@ import logging
 import threading
 from collections import OrderedDict
 
-import ruamel.yaml as yaml
+import io
+import ruamel.yaml
 
 import cwltool.stdfsaccess
 from cwltool.pathmapper import abspath
@@ -235,7 +235,10 @@ class CollectionFetcher(DefaultFetcher):
                 return f.read()
         if url.startswith("arvwf:"):
             record = self.api_client.workflows().get(uuid=url[6:]).execute(num_retries=self.num_retries)
-            definition = yaml.round_trip_load(record["definition"])
+            yaml = ruamel.yaml.YAML(typ='rt', pure=True)
+            definition = yaml.load(record["definition"])
             definition["label"] = record["name"]
-            return yaml.round_trip_dump(definition)
+            stream = io.StringIO()
+            yaml.dump(definition, stream)
+            return stream.getvalue()
         return super(CollectionFetcher, self).fetch_text(url)
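
fsaccess.py (and test_submit.py below) migrate from ruamel.yaml's deprecated
module-level round_trip_load()/round_trip_dump() helpers to an explicit
round-trip YAML instance, whose dump() writes to a stream. A standalone sketch
of the new-style API:

    import io
    import ruamel.yaml

    yaml = ruamel.yaml.YAML(typ='rt', pure=True)  # round-trip, pure-Python parser
    data = yaml.load("label: old name\nsteps: []\n")
    data["label"] = "new name"    # round-trip mode preserves comments and ordering

    stream = io.StringIO()
    yaml.dump(data, stream)       # new-style dump targets a stream
    print(stream.getvalue())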
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index dc6d0df3f1f6a4b1ac025d0003ba70595571c769..c2c992d4401db5a7531799ae99f7728098acec34 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -447,17 +447,15 @@ def upload_dependencies(arvrunner, name, document_loader,
                                single_collection=True,
                                optional_deps=optional_deps)
 
-    print("MMM", mapper._pathmap)
-
     keeprefs = set()
     def addkeepref(k):
         if k.startswith("keep:"):
             keeprefs.add(collection_pdh_pattern.match(k).group(1))
 
-    def setloc(p):
+
+    def collectloc(p):
         loc = p.get("location")
         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
-            p["location"] = mapper.mapper(p["location"]).resolved
             addkeepref(p["location"])
             return
 
@@ -488,12 +486,10 @@ def upload_dependencies(arvrunner, name, document_loader,
         if uuid not in uuid_map:
             raise SourceLine(p, "location", validate.ValidationException).makeError(
                 "Collection uuid %s not found" % uuid)
-        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
-        p[collectionUUID] = uuid
 
-    #with Perf(metrics, "setloc"):
-    #    visit_class(workflowobj, ("File", "Directory"), setloc)
-    #    visit_class(discovered, ("File", "Directory"), setloc)
+    with Perf(metrics, "collectloc"):
+        visit_class(workflowobj, ("File", "Directory"), collectloc)
+        visit_class(discovered, ("File", "Directory"), collectloc)
 
     if discovered_secondaryfiles is not None:
         for d in discovered:
@@ -518,7 +514,6 @@ def upload_dependencies(arvrunner, name, document_loader,
                 continue
             col = col["items"][0]
             col["name"] = arvados.util.trim_name(col["name"])
-            print("CCC name", col["name"])
             try:
                 arvrunner.api.collections().create(body={"collection": {
                     "owner_uuid": runtimeContext.project_uuid,
@@ -553,20 +548,10 @@ def upload_docker(arvrunner, tool, runtimeContext):
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
 
-            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
-                                                       runtimeContext.project_uuid,
-                                                       runtimeContext.force_docker_pull,
-                                                       runtimeContext.tmp_outdir_prefix,
-                                                       runtimeContext.match_local_docker,
-                                                       runtimeContext.copy_deps)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
         else:
             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
-                                                       True,
-                                                       runtimeContext.project_uuid,
-                                                       runtimeContext.force_docker_pull,
-                                                       runtimeContext.tmp_outdir_prefix,
-                                                       runtimeContext.match_local_docker,
-                                                       runtimeContext.copy_deps)
+                                                       True, runtimeContext)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
@@ -630,6 +615,45 @@ def tag_git_version(packed):
         else:
             packed["http://schema.org/version"] = githash
 
+def setloc(mapper, p):
+    loc = p.get("location")
+    if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
+        p["location"] = mapper.mapper(p["location"]).resolved
+        return
+
+    if not loc:
+        return
+
+    if collectionUUID in p:
+        uuid = p[collectionUUID]
+        if uuid not in uuid_map:
+            raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
+                "Collection uuid %s not found" % uuid)
+        gp = collection_pdh_pattern.match(loc)
+        if gp and uuid_map[uuid] != gp.groups()[0]:
+            # This file entry has both collectionUUID and a PDH
+            # location. If the PDH doesn't match the one returned
+            # the API server, raise an error.
+            raise SourceLine(p, "location", validate.ValidationException).makeError(
+                "Expected collection uuid %s to be %s but API server reported %s" % (
+                    uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
+
+    gp = collection_uuid_pattern.match(loc)
+    if not gp:
+        # Not a uuid pattern (must be a pdh pattern)
+        return
+
+    uuid = gp.groups()[0]
+    if uuid not in uuid_map:
+        raise SourceLine(p, "location", validate.ValidationException).makeError(
+            "Collection uuid %s not found" % uuid)
+    p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
+    p[collectionUUID] = uuid
+
+
+def update_from_mapper(workflowobj, mapper):
+    with Perf(metrics, "setloc"):
+        visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
 
 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
     """Upload local files referenced in the input object and return updated input
@@ -680,6 +704,8 @@ def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
     if "job_order" in job_order:
         del job_order["job_order"]
 
+    update_from_mapper(job_order, jobmapper)
+
     return job_order
 
 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
@@ -720,7 +746,6 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext):
                                      discovered_secondaryfiles=discovered_secondaryfiles,
                                      cache=tool_dep_cache)
 
-        print("PM", pm.items())
         document_loader.idx[deptool["id"]] = deptool
         toolmap = {}
         for k,v in pm.items():
@@ -734,12 +759,7 @@ def arvados_jobs_image(arvrunner, img, runtimeContext):
 
     try:
         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
-                                                          True,
-                                                          runtimeContext.project_uuid,
-                                                          runtimeContext.force_docker_pull,
-                                                          runtimeContext.tmp_outdir_prefix,
-                                                          runtimeContext.match_local_docker,
-                                                          runtimeContext.copy_deps)
+                                                          True, runtimeContext)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
 
@@ -783,8 +803,8 @@ class Runner(Process):
                  collection_cache_is_default=True,
                  git_info=None):
 
-        loadingContext = loadingContext.copy()
-        loadingContext.metadata = updated_tool.metadata.copy()
+        self.loadingContext = loadingContext.copy()
+        self.loadingContext.metadata = updated_tool.metadata.copy()
 
-        super(Runner, self).__init__(updated_tool.tool, loadingContext)
+        super(Runner, self).__init__(updated_tool.tool, self.loadingContext)
 
@@ -809,9 +829,9 @@ class Runner(Process):
         self.intermediate_output_ttl = intermediate_output_ttl
         self.priority = priority
         self.secret_store = secret_store
-        self.enable_dev = loadingContext.enable_dev
+        self.enable_dev = self.loadingContext.enable_dev
         self.git_info = git_info
-        self.fast_parser = loadingContext.fast_parser
+        self.fast_parser = self.loadingContext.fast_parser
 
         self.submit_runner_cores = 1
         self.submit_runner_ram = 1024  # defaut 1 GiB
diff --git a/sdk/cwl/tests/test_container.py b/sdk/cwl/tests/test_container.py
index bad4d44087239e8e28330977968308ee5a465f46..5820b56b5771460fee86075d9a6bd256b0e85f5f 100644
--- a/sdk/cwl/tests/test_container.py
+++ b/sdk/cwl/tests/test_container.py
@@ -8,7 +8,7 @@ from builtins import object
 import arvados_cwl
 import arvados_cwl.context
 import arvados_cwl.util
-from arvados_cwl.arvdocker import arv_docker_clear_cache
+#from arvados_cwl.arvdocker import arv_docker_clear_cache
 import copy
 import arvados.config
 import logging
@@ -61,7 +61,7 @@ class TestContainer(unittest.TestCase):
 
     def setUp(self):
         cwltool.process._names = set()
-        arv_docker_clear_cache()
+        #arv_docker_clear_cache()
 
     def tearDown(self):
         root_logger = logging.getLogger('')
@@ -128,7 +128,7 @@ class TestContainer(unittest.TestCase):
     @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
     def test_run(self, keepdocker):
         for enable_reuse in (True, False):
-            arv_docker_clear_cache()
+            #arv_docker_clear_cache()
 
             runner = mock.MagicMock()
             runner.ignore_docker_for_reuse = False
@@ -929,7 +929,7 @@ class TestContainer(unittest.TestCase):
     # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
     @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
     def test_setting_storage_class(self, keepdocker):
-        arv_docker_clear_cache()
+        #arv_docker_clear_cache()
 
         runner = mock.MagicMock()
         runner.ignore_docker_for_reuse = False
@@ -1005,7 +1005,7 @@ class TestContainer(unittest.TestCase):
     # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
     @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
     def test_setting_process_properties(self, keepdocker):
-        arv_docker_clear_cache()
+        #arv_docker_clear_cache()
 
         runner = mock.MagicMock()
         runner.ignore_docker_for_reuse = False
@@ -1101,7 +1101,7 @@ class TestContainer(unittest.TestCase):
     @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
     def test_cuda_requirement(self, keepdocker):
         arvados_cwl.add_arv_hints()
-        arv_docker_clear_cache()
+        #arv_docker_clear_cache()
 
         runner = mock.MagicMock()
         runner.ignore_docker_for_reuse = False
@@ -1206,7 +1206,7 @@ class TestContainer(unittest.TestCase):
     @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
     def test_match_local_docker(self, keepdocker, determine_image_id):
         arvados_cwl.add_arv_hints()
-        arv_docker_clear_cache()
+        #arv_docker_clear_cache()
 
         runner = mock.MagicMock()
         runner.ignore_docker_for_reuse = False
@@ -1280,7 +1280,7 @@ class TestContainer(unittest.TestCase):
             runner.api.container_requests().create.assert_called_with(
                 body=JsonDiffMatcher(container_request))
 
-        arv_docker_clear_cache()
+        #arv_docker_clear_cache()
         runtimeContext.match_local_docker = True
         container_request['container_image'] = '99999999999999999999999999999993+99'
         container_request['name'] = 'test_run_True_2'
@@ -1298,7 +1298,7 @@ class TestContainer(unittest.TestCase):
         arvados_cwl.add_arv_hints()
         for enable_preemptible in (None, True, False):
             for preemptible_hint in (None, True, False):
-                arv_docker_clear_cache()
+                #arv_docker_clear_cache()
 
                 runner = mock.MagicMock()
                 runner.ignore_docker_for_reuse = False
@@ -1445,7 +1445,7 @@ class TestContainer(unittest.TestCase):
 class TestWorkflow(unittest.TestCase):
     def setUp(self):
         cwltool.process._names = set()
-        arv_docker_clear_cache()
+        #arv_docker_clear_cache()
 
     def helper(self, runner, enable_reuse=True):
         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index dcbee726b6ce4962692d8255d7be2b41b76c5f09..4fbca5c05293ae0e5a9c154af55f1b43c8745533 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -42,7 +42,7 @@ import arvados.keep
 from .matcher import JsonDiffMatcher, StripYAMLComments
 from .mock_discovery import get_rootDesc
 
-import ruamel.yaml as yaml
+import ruamel.yaml
 
 _rootDesc = None
 
@@ -179,12 +179,6 @@ def stubs(wfdetails=('submit_wf.cwl', None)):
             stubs.api.collections().create.side_effect = functools.partial(collection_createstub, created_collections)
             stubs.api.collections().get.side_effect = functools.partial(collection_getstub, created_collections)
 
-            stubs.expect_job_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
-            stubs.api.jobs().create().execute.return_value = {
-                "uuid": stubs.expect_job_uuid,
-                "state": "Queued",
-            }
-
             stubs.expect_container_request_uuid = "zzzzz-xvhdp-zzzzzzzzzzzzzzz"
             stubs.api.container_requests().create().execute.return_value = {
                 "uuid": stubs.expect_container_request_uuid,
@@ -192,96 +186,11 @@ def stubs(wfdetails=('submit_wf.cwl', None)):
                 "state": "Queued"
             }
 
-            stubs.expect_pipeline_template_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
-            stubs.api.pipeline_templates().create().execute.return_value = {
-                "uuid": stubs.expect_pipeline_template_uuid,
-            }
-            stubs.expect_job_spec = {
-                'runtime_constraints': {
-                    'docker_image': '999999999999999999999999999999d3+99',
-                    'min_ram_mb_per_node': 1024
-                },
-                'script_parameters': {
-                    'x': {
-                        'basename': 'blorp.txt',
-                        'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
-                        'class': 'File'
-                    },
-                    'y': {
-                        'basename': '99999999999999999999999999999998+99',
-                        'location': 'keep:99999999999999999999999999999998+99',
-                        'class': 'Directory'
-                    },
-                    'z': {
-                        'basename': 'anonymous',
-                        "listing": [{
-                            "basename": "renamed.txt",
-                            "class": "File",
-                            "location": "keep:99999999999999999999999999999998+99/file1.txt",
-                            "size": 0
-                        }],
-                        'class': 'Directory'
-                    },
-                    'cwl:tool': '57ad063d64c60dbddc027791f0649211+60/workflow.cwl#main'
-                },
-                'repository': 'arvados',
-                'script_version': 'master',
-                'minimum_script_version': '570509ab4d2ef93d870fd2b1f2eab178afb1bad9',
-                'script': 'cwl-runner'
-            }
-            stubs.pipeline_component = stubs.expect_job_spec.copy()
-            stubs.expect_pipeline_instance = {
-                'name': 'submit_wf.cwl',
-                'state': 'RunningOnServer',
-                'owner_uuid': None,
-                "components": {
-                    "cwl-runner": {
-                        'runtime_constraints': {'docker_image': '999999999999999999999999999999d3+99', 'min_ram_mb_per_node': 1024},
-                        'script_parameters': {
-                            'y': {"value": {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}},
-                            'x': {"value": {
-                                'basename': 'blorp.txt',
-                                'class': 'File',
-                                'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
-                                "size": 16
-                            }},
-                            'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
-                                  'listing': [
-                                      {
-                                          'basename': 'renamed.txt',
-                                          'class': 'File', 'location':
-                                          'keep:99999999999999999999999999999998+99/file1.txt',
-                                          'size': 0
-                                      }
-                                  ]}},
-                            'cwl:tool': '57ad063d64c60dbddc027791f0649211+60/workflow.cwl#main',
-                            'arv:debug': True,
-                            'arv:enable_reuse': True,
-                            'arv:on_error': 'continue'
-                        },
-                        'repository': 'arvados',
-                        'script_version': 'master',
-                        'minimum_script_version': '570509ab4d2ef93d870fd2b1f2eab178afb1bad9',
-                        'script': 'cwl-runner',
-                        'job': {'state': 'Queued', 'uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}
-                    }
-                }
-            }
-            stubs.pipeline_create = copy.deepcopy(stubs.expect_pipeline_instance)
-            stubs.expect_pipeline_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
-            stubs.pipeline_create["uuid"] = stubs.expect_pipeline_uuid
-            stubs.pipeline_with_job = copy.deepcopy(stubs.pipeline_create)
-            stubs.pipeline_with_job["components"]["cwl-runner"]["job"] = {
-                "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
-                "state": "Queued"
-            }
-            stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
-            stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
-
             cwd = os.getcwd()
-            filepath = os.path.join(cwd, "tests/wf/submit_wf_packed.cwl")
+            filepath = os.path.join(cwd, "tests/wf/submit_wf_wrapper.cwl")
             with open(filepath) as f:
-                expect_packed_workflow = yaml.round_trip_load(f)
+                yaml = ruamel.yaml.YAML(typ='rt', pure=True)
+                expect_packed_workflow = yaml.load(f)
 
             if wfpath is None:
                 wfpath = wfname
@@ -291,14 +200,14 @@ def stubs(wfdetails=('submit_wf.cwl', None)):
             mocktool = mock.NonCallableMock(tool=gitinfo_workflow["$graph"][0], metadata=gitinfo_workflow)
 
             stubs.git_info = arvados_cwl.executor.ArvCwlExecutor.get_git_info(mocktool)
-            expect_packed_workflow.update(stubs.git_info)
+            #expect_packed_workflow.update(stubs.git_info)
 
             stubs.git_props = {"arv:"+k.split("#", 1)[1]: v for k,v in stubs.git_info.items()}
 
-            if wfname == wfpath:
-                container_name = "%s (%s)" % (wfpath, stubs.git_props["arv:gitDescribe"])
-            else:
-                container_name = wfname
+            #if wfname == wfpath:
+            #    container_name = "%s (%s)" % (wfpath, stubs.git_props["arv:gitDescribe"])
+            #else:
+            container_name = wfname
 
             stubs.expect_container_spec = {
                 'priority': 500,
@@ -358,10 +267,11 @@ def stubs(wfdetails=('submit_wf.cwl', None)):
                     'vcpus': 1,
                     'ram': (1024+256)*1024*1024
                 },
+                'properties': {},
                 'use_existing': False,
-                'properties': stubs.git_props,
                 'secret_mounts': {}
             }
+            #'properties': stubs.git_props,
 
             stubs.expect_workflow_uuid = "zzzzz-7fd4e-zzzzzzzzzzzzzzz"
             stubs.api.workflows().create().execute.return_value = {
@@ -383,7 +293,7 @@ class TestSubmit(unittest.TestCase):
 
     def setUp(self):
         cwltool.process._names = set()
-        arvados_cwl.arvdocker.arv_docker_clear_cache()
+        #arvados_cwl.arvdocker.arv_docker_clear_cache()
 
     def tearDown(self):
         root_logger = logging.getLogger('')
@@ -405,7 +315,7 @@ class TestSubmit(unittest.TestCase):
     @stubs()
     def test_submit_container(self, stubs):
         exited = arvados_cwl.main(
-            ["--submit", "--no-wait", "--api=containers", "--debug",
+            ["--submit", "--no-wait", "--api=containers", "--debug", "--disable-git",
                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
             stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
 
@@ -414,7 +324,7 @@ class TestSubmit(unittest.TestCase):
                 'manifest_text':
                 '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
                 'replication_desired': None,
-                'name': 'submit_wf.cwl ('+ stubs.git_props["arv:gitDescribe"] +') input (169f39d466a5438ac4a90e779bf750c7+53)',
+                'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
             }), ensure_unique_name=False),
             mock.call(body=JsonDiffMatcher({
                 'manifest_text':
@@ -1104,7 +1014,7 @@ class TestSubmit(unittest.TestCase):
     @mock.patch("cwltool.docker.DockerCommandLineJob.get_image")
     @mock.patch("arvados.api")
     def test_arvados_jobs_image(self, api, get_image, find_one_image_hash):
-        arvados_cwl.arvdocker.arv_docker_clear_cache()
+        #arvados_cwl.arvdocker.arv_docker_clear_cache()
 
         arvrunner = mock.MagicMock()
         arvrunner.project_uuid = ""
@@ -1641,7 +1551,7 @@ class TestCreateWorkflow(unittest.TestCase):
 
     def setUp(self):
         cwltool.process._names = set()
-        arvados_cwl.arvdocker.arv_docker_clear_cache()
+        #arvados_cwl.arvdocker.arv_docker_clear_cache()
 
     def tearDown(self):
         root_logger = logging.getLogger('')