10812: Use packed workflows for all run modes.
author Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 23 Jan 2017 15:31:08 +0000 (10:31 -0500)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 23 Jan 2017 15:31:08 +0000 (10:31 -0500)
Conflicts:
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/runner.py

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/fsaccess.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/tests/test_submit.py
sdk/cwl/tests/wf/expect_arvworkflow.cwl
sdk/cwl/tests/wf/expect_packed.cwl
sdk/cwl/tests/wf/submit_wf_packed.cwl [new file with mode: 0644]

index b766cec30c4b7c1258e8277016c48dac6dd7288f..02eca644d30eee769d38cfe083ae513f4b5f49e8 100644 (file)
@@ -28,7 +28,7 @@ from arvados.errors import ApiError
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_instance
+from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
@@ -177,7 +177,7 @@ class ArvCwlRunner(object):
 
                 for p in proc_states["items"]:
                     self.on_message({
-                        "object_uuid": p["uuid"],
+                       "object_uuid": p["uuid"],
                         "event_type": "update",
                         "properties": {
                             "new_attributes": p
@@ -334,23 +334,44 @@ class ArvCwlRunner(object):
                                                                  keep_client=self.keep_client)
         self.fs_access = make_fs_access(kwargs["basedir"])
 
+        if not kwargs.get("name"):
+            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+
+        # Upload direct dependencies of workflow steps to Keep; no mapping is
+        # returned, the tool is re-created below.  Also uploads docker images.
+        upload_workflow_deps(self, tool)
+
+        # Reload tool object which may have been updated by
+        # upload_workflow_deps
+        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
+                                  makeTool=self.arv_make_tool,
+                                  loader=tool.doc_loader,
+                                  avsc_names=tool.doc_schema,
+                                  metadata=tool.metadata)
+
+        # Upload local file references in the job order.
+        job_order = upload_job_order(self, "%s input" % kwargs["name"],
+                                     tool, job_order)
+
         existing_uuid = kwargs.get("update_workflow")
         if existing_uuid or kwargs.get("create_workflow"):
+            # Create a pipeline template or workflow record and exit.
             if self.work_api == "jobs":
                 tmpl = RunnerTemplate(self, tool, job_order,
                                       kwargs.get("enable_reuse"),
                                       uuid=existing_uuid,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                      name=kwargs.get("name"))
+                                      name=kwargs["name"])
                 tmpl.save()
                 # cwltool.main will write our return value to stdout.
                 return (tmpl.uuid, "success")
-            else:
+            elif self.work_api == "containers":
                 return (upload_workflow(self, tool, job_order,
-                                       self.project_uuid,
-                                       uuid=existing_uuid,
-                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                        name=kwargs.get("name")), "success")
+                                        self.project_uuid,
+                                        uuid=existing_uuid,
+                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
+                                        name=kwargs["name"]),
+                        "success")
 
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
@@ -360,9 +381,6 @@ class ArvCwlRunner(object):
         kwargs["tmpdir_prefix"] = "tmp"
         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
 
-        if not kwargs["name"]:
-            del kwargs["name"]
-
         if self.work_api == "containers":
             kwargs["outdir"] = "/var/spool/cwl"
             kwargs["docker_outdir"] = "/var/spool/cwl"
@@ -373,13 +391,18 @@ class ArvCwlRunner(object):
             kwargs["docker_outdir"] = "$(task.outdir)"
             kwargs["tmpdir"] = "$(task.tmpdir)"
 
-        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
-
         runnerjob = None
         if kwargs.get("submit"):
+            # Submit a runner job to run the workflow for us.
             if self.work_api == "containers":
                 if tool.tool["class"] == "CommandLineTool":
                     kwargs["runnerjob"] = tool.tool["id"]
+                    upload_dependencies(self,
+                                        kwargs["name"],
+                                        tool.doc_loader,
+                                        tool.tool,
+                                        tool.tool["id"],
+                                        False)
                     runnerjob = tool.job(job_order,
                                          self.output_callback,
                                          **kwargs).next()
@@ -400,7 +423,7 @@ class ArvCwlRunner(object):
                                       on_error=kwargs.get("on_error"),
                                       submit_runner_image=kwargs.get("submit_runner_image"))
 
-        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
+        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
             # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
                 body={
index a3a26aae0a4bf6ab180429cad98193f47f3a0113..14513c7c32a70dfbc75bb14873c1a3f6d19790b4 100644 (file)
@@ -6,14 +6,14 @@ import ruamel.yaml as yaml
 
 from cwltool.errors import WorkflowException
 from cwltool.process import get_feature, UnsupportedRequirement, shortname
-from cwltool.pathmapper import adjustFiles
+from cwltool.pathmapper import adjustFiles, adjustDirObjs
 from cwltool.utils import aslist
 
 import arvados.collection
 
 from .arvdocker import arv_docker_get_image
 from . import done
-from .runner import Runner, arvados_jobs_image
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
 from .fsaccess import CollectionFetcher
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -190,7 +190,9 @@ class RunnerContainer(Runner):
         the +body+ argument to container_requests().create().
         """
 
-        workflowmapper = super(RunnerContainer, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+        packed = packed_workflow(self.arvrunner, self.tool)
+
+        adjustDirObjs(self.job_order, trim_listing)
 
         container_req = {
             "owner_uuid": self.arvrunner.project_uuid,
@@ -222,27 +224,13 @@ class RunnerContainer(Runner):
             "properties": {}
         }
 
-        workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
-        if workflowcollection.startswith("keep:"):
-            workflowcollection = workflowcollection[5:workflowcollection.index('/')]
-            workflowname = os.path.basename(self.tool.tool["id"])
-            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
-            container_req["mounts"]["/var/lib/cwl/workflow"] = {
-                "kind": "collection",
-                "portable_data_hash": "%s" % workflowcollection
-                }
-        elif workflowcollection.startswith("arvwf:"):
-            workflowpath = "/var/lib/cwl/workflow.json#main"
-            wfuuid = workflowcollection[6:workflowcollection.index("#")]
-            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
-            wfobj = yaml.safe_load(wfrecord["definition"])
-            if container_req["name"].startswith("arvwf:"):
-                container_req["name"] = wfrecord["name"]
-            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
-                "kind": "json",
-                "json": wfobj
-            }
-            container_req["properties"]["template_uuid"] = wfuuid
+        workflowpath = "/var/lib/cwl/workflow.json#main"
+        container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+            "kind": "json",
+            "json": packed
+        }
+        if self.tool.tool.get("id", "").startswith("arvwf:"):
+            container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
 
         command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
         if self.output_name:
index 4bbbda26fb692f40551cce7f88a4396d7a5871ae..a4cd5cd3f8d79491e66f7e6a4aecfe9b852268b9 100644 (file)
@@ -9,11 +9,14 @@ from cwltool.errors import WorkflowException
 from cwltool.draft2tool import revmap_file, CommandLineTool
 from cwltool.load_tool import fetch_document
 from cwltool.builder import Builder
+from cwltool.pathmapper import adjustDirObjs
+
+import ruamel.yaml as yaml
 
 import arvados.collection
 
 from .arvdocker import arv_docker_get_image
-from .runner import Runner, arvados_jobs_image
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
 from .pathmapper import InitialWorkDirPathMapper
 from .perf import Perf
 from . import done
@@ -231,6 +234,28 @@ class ArvadosJob(object):
 class RunnerJob(Runner):
     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
 
+    def upload_workflow_collection(self, packed):
+        collection = arvados.collection.Collection(api_client=self.arvrunner.api,
+                                                   keep_client=self.arvrunner.keep_client,
+                                                   num_retries=self.arvrunner.num_retries)
+        with collection.open("workflow.cwl", "w") as f:
+            f.write(yaml.round_trip_dump(packed))
+
+        exists = self.arvrunner.api.collections().list(filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+                                                 ["portable_data_hash", "=", collection.portable_data_hash()],
+                                                 ["name", "like", self.name+"%"]]).execute(num_retries=self.arvrunner.num_retries)
+
+        if exists["items"]:
+            logger.info("Using collection %s", exists["items"][0]["uuid"])
+        else:
+            collection.save_new(name=self.name,
+                                owner_uuid=self.arvrunner.project_uuid,
+                                ensure_unique_name=True,
+                                num_retries=self.arvrunner.num_retries)
+            logger.info("Uploaded to %s", collection.manifest_locator())
+
+        return collection.portable_data_hash()
+
     def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
         """Create an Arvados job specification for this workflow.
 
@@ -239,9 +264,14 @@ class RunnerJob(Runner):
         a pipeline template or pipeline instance.
         """
 
-        workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+        if self.tool.tool["id"].startswith("keep:"):
+            pass
+        else:
+            packed = packed_workflow(self.arvrunner, self.tool)
+            wf_pdh = self.upload_workflow_collection(packed)
+            self.job_order["cwl:tool"] = "%s/workflow.cwl" % wf_pdh
 
-        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"]).target[5:]
+        adjustDirObjs(self.job_order, trim_listing)
 
         if self.output_name:
             self.job_order["arv:output_name"] = self.output_name
index 9e70a6e66f2b36f5393cd2f32039a38ec2489dd4..cd7e41a906b8fdacdea740a26d188a933348d9d9 100644 (file)
@@ -13,7 +13,7 @@ from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
 
 import ruamel.yaml as yaml
 
-from .runner import upload_docker, upload_dependencies, trim_listing
+from .runner import upload_dependencies, trim_listing, packed_workflow
 from .arvtool import ArvadosCommandTool
 from .perf import Perf
 
@@ -22,11 +22,8 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
 def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
                     submit_runner_ram=0, name=None):
-    upload_docker(arvRunner, tool)
 
-    document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
-
-    packed = pack(document_loader, workflowobj, uri, tool.metadata)
+    packed = packed_workflow(arvRunner, tool)
 
     adjustDirObjs(job_order, trim_listing)
 
@@ -39,8 +36,8 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
     if not name:
         name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
 
-    upload_dependencies(arvRunner, name, document_loader,
-                        packed, uri, False)
+    upload_dependencies(arvRunner, name, tool.doc_loader,
+                        packed, tool.tool["id"], False)
 
     # TODO nowhere for submit_runner_ram to go.
 
index 500ea0f4203793fac7fdf418f79b0ae562f64334..b249da77dd3e1ac4bda297fcab4fdbbd76d0e231 100644 (file)
@@ -139,7 +139,9 @@ class CollectionFetcher(DefaultFetcher):
             with self.fsaccess.open(url, "r") as f:
                 return f.read()
         if url.startswith("arvwf:"):
-            return self.api_client.workflows().get(uuid=url[6:]).execute()["definition"]
+            record = self.api_client.workflows().get(uuid=url[6:]).execute()
+            definition = record["definition"] + ('\nlabel: "%s"\n' % record["name"].replace('"', '\\"'))
+            return definition
         return super(CollectionFetcher, self).fetch_text(url)
 
     def check_exists(self, url):
index 09ca63b1a1b6fce6b00b9d68b073cfc14cc83e33..ddeac3ab0691e0d33b05b2ce774a2805a8a13deb 100644 (file)
@@ -4,6 +4,8 @@ from functools import partial
 import logging
 import json
 import re
+import subprocess
+
 from cStringIO import StringIO
 
 from schema_salad.sourceline import SourceLine
@@ -16,6 +18,7 @@ from cwltool.load_tool import fetch_document
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
 from cwltool.utils import aslist
 from cwltool.builder import substitute
+from cwltool.pack import pack
 
 import arvados.collection
 import ruamel.yaml as yaml
@@ -45,7 +48,7 @@ def trim_listing(obj):
         del obj["location"]
 
 def upload_dependencies(arvrunner, name, document_loader,
-                        workflowobj, uri, loadref_run):
+                        workflowobj, uri, loadref_run, include_primary=True):
     """Upload the dependencies of the workflowobj document to Keep.
 
     Returns a pathmapper object mapping local paths to keep references.  Also
@@ -92,7 +95,7 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     normalizeFilesDirs(sc)
 
-    if "id" in workflowobj:
+    if include_primary and "id" in workflowobj:
         sc.append({"class": "File", "location": workflowobj["id"]})
 
     mapper = ArvPathMapper(arvrunner, sc, "",
@@ -110,6 +113,7 @@ def upload_dependencies(arvrunner, name, document_loader,
 
 
 def upload_docker(arvrunner, tool):
+    """Visitor which uploads Docker images referenced in CommandLineTool objects."""
     if isinstance(tool, CommandLineTool):
         (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
         if docker_req:
@@ -118,43 +122,78 @@ def upload_docker(arvrunner, tool):
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
             arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
-    elif isinstance(tool, cwltool.workflow.Workflow):
-        for s in tool.steps:
-            upload_docker(arvrunner, s.embedded_tool)
-
-def upload_instance(arvrunner, name, tool, job_order):
-        upload_docker(arvrunner, tool)
-
-        for t in tool.tool["inputs"]:
-            def setSecondary(fileobj):
-                if isinstance(fileobj, dict) and fileobj.get("class") == "File":
-                    if "secondaryFiles" not in fileobj:
-                        fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
-
-                if isinstance(fileobj, list):
-                    for e in fileobj:
-                        setSecondary(e)
-
-            if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
-                setSecondary(job_order[shortname(t["id"])])
-
-        workflowmapper = upload_dependencies(arvrunner,
-                                             name,
-                                             tool.doc_loader,
-                                             tool.tool,
-                                             tool.tool["id"],
-                                             True)
-        jobmapper = upload_dependencies(arvrunner,
-                                        os.path.basename(job_order.get("id", "#")),
-                                        tool.doc_loader,
-                                        job_order,
-                                        job_order.get("id", "#"),
-                                        False)
-
-        if "id" in job_order:
-            del job_order["id"]
-
-        return workflowmapper
+
+def packed_workflow(arvrunner, tool):
+    """Create a packed workflow.
+
+    A "packed" workflow is one where all the components have been combined into a single document."""
+
+    return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
+                tool.tool["id"], tool.metadata)
+
+def tag_git_version(packed, tool=None):
+    """Best effort: store the git commit of a file:// workflow's directory in packed["http://schema.org/version"]; no-op without tool."""
+    if tool is not None and tool.tool["id"].startswith("file://"):
+        path = os.path.dirname(tool.tool["id"][7:])  # strip the "file://" scheme to get a local directory
+        try:
+            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
+        except (OSError, subprocess.CalledProcessError):
+            pass  # git missing or command failed (e.g. not a checkout): leave packed untagged
+        else:
+            packed["http://schema.org/version"] = githash
+
+def upload_job_order(arvrunner, name, tool, job_order):
+    """Upload local files referenced in the input object and return updated input
+    object with 'location' updated to the proper keep references.
+    """
+
+    for t in tool.tool["inputs"]:
+        def setSecondary(fileobj):
+            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
+                if "secondaryFiles" not in fileobj:
+                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
+
+            if isinstance(fileobj, list):
+                for e in fileobj:
+                    setSecondary(e)
+
+        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
+            setSecondary(job_order[shortname(t["id"])])
+
+    jobmapper = upload_dependencies(arvrunner,
+                                    name,
+                                    tool.doc_loader,
+                                    job_order,
+                                    job_order.get("id", "#"),
+                                    False)
+
+    if "id" in job_order:
+        del job_order["id"]
+
+    # Need to filter this out, gets added by cwltool when providing
+    # parameters on the command line.
+    if "job_order" in job_order:
+        del job_order["job_order"]
+
+    return job_order
+
+def upload_workflow_deps(arvrunner, tool):
+    # Ensure that Docker images needed by this workflow are available
+    tool.visit(partial(upload_docker, arvrunner))
+
+    document_loader = tool.doc_loader
+
+    def upload_tool_deps(deptool):
+        upload_dependencies(arvrunner,
+                            "%s dependencies" % (shortname(deptool["id"])),
+                            document_loader,
+                            deptool,
+                            deptool["id"],
+                            False,
+                            include_primary=False)
+        document_loader.idx[deptool["id"]] = deptool
+
+    tool.visit(upload_tool_deps)
 
 def arvados_jobs_image(arvrunner, img):
     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
@@ -166,6 +205,9 @@ def arvados_jobs_image(arvrunner, img):
     return img
 
 class Runner(object):
+    """Base class for runner processes, which submit an instance of
+    arvados-cwl-runner and wait for the final result."""
+
     def __init__(self, runner, tool, job_order, enable_reuse,
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None):
@@ -193,20 +235,9 @@ class Runner(object):
     def update_pipeline_component(self, record):
         pass
 
-    def arvados_job_spec(self, *args, **kwargs):
-        if self.name is None:
-            self.name = self.tool.tool.get("label") or os.path.basename(self.tool.tool["id"])
-
-        # Need to filter this out, gets added by cwltool when providing
-        # parameters on the command line.
-        if "job_order" in self.job_order:
-            del self.job_order["job_order"]
-
-        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
-        adjustDirObjs(self.job_order, trim_listing)
-        return workflowmapper
-
     def done(self, record):
+        """Base method for handling a completed runner."""
+
         try:
             if record["state"] == "Complete":
                 if record.get("exit_code") is not None:
index 042272efff63dc4a522c5b4835a6934020fcc435..9e918fc9a13da1d4ac2bdb4f06eeeccecd93a3d2 100644 (file)
@@ -17,6 +17,8 @@ import arvados.keep
 from .matcher import JsonDiffMatcher
 from .mock_discovery import get_rootDesc
 
+import ruamel.yaml as yaml
+
 _rootDesc = None
 
 def stubs(func):
@@ -104,7 +106,7 @@ def stubs(func):
             'script_parameters': {
                 'x': {
                     'basename': 'blorp.txt',
-                    'location': 'keep:99999999999999999999999999999994+99/blorp.txt',
+                    'location': 'keep:99999999999999999999999999999992+99/blorp.txt',
                     'class': 'File'
                 },
                 'y': {
@@ -122,7 +124,7 @@ def stubs(func):
                     'class': 'Directory'
                 },
                 'cwl:tool':
-                '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+                'ef2c299cb4e5a565a46d94887eafdc62+58/workflow.cwl'
             },
             'repository': 'arvados',
             'script_version': 'master',
@@ -139,12 +141,12 @@ def stubs(func):
                     'runtime_constraints': {'docker_image': 'arvados/jobs:'+arvados_cwl.__version__, 'min_ram_mb_per_node': 1024},
                     'script_parameters': {
                         'y': {"value": {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}},
-                        'x': {"value": {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999994+99/blorp.txt'}},
+                        'x': {"value": {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999992+99/blorp.txt'}},
                         'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
                               'listing': [
                                   {'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
                               ]}},
-                        'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl',
+                        'cwl:tool': 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl',
                         'arv:enable_reuse': True,
                         'arv:on_error': 'continue'
                     },
@@ -167,6 +169,9 @@ def stubs(func):
         stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
         stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
 
+        with open("tests/wf/submit_wf_packed.cwl") as f:
+            expect_packed_workflow = yaml.round_trip_load(f)
+
         stubs.expect_container_spec = {
             'priority': 1,
             'mounts': {
@@ -174,9 +179,9 @@ def stubs(func):
                     'writable': True,
                     'kind': 'collection'
                 },
-                '/var/lib/cwl/workflow': {
-                    'portable_data_hash': '99999999999999999999999999999991+99',
-                    'kind': 'collection'
+                '/var/lib/cwl/workflow.json': {
+                    'json': expect_packed_workflow,
+                    'kind': 'json'
                 },
                 'stdout': {
                     'path': '/var/spool/cwl/cwl.output.json',
@@ -186,7 +191,7 @@ def stubs(func):
                     'kind': 'json',
                     'content': {
                         'y': {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'},
-                        'x': {'basename': u'blorp.txt', 'class': 'File', 'location': u'keep:99999999999999999999999999999994+99/blorp.txt'},
+                        'x': {'basename': u'blorp.txt', 'class': 'File', 'location': u'keep:99999999999999999999999999999992+99/blorp.txt'},
                         'z': {'basename': 'anonymous', 'class': 'Directory', 'listing': [
                             {'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
                         ]}
@@ -198,7 +203,7 @@ def stubs(func):
             'owner_uuid': None,
             'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
                         '--enable-reuse', '--on-error=continue',
-                        '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json'],
+                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
             'name': 'submit_wf.cwl',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
             'output_path': '/var/spool/cwl',
@@ -240,32 +245,24 @@ class TestSubmit(unittest.TestCase):
 
         stubs.api.collections().create.assert_has_calls([
             mock.call(),
-            mock.call(body={
+            mock.call(body=JsonDiffMatcher({
                 'manifest_text':
-                './tool d51232d96b6116d964a69bfb7e0c73bf+450 '
-                '0:16:blub.txt 16:434:submit_tool.cwl\n./wf '
-                'cc2ffb940e60adf1b2b282c67587e43d+413 0:413:submit_wf.cwl\n',
+                '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
                 'owner_uuid': None,
-                'name': 'submit_wf.cwl',
-            }, ensure_unique_name=True),
+                'name': 'submit_tool.cwl dependencies',
+            }), ensure_unique_name=True),
             mock.call().execute(),
-            mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
-                            '0:0:blub.txt 0:0:submit_tool.cwl\n',
-                            'replication_desired': None,
-                            'name': 'New collection'
-            }, ensure_unique_name=True),
-            mock.call().execute(num_retries=4),
-            mock.call(body={
+            mock.call(body=JsonDiffMatcher({
                 'manifest_text':
                 '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
                 'owner_uuid': None,
-                'name': '#',
-            }, ensure_unique_name=True),
+                'name': 'submit_wf.cwl input',
+            }), ensure_unique_name=True),
             mock.call().execute()])
 
         expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
         stubs.api.pipeline_instances().create.assert_called_with(
-            body=expect_pipeline)
+            body=JsonDiffMatcher(expect_pipeline))
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_pipeline_uuid + '\n')
 
@@ -426,27 +423,19 @@ class TestSubmit(unittest.TestCase):
 
         stubs.api.collections().create.assert_has_calls([
             mock.call(),
-            mock.call(body={
+            mock.call(body=JsonDiffMatcher({
                 'manifest_text':
-                './tool d51232d96b6116d964a69bfb7e0c73bf+450 '
-                '0:16:blub.txt 16:434:submit_tool.cwl\n./wf '
-                'cc2ffb940e60adf1b2b282c67587e43d+413 0:413:submit_wf.cwl\n',
+                '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
                 'owner_uuid': None,
-                'name': 'submit_wf.cwl',
-            }, ensure_unique_name=True),
+                'name': 'submit_tool.cwl dependencies',
+            }), ensure_unique_name=True),
             mock.call().execute(),
-            mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
-                            '0:0:blub.txt 0:0:submit_tool.cwl\n',
-                            'name': 'New collection',
-                            'replication_desired': None,
-            }, ensure_unique_name=True),
-            mock.call().execute(num_retries=4),
-            mock.call(body={
+            mock.call(body=JsonDiffMatcher({
                 'manifest_text':
                 '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
                 'owner_uuid': None,
-                'name': '#',
-            }, ensure_unique_name=True),
+                'name': 'submit_wf.cwl input',
+            }), ensure_unique_name=True),
             mock.call().execute()])
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
@@ -469,7 +458,7 @@ class TestSubmit(unittest.TestCase):
 
         stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
                                                   '--disable-reuse', '--on-error=continue',
-                                                  '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+                                                  '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         stubs.api.container_requests().create.assert_called_with(
@@ -492,7 +481,7 @@ class TestSubmit(unittest.TestCase):
 
         stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
                                                   '--enable-reuse', '--on-error=stop',
-                                                  '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+                                                  '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         stubs.api.container_requests().create.assert_called_with(
@@ -516,7 +505,7 @@ class TestSubmit(unittest.TestCase):
 
         stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
                                                   "--output-name="+output_name, '--enable-reuse', '--on-error=continue',
-                                                  '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+                                                  '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
         stubs.expect_container_spec["output_name"] = output_name
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
@@ -541,7 +530,7 @@ class TestSubmit(unittest.TestCase):
 
         stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
                                                   "--output-tags="+output_tags, '--enable-reuse', '--on-error=continue',
-                                                  '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+                                                  '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         stubs.api.container_requests().create.assert_called_with(
@@ -669,6 +658,20 @@ class TestSubmit(unittest.TestCase):
                     'json': {
                         'cwlVersion': 'v1.0',
                         '$graph': [
+                            {
+                                'id': '#main',
+                                'inputs': [
+                                    {'type': 'string', 'id': '#main/x'}
+                                ],
+                                'steps': [
+                                    {'in': [{'source': '#main/x', 'id': '#main/step1/x'}],
+                                     'run': '#submit_tool.cwl',
+                                     'id': '#main/step1',
+                                     'out': []}
+                                ],
+                                'class': 'Workflow',
+                                'outputs': []
+                            },
                             {
                                 'inputs': [
                                     {
@@ -683,19 +686,6 @@ class TestSubmit(unittest.TestCase):
                                 'outputs': [],
                                 'baseCommand': 'cat',
                                 'class': 'CommandLineTool'
-                            }, {
-                                'id': '#main',
-                                'inputs': [
-                                    {'type': 'string', 'id': '#main/x'}
-                                ],
-                                'steps': [
-                                    {'in': [{'source': '#main/x', 'id': '#main/step1/x'}],
-                                     'run': '#submit_tool.cwl',
-                                     'id': '#main/step1',
-                                     'out': []}
-                                ],
-                                'class': 'Workflow',
-                                'outputs': []
                             }
                         ]
                     }
@@ -747,7 +737,7 @@ class TestSubmit(unittest.TestCase):
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         stubs.api.container_requests().create.assert_called_with(
-            body=expect_container)
+            body=JsonDiffMatcher(expect_container))
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_container_request_uuid + '\n')
 
@@ -844,7 +834,7 @@ class TestCreateTemplate(unittest.TestCase):
             'dataclass': 'File',
             'required': True,
             'type': 'File',
-            'value': '99999999999999999999999999999994+99/blorp.txt',
+            'value': '99999999999999999999999999999992+99/blorp.txt',
         }
         expect_component['script_parameters']['y'] = {
             'dataclass': 'Collection',
@@ -1167,7 +1157,7 @@ class TestTemplateInputs(unittest.TestCase):
         expect_template = copy.deepcopy(self.expect_template)
         params = expect_template[
             "components"]["inputs_test.cwl"]["script_parameters"]
-        params["fileInput"]["value"] = '99999999999999999999999999999994+99/blorp.txt'
+        params["fileInput"]["value"] = '99999999999999999999999999999992+99/blorp.txt'
         params["floatInput"]["value"] = 1.234
         params["boolInput"]["value"] = True
 
index 56ce0d54747ff08244df622fcbb9edee219cd8f5..4a3a0379a106c85fe9f50a670d6d157a42737708 100644 (file)
@@ -1,14 +1,5 @@
+cwlVersion: v1.0
 $graph:
-- baseCommand: cat
-  class: CommandLineTool
-  id: '#submit_tool.cwl'
-  inputs:
-  - id: '#submit_tool.cwl/x'
-    inputBinding: {position: 1}
-    type: string
-  outputs: []
-  requirements:
-  - {class: DockerRequirement, dockerPull: 'debian:8'}
 - class: Workflow
   id: '#main'
   inputs:
@@ -21,4 +12,13 @@ $graph:
     - {id: '#main/step1/x', source: '#main/x'}
     out: []
     run: '#submit_tool.cwl'
-cwlVersion: v1.0
+- baseCommand: cat
+  class: CommandLineTool
+  id: '#submit_tool.cwl'
+  inputs:
+  - id: '#submit_tool.cwl/x'
+    inputBinding: {position: 1}
+    type: string
+  outputs: []
+  requirements:
+  - {class: DockerRequirement, dockerPull: 'debian:8'}
index f4d60dbfe8cad258da5e6eb81a7ec3315d629979..d7b9d61a749d51db0aca57581ad1c51feca1969d 100644 (file)
@@ -9,7 +9,7 @@ $graph:
     type: File
     default:
       class: File
-      location: keep:99999999999999999999999999999991+99/tool/blub.txt
+      location: keep:99999999999999999999999999999991+99/blub.txt
     inputBinding:
       position: 1
   outputs: []
@@ -19,7 +19,7 @@ $graph:
   inputs:
   - id: '#main/x'
     type: File
-    default: {class: File, location: 'keep:99999999999999999999999999999991+99/input/blorp.txt',
+    default: {class: File, location: 'keep:99999999999999999999999999999992+99/blorp.txt',
       basename: blorp.txt}
   - id: '#main/y'
     type: Directory
diff --git a/sdk/cwl/tests/wf/submit_wf_packed.cwl b/sdk/cwl/tests/wf/submit_wf_packed.cwl
new file mode 100644 (file)
index 0000000..1f7fee0
--- /dev/null
@@ -0,0 +1,33 @@
+cwlVersion: v1.0
+$graph:
+- class: CommandLineTool
+  requirements:
+  - class: DockerRequirement
+    dockerPull: debian:8
+  inputs:
+  - id: '#submit_tool.cwl/x'
+    type: File
+    default:
+      class: File
+      location: keep:99999999999999999999999999999991+99/blub.txt
+    inputBinding:
+      position: 1
+  outputs: []
+  baseCommand: cat
+  id: '#submit_tool.cwl'
+- class: Workflow
+  inputs:
+  - id: '#main/x'
+    type: File
+  - id: '#main/y'
+    type: Directory
+  - id: '#main/z'
+    type: Directory
+  outputs: []
+  steps:
+  - id: '#main/step1'
+    in:
+    - {id: '#main/step1/x', source: '#main/x'}
+    out: []
+    run: '#submit_tool.cwl'
+  id: '#main'