11557: Merge branch 'master' into 11557-acr-output-col-perms
author    Lucas Di Pentima <lucas@curoverse.com>
Fri, 16 Jun 2017 16:34:13 +0000 (13:34 -0300)
committer Lucas Di Pentima <lucas@curoverse.com>
Fri, 16 Jun 2017 16:34:13 +0000 (13:34 -0300)
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/done.py
sdk/cwl/tests/test_job.py

index 43082dfb854b4bf073ec5024ca306517d809d6e6..f7da563cd4aca1bd650bb19d35dcd28cdab08757 100644 (file)
@@ -449,8 +449,7 @@ class ArvCwlRunner(object):
                                       name=kwargs.get("name"),
                                       on_error=kwargs.get("on_error"),
                                       submit_runner_image=kwargs.get("submit_runner_image"))
-
-        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
+        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
             # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
                 body={
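With this hunk, the "create pipeline for local run" branch becomes an elif, so it only fires when the workflow is not being submitted as a runner job. A minimal standalone sketch of the resulting branching, assuming the preceding branch handles kwargs.get("submit"); the helper below is hypothetical and only illustrates the control flow, it is not part of arvados_cwl:

    # Hypothetical helper illustrating the if/elif branching produced by the
    # hunk above (assumption: the branch preceding the elif tests "submit").
    def choose_run_mode(kwargs, work_api):
        if kwargs.get("submit"):
            return "submit runner job"
        elif "cwl_runner_job" not in kwargs and work_api == "jobs":
            return "create pipeline instance for local run"
        return "local run without pipeline instance"

    assert choose_run_mode({"submit": True}, "jobs") == "submit runner job"
    assert choose_run_mode({}, "jobs") == "create pipeline instance for local run"
    assert choose_run_mode({"cwl_runner_job": "x"}, "jobs") == "local run without pipeline instance"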
index 0bf91f24a0a9804ebc93683ca9b004cafd530e32..346d3e104a2f0c683981ff66dd41fb718dd2a3f4 100644 (file)
@@ -145,6 +145,14 @@ class ArvadosJob(object):
 
             if response["state"] == "Complete":
                 logger.info("%s reused job %s", self.arvrunner.label(self), response["uuid"])
+                # Give read permission to the desired project on reused jobs
+                if response["owner_uuid"] != self.arvrunner.project_uuid:
+                    self.arvrunner.api.links().create(body={
+                        'link_class': 'can_read',
+                        'tail_uuid': self.arvrunner.project_uuid,
+                        'head_uuid': response["uuid"],
+                        }).execute(num_retries=self.arvrunner.num_retries)
+
                 with Perf(metrics, "done %s" % self.name):
                     self.done(response)
             else:
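When a completed job is reused from a different project, the runner now adds a can_read link pointing the destination project at the reused job. A minimal sketch of the same call made directly with the Arvados Python SDK, mirroring the hunk above; it assumes an API token is configured in the environment, and the uuids are placeholders:

    import arvados

    # Sketch mirroring the hunk above: create a can_read link from the target
    # project (tail) to the reused job (head). Placeholder uuids, no error handling.
    api = arvados.api('v1')
    project_uuid = 'zzzzz-j7d0g-xxxxxxxxxxxxxxx'     # destination project
    reused_job_uuid = 'zzzzz-8i9sb-xxxxxxxxxxxxxxx'  # job owned by another project

    api.links().create(body={
        'link_class': 'can_read',
        'tail_uuid': project_uuid,
        'head_uuid': reused_job_uuid,
    }).execute(num_retries=2)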
index 69b074cc73ccb02d0b4c12e096075c12f2d02871..48466f00c242b04e3bd143f8b319d0004c10eb45 100644 (file)
@@ -3,39 +3,43 @@ from cwltool.errors import WorkflowException
 from collections import deque
 
 def done(self, record, tmpdir, outdir, keepdir):
-    colname = "Output %s of %s" % (record["output"][0:7], self.name)
+    cols = [
+        ("output", "Output %s of %s" % (record["output"][0:7], self.name), record["output"]),
+        ("log", "Log of %s" % (record["uuid"]), record["log"])
+    ]
 
-    # check if collection already exists with same owner, name and content
-    collection_exists = self.arvrunner.api.collections().list(
-        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
-                 ['portable_data_hash', '=', record["output"]],
-                 ["name", "=", colname]]
-    ).execute(num_retries=self.arvrunner.num_retries)
-
-    if not collection_exists["items"]:
-        # Create a collection located in the same project as the
-        # pipeline with the contents of the output.
-        # First, get output record.
-        collections = self.arvrunner.api.collections().list(
-            limit=1,
-            filters=[['portable_data_hash', '=', record["output"]]],
-            select=["manifest_text"]
+    for coltype, colname, colpdh in cols:
+        # check if collection already exists with same owner, name and content
+        collection_exists = self.arvrunner.api.collections().list(
+            filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+                     ['portable_data_hash', '=', colpdh],
+                     ["name", "=", colname]]
         ).execute(num_retries=self.arvrunner.num_retries)
 
-        if not collections["items"]:
-            raise WorkflowException(
-                "[job %s] output '%s' cannot be found on API server" % (
-                    self.name, record["output"]))
+        if not collection_exists["items"]:
+            # Create a collection located in the same project as the
+            # pipeline with the contents of the output/log.
+            # First, get output/log record.
+            collections = self.arvrunner.api.collections().list(
+                limit=1,
+                filters=[['portable_data_hash', '=', colpdh]],
+                select=["manifest_text"]
+            ).execute(num_retries=self.arvrunner.num_retries)
+
+            if not collections["items"]:
+                raise WorkflowException(
+                    "[job %s] %s '%s' cannot be found on API server" % (
+                        self.name, coltype, colpdh))
 
-        # Create new collection in the parent project
-        # with the output contents.
-        self.arvrunner.api.collections().create(body={
-            "owner_uuid": self.arvrunner.project_uuid,
-            "name": colname,
-            "portable_data_hash": record["output"],
-            "manifest_text": collections["items"][0]["manifest_text"]
-        }, ensure_unique_name=True).execute(
-            num_retries=self.arvrunner.num_retries)
+            # Create new collection in the parent project
+            # with the output/log contents.
+            self.arvrunner.api.collections().create(body={
+                "owner_uuid": self.arvrunner.project_uuid,
+                "name": colname,
+                "portable_data_hash": colpdh,
+                "manifest_text": collections["items"][0]["manifest_text"]
+            }, ensure_unique_name=True).execute(
+                num_retries=self.arvrunner.num_retries)
 
     return done_outputs(self, record, tmpdir, outdir, keepdir)
 
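The refactor makes done() treat the job output and the job log uniformly: one loop over (type, name, portable data hash) tuples, with the same exists-check/create logic for each. A standalone illustration of the tuples it builds, using the placeholder values from the tests below:

    # Standalone illustration of the per-collection tuples the refactored done()
    # iterates over; record values are the placeholders used by test_job.py.
    record = {
        "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
        "output": "99999999999999999999999999999993+99",
        "log": "99999999999999999999999999999994+99",
    }
    job_name = "testjob"

    cols = [
        ("output", "Output %s of %s" % (record["output"][0:7], job_name), record["output"]),
        ("log", "Log of %s" % record["uuid"], record["log"]),
    ]

    for coltype, colname, colpdh in cols:
        print("%s: %s (%s)" % (coltype, colname, colpdh))
    # output: Output 9999999 of testjob (99999999999999999999999999999993+99)
    # log: Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz (99999999999999999999999999999994+99)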
index 99dd3cb669729f09d6c45d914178a5e56a57ffd6..5cd7f2a7b6706ccc5435083f9cd91902e2795547 100644 (file)
@@ -33,7 +33,15 @@ class TestJob(unittest.TestCase):
             document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
 
             list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
-            runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
+            runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
+            # Simulate a reused job from another project so that we can check if a can_read
+            # link is added.
+            runner.api.jobs().create().execute.return_value = {
+                'state': 'Complete' if enable_reuse else 'Queued',
+                'owner_uuid': 'zzzzz-tpzed-yyyyyyyyyyyyyyy' if enable_reuse else 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+                'uuid': 'zzzzz-819sb-yyyyyyyyyyyyyyy',
+                'output': None,
+            }
 
             tool = cmap({
                 "inputs": [],
@@ -75,6 +83,16 @@ class TestJob(unittest.TestCase):
                              ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
                              ['docker_image_locator', 'in docker', 'arvados/jobs']]
                 )
+                if enable_reuse:
+                    runner.api.links().create.assert_called_with(
+                        body=JsonDiffMatcher({
+                            'link_class': 'can_read',
+                            "tail_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
+                            "head_uuid": "zzzzz-819sb-yyyyyyyyyyyyyyy",
+                        })
+                    )
+                else:
+                    assert not runner.api.links().create.called
 
     # The test passes some fields in builder.resources
     # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
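Stripped of the JsonDiffMatcher test helper, the new assertions boil down to plain unittest.mock checks: the can_read link must be created for a reused job and must not be created otherwise. A standalone sketch using dict equality instead of JsonDiffMatcher, with the same placeholder uuids as the test:

    from unittest import mock

    # Reuse case: simulate the create call ArvadosJob.run() makes for a reused
    # job owned elsewhere, then assert on its body.
    runner = mock.MagicMock()
    runner.api.links().create(body={
        'link_class': 'can_read',
        'tail_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
        'head_uuid': 'zzzzz-819sb-yyyyyyyyyyyyyyy',
    }).execute(num_retries=0)
    runner.api.links().create.assert_called_with(body={
        'link_class': 'can_read',
        'tail_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
        'head_uuid': 'zzzzz-819sb-yyyyyyyyyyyyyyy',
    })

    # Non-reuse case: no link creation should have happened at all.
    runner2 = mock.MagicMock()
    assert not runner2.api.links().create.called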
@@ -161,7 +179,9 @@ class TestJob(unittest.TestCase):
 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
         """)
         api.collections().list().execute.side_effect = ({"items": []},
-                                                        {"items": [{"manifest_text": "XYZ"}]})
+                                                        {"items": [{"manifest_text": "XYZ"}]},
+                                                        {"items": []},
+                                                        {"items": [{"manifest_text": "ABC"}]})
 
         arvjob = arvados_cwl.ArvadosJob(runner)
         arvjob.name = "testjob"
@@ -179,20 +199,37 @@ class TestJob(unittest.TestCase):
 
         api.collections().list.assert_has_calls([
             mock.call(),
+            # Output collection check
             mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
                           ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
                           ['name', '=', 'Output 9999999 of testjob']]),
             mock.call().execute(num_retries=0),
             mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999993+99']],
                  select=['manifest_text']),
+            mock.call().execute(num_retries=0),
+            # Log collection check
+            mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
+                          ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
+                          ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
+            mock.call().execute(num_retries=0),
+            mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999994+99']],
+                 select=['manifest_text']),
             mock.call().execute(num_retries=0)])
 
-        api.collections().create.assert_called_with(
-            ensure_unique_name=True,
-            body={'portable_data_hash': '99999999999999999999999999999993+99',
-                  'manifest_text': 'XYZ',
-                  'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
-                  'name': 'Output 9999999 of testjob'})
+        api.collections().create.assert_has_calls([
+            mock.call(ensure_unique_name=True,
+                      body={'portable_data_hash': '99999999999999999999999999999993+99',
+                            'manifest_text': 'XYZ',
+                            'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+                            'name': 'Output 9999999 of testjob'}),
+            mock.call().execute(num_retries=0),
+            mock.call(ensure_unique_name=True,
+                      body={'portable_data_hash': '99999999999999999999999999999994+99',
+                            'manifest_text': 'ABC',
+                            'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+                            'name': 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
+            mock.call().execute(num_retries=0),
+        ])
 
         arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
 
@@ -211,7 +248,10 @@ class TestJob(unittest.TestCase):
 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
         """)
 
-        api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},)
+        api.collections().list().execute.side_effect = (
+            {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
+            {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
+        )
 
         arvjob = arvados_cwl.ArvadosJob(runner)
         arvjob.name = "testjob"
@@ -229,10 +269,17 @@ class TestJob(unittest.TestCase):
 
         api.collections().list.assert_has_calls([
             mock.call(),
+            # Output collection
             mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
                                ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
                                ['name', '=', 'Output 9999999 of testjob']]),
-            mock.call().execute(num_retries=0)])
+            mock.call().execute(num_retries=0),
+            # Log collection
+            mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
+                               ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
+                               ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
+            mock.call().execute(num_retries=0)
+        ])
 
         self.assertFalse(api.collections().create.called)
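The tests above drive collections().list().execute through mock side_effect tuples: one canned response per call, consumed in order (output exists-check, output manifest lookup, log exists-check, log manifest lookup). A standalone sketch of that pattern using unittest.mock only:

    from unittest import mock

    # Each successive execute() call returns the next canned response from the
    # side_effect tuple, in the same order the tests above depend on.
    api = mock.MagicMock()
    api.collections().list().execute.side_effect = (
        {"items": []},                          # output: no existing collection
        {"items": [{"manifest_text": "XYZ"}]},  # output: manifest lookup
        {"items": []},                          # log: no existing collection
        {"items": [{"manifest_text": "ABC"}]},  # log: manifest lookup
    )

    assert api.collections().list().execute()["items"] == []
    assert api.collections().list().execute()["items"][0]["manifest_text"] == "XYZ"
    assert api.collections().list().execute()["items"] == []
    assert api.collections().list().execute()["items"][0]["manifest_text"] == "ABC"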