outdir = None
keepdir = None
for l in log:
+ # Determine the tmpdir, outdir and keepdir paths from
+ # the job run. Unfortunately, we can't take the first
+ # values we find (which are expected to be near the
+ # top) and stop scanning because if the node fails and
+ # the job restarts on a different node these values
+ # will be different for different runs, and we need to
+ # know about the final run that actually produced output.
+
g = tmpdirre.match(l)
if g:
tmpdir = g.group(1)
if g:
keepdir = g.group(1)
- # It turns out if the job fails and restarts it can
- # come up on a different compute node, so we have to
- # read the log to the end to be sure instead of taking the
- # easy way out.
- #
- #if tmpdir and outdir and keepdir:
- # break
+ colname = "Output %s of %s" % (record["output"][0:7], self.name)
+
+ # check if collection already exists with same owner, name and content
+ collection_exists = self.arvrunner.api.collections().list(
+ filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+ ['portable_data_hash', '=', record["output"]],
+ ["name", "=", colname]]
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ if not collection_exists["items"]:
+ # Create a collection located in the same project as the
+ # pipeline with the contents of the output.
+ # First, get output record.
+ collections = self.arvrunner.api.collections().list(
+ limit=1,
+ filters=[['portable_data_hash', '=', record["output"]]],
+ select=["manifest_text"]
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ if not collections["items"]:
+ raise WorkflowException(
+ "Job output '%s' cannot be found on API server" % (
+ record["output"]))
+
+ # Create new collection in the parent project
+ # with the output contents.
+ self.arvrunner.api.collections().create(body={
+ "owner_uuid": self.arvrunner.project_uuid,
+ "name": colname,
+ "portable_data_hash": record["output"],
+ "manifest_text": collections["items"][0]["manifest_text"]
+ }, ensure_unique_name=True).execute(
+ num_retries=self.arvrunner.num_retries)
self.builder.outdir = outdir
self.builder.pathmapper.keepdir = keepdir
'task.env': {'TMPDIR': '$(task.tmpdir)'},
'command': ['ls']
}],
- 'crunchrunner': '83db29f08544e1c319572a6bd971088a+140/crunchrunner'
+ 'crunchrunner': arvados_cwl.crunchrunner_pdh + '/crunchrunner'
},
'script_version': 'master',
'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
'task.env': {'TMPDIR': '$(task.tmpdir)'},
'command': ['ls']
}],
- 'crunchrunner': '83db29f08544e1c319572a6bd971088a+140/crunchrunner'
+ 'crunchrunner': arvados_cwl.crunchrunner_pdh + '/crunchrunner'
},
'script_version': 'master',
'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
'min_scratch_mb_per_node': 5024 # tmpdirSize + outdirSize
}
}, find_or_create=True)
+
+ @mock.patch("arvados.collection.Collection")
+ def test_done(self, col):
+ api = mock.MagicMock()
+
+ runner = mock.MagicMock()
+ runner.api = api
+ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ runner.num_retries = 0
+
+ col().open.return_value = []
+ api.collections().list().execute.side_effect = ({"items": []},
+ {"items": [{"manifest_text": "XYZ"}]})
+
+ arvjob = arvados_cwl.ArvadosJob(runner)
+ arvjob.name = "testjob"
+ arvjob.builder = mock.MagicMock()
+ arvjob.output_callback = mock.MagicMock()
+ arvjob.collect_outputs = mock.MagicMock()
+
+ arvjob.done({
+ "state": "Complete",
+ "output": "99999999999999999999999999999993+99",
+ "log": "99999999999999999999999999999994+99",
+ "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ })
+
+ api.collections().list.assert_has_calls([
+ mock.call(),
+ mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
+ ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
+ ['name', '=', 'Output 9999999 of testjob']]),
+ mock.call().execute(num_retries=0),
+ mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999993+99']],
+ select=['manifest_text']),
+ mock.call().execute(num_retries=0)])
+
+ api.collections().create.assert_called_with(
+ ensure_unique_name=True,
+ body={'portable_data_hash': '99999999999999999999999999999993+99',
+ 'manifest_text': 'XYZ',
+ 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+ 'name': 'Output 9999999 of testjob'})
+
+ @mock.patch("arvados.collection.Collection")
+ def test_done_use_existing_collection(self, col):
+ api = mock.MagicMock()
+
+ runner = mock.MagicMock()
+ runner.api = api
+ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ runner.num_retries = 0
+
+ col().open.return_value = []
+ api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},)
+
+ arvjob = arvados_cwl.ArvadosJob(runner)
+ arvjob.name = "testjob"
+ arvjob.builder = mock.MagicMock()
+ arvjob.output_callback = mock.MagicMock()
+ arvjob.collect_outputs = mock.MagicMock()
+
+ arvjob.done({
+ "state": "Complete",
+ "output": "99999999999999999999999999999993+99",
+ "log": "99999999999999999999999999999994+99",
+ "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ })
+
+ api.collections().list.assert_has_calls([
+ mock.call(),
+ mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
+ ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
+ ['name', '=', 'Output 9999999 of testjob']]),
+ mock.call().execute(num_retries=0)])
+
+ self.assertFalse(api.collections().create.called)