closes #10290
[arvados.git] / sdk / cwl / arvados_cwl / done.py
1 from cwltool.errors import WorkflowException
2
def done(self, record, tmpdir, outdir, keepdir):
    """Save a finished job's output as a named collection, then collect outputs.

    The collection name is "Output <first 7 characters of record["output"]>
    of <self.name>".  If the project identified by
    ``self.arvrunner.project_uuid`` does not already hold a collection with
    that name and the same portable data hash, a new one is created there
    from the manifest text of any collection with that portable data hash.

    Args:
        record: Mapping whose "output" entry is the portable data hash of
            the job output.
        tmpdir: Accepted for interface compatibility; not used here.
        outdir: Value stored on ``self.builder.outdir``.
        keepdir: Value stored on ``self.builder.pathmapper.keepdir``.

    Returns:
        Whatever ``self.collect_outputs`` returns for
        ``"keep:" + record["output"]``.

    Raises:
        WorkflowException: No collection with the output's portable data
            hash is returned by the API server.
    """
    output_pdh = record["output"]
    colname = "Output %s of %s" % (output_pdh[0:7], self.name)

    runner = self.arvrunner
    api = runner.api
    retries = runner.num_retries

    # Look for a collection with the same owner, name and content, so that
    # an identical output is not saved a second time.
    duplicate_filters = [
        ["owner_uuid", "=", runner.project_uuid],
        ["portable_data_hash", "=", output_pdh],
        ["name", "=", colname],
    ]
    duplicates = api.collections().list(
        filters=duplicate_filters
    ).execute(num_retries=retries)

    if not duplicates["items"]:
        # Nothing matching in the target project yet: fetch the manifest
        # of the output from any collection with this portable data hash.
        lookup = api.collections().list(
            limit=1,
            filters=[["portable_data_hash", "=", output_pdh]],
            select=["manifest_text"]
        ).execute(num_retries=retries)
        found = lookup["items"]

        if not found:
            raise WorkflowException(
                "Job output '%s' cannot be found on API server" % (
                    output_pdh))

        # Store a copy of the output contents under the target project.
        # ensure_unique_name is passed so the server can resolve a name
        # clash instead of rejecting the request.
        new_collection = {
            "owner_uuid": runner.project_uuid,
            "name": colname,
            "portable_data_hash": output_pdh,
            "manifest_text": found[0]["manifest_text"],
        }
        api.collections().create(
            body=new_collection,
            ensure_unique_name=True
        ).execute(num_retries=retries)

    # Point the builder at the job's directories before collecting outputs.
    self.builder.outdir = outdir
    self.builder.pathmapper.keepdir = keepdir
    return self.collect_outputs("keep:" + output_pdh)