if response["state"] == "Complete":
logger.info("%s reused job %s", self.arvrunner.label(self), response["uuid"])
+ # Give read permission to the desired project on reused jobs
+ for job_name, job_uuid in response.get('components', {}).items():
+ self.arvrunner.api.links().create(body={
+ 'link_class': 'can_read',
+ 'tail_uuid': self.arvrunner.project_uuid,
+ 'head_uuid': job_uuid,
+ }).execute(num_retries=self.arvrunner.num_retries)
+
with Perf(metrics, "done %s" % self.name):
self.done(response)
else:
find_or_create=self.enable_reuse
).execute(num_retries=self.arvrunner.num_retries)
- if self.enable_reuse:
- # When reusing jobs, copy its output/log collection to the desired project
- reused_collections = [('Output', job.get('output', None)),
- ('Log', job.get('log', None))]
- for col_type, pdh in [(n, p) for n, p in reused_collections if p]:
- c = arvados.collection.Collection(pdh,
- api_client=self.arvrunner.api,
- keep_client=self.arvrunner.keep_client,
- num_retries=self.arvrunner.num_retries)
- c.save_new(name="{} of {}".format(col_type, self.name),
- owner_uuid=self.arvrunner.project_uuid,
- ensure_unique_name=True,
- num_retries=self.arvrunner.num_retries)
- logger.info("Copied reused job's %s to collection %s",
- col_type.lower(),
- c.manifest_locator())
- # Give read permission to the desired project on reused jobs
- for job_name, job_uuid in job.get('components', {}).items():
- self.arvrunner.api.links().create(body={
- 'link_class': 'can_read',
- 'tail_uuid': self.arvrunner.project_uuid,
- 'head_uuid': job_uuid,
- }).execute(num_retries=self.arvrunner.num_retries)
-
for k,v in job_spec["script_parameters"].items():
if v is False or v is None or isinstance(v, dict):
job_spec["script_parameters"][k] = {"value": v}
from collections import deque
def done(self, record, tmpdir, outdir, keepdir):
- colname = "Output %s of %s" % (record["output"][0:7], self.name)
+ cols = [
+ ("output", "Output %s of %s" % (record["output"][0:7], self.name), record["output"]),
+ ("log", "Log of %s" % (record["uuid"]), record["log"])
+ ]
- # check if collection already exists with same owner, name and content
- collection_exists = self.arvrunner.api.collections().list(
- filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
- ['portable_data_hash', '=', record["output"]],
- ["name", "=", colname]]
- ).execute(num_retries=self.arvrunner.num_retries)
-
- if not collection_exists["items"]:
- # Create a collection located in the same project as the
- # pipeline with the contents of the output.
- # First, get output record.
- collections = self.arvrunner.api.collections().list(
- limit=1,
- filters=[['portable_data_hash', '=', record["output"]]],
- select=["manifest_text"]
+ for coltype, colname, colpdh in cols:
+ # check if collection already exists with same owner, name and content
+ collection_exists = self.arvrunner.api.collections().list(
+ filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+ ['portable_data_hash', '=', colpdh],
+ ["name", "=", colname]]
).execute(num_retries=self.arvrunner.num_retries)
- if not collections["items"]:
- raise WorkflowException(
- "[job %s] output '%s' cannot be found on API server" % (
- self.name, record["output"]))
+ if not collection_exists["items"]:
+ # Create a collection located in the same project as the
+ # pipeline with the contents of the output/log.
+ # First, get output/log record.
+ collections = self.arvrunner.api.collections().list(
+ limit=1,
+ filters=[['portable_data_hash', '=', colpdh]],
+ select=["manifest_text"]
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ if not collections["items"]:
+ raise WorkflowException(
+ "[job %s] %s '%s' cannot be found on API server" % (
+ self.name, coltype, colpdh))
- # Create new collection in the parent project
- # with the output contents.
- self.arvrunner.api.collections().create(body={
- "owner_uuid": self.arvrunner.project_uuid,
- "name": colname,
- "portable_data_hash": record["output"],
- "manifest_text": collections["items"][0]["manifest_text"]
- }, ensure_unique_name=True).execute(
- num_retries=self.arvrunner.num_retries)
+ # Create new collection in the parent project
+ # with the output/log contents.
+ self.arvrunner.api.collections().create(body={
+ "owner_uuid": self.arvrunner.project_uuid,
+ "name": colname,
+ "portable_data_hash": colpdh,
+ "manifest_text": collections["items"][0]["manifest_text"]
+ }, ensure_unique_name=True).execute(
+ num_retries=self.arvrunner.num_retries)
return done_outputs(self, record, tmpdir, outdir, keepdir)
2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
""")
api.collections().list().execute.side_effect = ({"items": []},
- {"items": [{"manifest_text": "XYZ"}]})
+ {"items": [{"manifest_text": "XYZ"}]},
+ {"items": []},
+ {"items": [{"manifest_text": "ABC"}]})
arvjob = arvados_cwl.ArvadosJob(runner)
arvjob.name = "testjob"
api.collections().list.assert_has_calls([
mock.call(),
+ # Output collection check
mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
['portable_data_hash', '=', '99999999999999999999999999999993+99'],
['name', '=', 'Output 9999999 of testjob']]),
mock.call().execute(num_retries=0),
mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999993+99']],
select=['manifest_text']),
+ mock.call().execute(num_retries=0),
+ # Log collection check
+ mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
+ ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
+ ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
+ mock.call().execute(num_retries=0),
+ mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999994+99']],
+ select=['manifest_text']),
mock.call().execute(num_retries=0)])
- api.collections().create.assert_called_with(
- ensure_unique_name=True,
- body={'portable_data_hash': '99999999999999999999999999999993+99',
- 'manifest_text': 'XYZ',
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
- 'name': 'Output 9999999 of testjob'})
+ api.collections().create.assert_has_calls([
+ mock.call(ensure_unique_name=True,
+ body={'portable_data_hash': '99999999999999999999999999999993+99',
+ 'manifest_text': 'XYZ',
+ 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+ 'name': 'Output 9999999 of testjob'}),
+ mock.call().execute(num_retries=0),
+ mock.call(ensure_unique_name=True,
+ body={'portable_data_hash': '99999999999999999999999999999994+99',
+ 'manifest_text': 'ABC',
+ 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+ 'name': 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
+ mock.call().execute(num_retries=0),
+ ])
arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
""")
- api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},)
+ api.collections().list().execute.side_effect = (
+ {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
+ {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
+ )
arvjob = arvados_cwl.ArvadosJob(runner)
arvjob.name = "testjob"
api.collections().list.assert_has_calls([
mock.call(),
+ # Output collection
mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
['portable_data_hash', '=', '99999999999999999999999999999993+99'],
['name', '=', 'Output 9999999 of testjob']]),
- mock.call().execute(num_retries=0)])
+ mock.call().execute(num_retries=0),
+ # Log collection
+ mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
+ ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
+ ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
+ mock.call().execute(num_retries=0)
+ ])
self.assertFalse(api.collections().create.called)