Merge master to output-tags branch and resolve conflict
author Jiayong Li <jiayong@math.mit.edu>
Tue, 15 Nov 2016 19:59:55 +0000 (14:59 -0500)
committer Jiayong Li <jiayong@math.mit.edu>
Tue, 15 Nov 2016 19:59:55 +0000 (14:59 -0500)
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/crunch_script.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/tests/test_make_output.py
sdk/cwl/tests/test_submit.py

index 92be92d6e0469fba63a1504a5f0c834fb4a9b2b7..b3d47dd8d05e5981ae4f645fa9c968ee7707e747 100644 (file)
@@ -49,7 +49,7 @@ class ArvCwlRunner(object):
 
     """
 
-    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None):
+    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None):
         self.api = api_client
         self.processes = {}
         self.lock = threading.Lock()
@@ -64,6 +64,7 @@ class ArvCwlRunner(object):
         self.pipeline = None
         self.final_output_collection = None
         self.output_name = output_name
+        self.output_tags = output_tags  # comma-separated tag names (e.g. "tag0,tag1") or None; passed to the Runner* objects and make_output_collection
         self.project_uuid = None
 
         if keep_client is not None:
@@ -183,7 +184,7 @@ class ArvCwlRunner(object):
             for v in obj:
                 self.check_writable(v)
 
-    def make_output_collection(self, name, outputObj):
+    def make_output_collection(self, name, tagsString, outputObj):  # tagsString: comma-separated tag names; NOTE(review): inserted mid-signature, so older (name, outputObj) positional callers break
         outputObj = copy.deepcopy(outputObj)
 
         files = []
@@ -248,6 +249,13 @@ class ArvCwlRunner(object):
                     final.api_response()["name"],
                     final.manifest_locator())
 
+        final_uuid = final.manifest_locator()  # tag links are attached to the saved output collection
+        tags = [t.strip() for t in (tagsString or "").split(',') if t.strip()]  # None/"" -> no tags; "".split(',') would yield [''] and create an empty-name tag
+        for tag in tags:
+            self.api.links().create(body={
+                "head_uuid": final_uuid, "link_class": "tag", "name": tag
+                }).execute(num_retries=self.num_retries)
+
         def finalcollection(fileobj):
             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
 
@@ -325,9 +333,9 @@ class ArvCwlRunner(object):
                                          self.output_callback,
                                          **kwargs).next()
                 else:
-                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
+                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)  # tags become --output-tags on the container command
             else:
-                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
+                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)  # tags become the "arv:output_tags" job parameter
 
         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
             # Create pipeline for local run
@@ -410,7 +418,9 @@ class ArvCwlRunner(object):
         else:
             if self.output_name is None:
                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
-            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.final_output)
+            if self.output_tags is None:
+                self.output_tags = ""  # make_output_collection calls tagsString.split(','), which would fail on None
+            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
             self.set_crunch_output()
 
         if self.final_status != "success":
@@ -468,6 +478,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 
     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
+    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)  # kept as one string; split on ',' in make_output_collection
     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
                         default=False)
@@ -525,7 +536,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
     try:
         if api_client is None:
             api_client=arvados.api('v1', model=OrderedJsonModel())
-        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name)
+        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name, output_tags=arvargs.output_tags)  # None when --output-tags is omitted
     except Exception as e:
         logger.error(e)
         return 1
index aa088c5e8a06fa00ec086483b9f628c79687965f..e7cd617baee8d063da303aac89a93f434234c551 100644 (file)
@@ -191,6 +191,9 @@ class RunnerContainer(Runner):
         if self.output_name:
             command.append("--output-name=" + self.output_name)
 
+        if self.output_tags:  # omitted when None or ""
+            command.append("--output-tags=" + self.output_tags)  # re-passed to arvados-cwl-runner inside the container
+
         if self.enable_reuse:
             command.append("--enable-reuse")
         else:
index 8a62204f8fb9ec22298abec15529411ace70ed9e..4db23b98a961904675727a13c33bf91cd3aa1f55 100644 (file)
@@ -241,6 +241,9 @@ class RunnerJob(Runner):
         if self.output_name:
             self.job_order["arv:output_name"] = self.output_name
 
+        if self.output_tags:  # omitted when None or ""
+            self.job_order["arv:output_tags"] = self.output_tags  # read and deleted from the job order by crunch_script.run()
+
         self.job_order["arv:enable_reuse"] = self.enable_reuse
 
         return {
@@ -308,7 +311,8 @@ class RunnerTemplate(object):
             tool=tool,
             job_order=job_order,
             enable_reuse=enable_reuse,
-            output_name=None)
+            output_name=None,
+            output_tags=None)  # templates carry no output tags; Runner.__init__ takes output_tags
 
     def pipeline_component_spec(self):
         """Return a component that Workbench and a-r-p-i will understand.
index 173eb93daf2c4070ba92f28fca2ac053952f1662..849b177aebbd7c4f5a507a8d0bb05ec915cf3b58 100644 (file)
@@ -63,17 +63,22 @@ def run():
         adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api)))
 
         output_name = None
+        output_tags = None  # comma-separated tag string, supplied by RunnerJob as "arv:output_tags"
         enable_reuse = True
         if "arv:output_name" in job_order_object:
             output_name = job_order_object["arv:output_name"]
             del job_order_object["arv:output_name"]
 
+        if "arv:output_tags" in job_order_object:
+            output_tags = job_order_object["arv:output_tags"]
+            del job_order_object["arv:output_tags"]  # remove the control key before the job order goes to load_tool
+
         if "arv:enable_reuse" in job_order_object:
             enable_reuse = job_order_object["arv:enable_reuse"]
             del job_order_object["arv:enable_reuse"]
 
         runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
-                                          output_name=output_name)
+                                          output_name=output_name, output_tags=output_tags)
 
         t = load_tool(job_order_object, runner.arv_make_tool)
 
index a1142544f5bf2e16150d56dd4d0b707cfd4db984..5cc447e9a3bad9202d9e77fb53919dcc66b804c8 100644 (file)
@@ -161,7 +161,7 @@ def arvados_jobs_image(arvrunner):
     return img
 
 class Runner(object):
-    def __init__(self, runner, tool, job_order, enable_reuse, output_name):
+    def __init__(self, runner, tool, job_order, enable_reuse, output_name, output_tags=None):
         self.arvrunner = runner
         self.tool = tool
         self.job_order = job_order
@@ -170,6 +170,7 @@ class Runner(object):
         self.uuid = None
         self.final_output = None
         self.output_name = output_name
+        self.output_tags = output_tags  # comma-separated tag string or None; the None default keeps existing 5-argument callers/subclasses working
 
     def update_pipeline_component(self, record):
         pass
index 3228ad77b3ca9343c0d6ff736b0714c23acd060b..53f379f1a5ac0cc488af5157080e78933d543367 100644 (file)
@@ -27,12 +27,15 @@ class TestMakeOutput(unittest.TestCase):
         readermock = mock.MagicMock()
         reader.return_value = readermock
 
+        final_uuid = final.manifest_locator()  # assumes `final` is a MagicMock, so every manifest_locator() call returns this same object
+        num_retries = runner.num_retries
+
         cwlout = StringIO.StringIO()
         openmock = mock.MagicMock()
         final.open.return_value = openmock
         openmock.__enter__.return_value = cwlout
 
-        _, runner.final_output_collection = runner.make_output_collection("Test output", {
+        _, runner.final_output_collection = runner.make_output_collection("Test output", "tag0,tag1,tag2", {
             "foo": {
                 "class": "File",
                 "location": "keep:99999999999999999999999999999991+99/foo.txt",
@@ -64,3 +67,7 @@ class TestMakeOutput(unittest.TestCase):
 }""", cwlout.getvalue())
 
         self.assertIs(final, runner.final_output_collection)
+        self.assertIs(final_uuid, runner.final_output_collection.manifest_locator())
+        self.api.links().create.assert_has_calls([mock.call(body={"head_uuid": final_uuid, "link_class": "tag", "name": "tag0"}), mock.call().execute(num_retries=num_retries)])  # each tag: create(body=...) immediately followed by .execute(num_retries=...)
+        self.api.links().create.assert_has_calls([mock.call(body={"head_uuid": final_uuid, "link_class": "tag", "name": "tag1"}), mock.call().execute(num_retries=num_retries)])
+        self.api.links().create.assert_has_calls([mock.call(body={"head_uuid": final_uuid, "link_class": "tag", "name": "tag2"}), mock.call().execute(num_retries=num_retries)])
index c195b03916992561f5e52b1d970c3cabd778df30..085509fbb17b334fc24d8c3bcdad99fa1068c416 100644 (file)
@@ -269,6 +269,48 @@ class TestSubmit(unittest.TestCase):
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_pipeline_uuid + '\n')
 
+    @mock.patch("time.sleep")
+    @stubs
+    def test_submit_output_name(self, stubs, tm):
+        output_name = "test_output_name"
+
+        capture_stdout = cStringIO.StringIO()
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--debug", "--output-name", output_name,
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+        stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:output_name"] = output_name  # jobs API: the name reaches the runner via script_parameters
+
+        expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+        expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
+        stubs.api.pipeline_instances().create.assert_called_with(
+            body=expect_pipeline)
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_pipeline_uuid + '\n')
+
+    @mock.patch("time.sleep")
+    @stubs
+    def test_submit_output_tags(self, stubs, tm):
+        output_tags = "tag0,tag1,tag2"
+
+        capture_stdout = cStringIO.StringIO()
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--debug", "--output-tags", output_tags,
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+        stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:output_tags"] = output_tags  # jobs API: the raw comma-separated string is passed through unsplit
+
+        expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+        expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
+        stubs.api.pipeline_instances().create.assert_called_with(
+            body=expect_pipeline)
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_pipeline_uuid + '\n')
+
     @mock.patch("time.sleep")
     @stubs
     def test_submit_with_project_uuid(self, stubs, tm):
@@ -352,6 +394,52 @@ class TestSubmit(unittest.TestCase):
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_container_request_uuid + '\n')
 
+    @stubs
+    def test_submit_container_output_name(self, stubs):
+        output_name = "test_output_name"
+
+        capture_stdout = cStringIO.StringIO()
+        try:
+            exited = arvados_cwl.main(
+                ["--submit", "--no-wait", "--api=containers", "--debug", "--output-name", output_name,
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+        except Exception:  # log for debugging, then let the test fail instead of swallowing the error
+            logging.exception("")
+            raise
+        self.assertEqual(exited, 0)  # outside the try so a non-zero exit code actually fails the test
+        stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', "--output-name="+output_name, '--enable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json']
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        expect_container["owner_uuid"] = stubs.fake_user_uuid
+        stubs.api.container_requests().create.assert_called_with(
+            body=expect_container)
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
+    @stubs
+    def test_submit_container_output_tags(self, stubs):
+        output_tags = "tag0,tag1,tag2"
+
+        capture_stdout = cStringIO.StringIO()
+        try:
+            exited = arvados_cwl.main(
+                ["--submit", "--no-wait", "--api=containers", "--debug", "--output-tags", output_tags,
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+        except Exception:  # log for debugging, then let the test fail instead of swallowing the error
+            logging.exception("")
+            raise
+        self.assertEqual(exited, 0)  # outside the try so a non-zero exit code actually fails the test
+        stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', "--output-tags="+output_tags, '--enable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json']
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        expect_container["owner_uuid"] = stubs.fake_user_uuid
+        stubs.api.container_requests().create.assert_called_with(
+            body=expect_container)
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
     @mock.patch("arvados.commands.keepdocker.find_one_image_hash")
     @mock.patch("cwltool.docker.get_image")
     @mock.patch("arvados.api")