Add tests for intermediate collections
[arvados.git] / sdk / cwl / arvados_cwl / arvjob.py
index 737c9580fb8ceef04576168a15a53d991f9cf496..ea599ea8371caaba0cae30a8f97a010ff13f1425 100644 (file)
@@ -6,11 +6,12 @@ import logging
 import re
 import copy
 import json
+import datetime
 import time
 
 from cwltool.process import get_feature, shortname, UnsupportedRequirement
 from cwltool.errors import WorkflowException
-from cwltool.draft2tool import revmap_file, CommandLineTool
+from cwltool.command_line_tool import revmap_file, CommandLineTool
 from cwltool.load_tool import fetch_document
 from cwltool.builder import Builder
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
@@ -67,7 +68,11 @@ class ArvadosJob(object):
 
                 if vwd:
                     with Perf(metrics, "generatefiles.save_new %s" % self.name):
-                        vwd.save_new()
+                        info = self._get_intermediate_collection_info()
+                        vwd.save_new(name=info["name"], 
+                                     ensure_unique_name=True, 
+                                     trash_at=info["trash_at"], 
+                                     properties=info["properties"])
 
                 for f, p in generatemapper.items():
                     if p.type == "File":
@@ -134,6 +139,8 @@ class ArvadosJob(object):
             if reuse_req:
                 enable_reuse = reuse_req["enableReuse"]
 
+        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
+
         try:
             with Perf(metrics, "create %s" % self.name):
                 response = self.arvrunner.api.jobs().create(
@@ -150,7 +157,8 @@ class ArvadosJob(object):
                     find_or_create=enable_reuse
                 ).execute(num_retries=self.arvrunner.num_retries)
 
-            self.arvrunner.processes[response["uuid"]] = self
+            self.uuid = response["uuid"]
+            self.arvrunner.process_submitted(self)
 
             self.update_pipeline_component(response)
 
@@ -171,9 +179,6 @@ class ArvadosJob(object):
                         logger.info("Creating read permission on job %s: %s",
                                     response["uuid"],
                                     e)
-
-                with Perf(metrics, "done %s" % self.name):
-                    self.done(response)
             else:
                 logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
         except Exception as e:
@@ -181,27 +186,28 @@ class ArvadosJob(object):
             self.output_callback({}, "permanentFail")
 
     def update_pipeline_component(self, record):
-        if self.arvrunner.pipeline:
-            self.arvrunner.pipeline["components"][self.name] = {"job": record}
-            with Perf(metrics, "update_pipeline_component %s" % self.name):
-                self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
-                    uuid=self.arvrunner.pipeline["uuid"],
-                    body={
-                        "components": self.arvrunner.pipeline["components"]
-                    }).execute(num_retries=self.arvrunner.num_retries)
-        if self.arvrunner.uuid:
-            try:
-                job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
-                if job:
-                    components = job["components"]
-                    components[self.name] = record["uuid"]
-                    self.arvrunner.api.jobs().update(
-                        uuid=self.arvrunner.uuid,
+        with self.arvrunner.workflow_eval_lock:
+            if self.arvrunner.pipeline:
+                self.arvrunner.pipeline["components"][self.name] = {"job": record}
+                with Perf(metrics, "update_pipeline_component %s" % self.name):
+                    self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
+                        uuid=self.arvrunner.pipeline["uuid"],
                         body={
-                            "components": components
+                            "components": self.arvrunner.pipeline["components"]
                         }).execute(num_retries=self.arvrunner.num_retries)
-            except Exception as e:
-                logger.info("Error adding to components: %s", e)
+            if self.arvrunner.uuid:
+                try:
+                    job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
+                    if job:
+                        components = job["components"]
+                        components[self.name] = record["uuid"]
+                        self.arvrunner.api.jobs().update(
+                            uuid=self.arvrunner.uuid,
+                            body={
+                                "components": components
+                            }).execute(num_retries=self.arvrunner.num_retries)
+                except Exception as e:
+                    logger.info("Error adding to components: %s", e)
 
     def done(self, record):
         try:
@@ -263,8 +269,27 @@ class ArvadosJob(object):
                 processStatus = "permanentFail"
         finally:
             self.output_callback(outputs, processStatus)
-            if record["uuid"] in self.arvrunner.processes:
-                del self.arvrunner.processes[record["uuid"]]
+
+    def _get_intermediate_collection_info(self):
+        """Describe how an intermediate collection should be saved.
+
+        Returns a dict with three keys:
+          "name"       -- the fixed collection name "Intermediate collection"
+          "trash_at"   -- a datetime at which the collection should be
+                          trashed, or None when
+                          arvrunner.intermediate_output_ttl is not positive
+          "properties" -- {"type": "Intermediate", "container": <uuid or None>}
+
+        The container uuid is looked up through the API; it stays None when
+        the lookup raises ApiError (e.g. not running inside a container).
+        """
+        trash_time = None
+        if self.arvrunner.intermediate_output_ttl > 0:
+            # Use UTC rather than local time: the value is sent as a naive
+            # timestamp, and the API server is expected to read it as UTC.
+            trash_time = datetime.datetime.utcnow() + datetime.timedelta(
+                seconds=self.arvrunner.intermediate_output_ttl)
+
+        current_container_uuid = None
+        try:
+            current_container = self.arvrunner.api.containers().current().execute(
+                num_retries=self.arvrunner.num_retries)
+            current_container_uuid = current_container['uuid']
+        except ApiError as e:
+            # Status code 404 just means we're not running in a container.
+            if e.resp.status != 404:
+                logger.info("Getting current container: %s", e)
+
+        props = {"type": "Intermediate",
+                 "container": current_container_uuid}
+
+        return {"name": "Intermediate collection",
+                "trash_at": trash_time,
+                "properties": props}
+
 
 class RunnerJob(Runner):
     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
@@ -280,7 +305,7 @@ class RunnerJob(Runner):
         if self.tool.tool["id"].startswith("keep:"):
             self.job_order["cwl:tool"] = self.tool.tool["id"][5:]
         else:
-            packed = packed_workflow(self.arvrunner, self.tool)
+            packed = packed_workflow(self.arvrunner, self.tool, self.merged_map)
             wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed)
             self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
 
@@ -299,6 +324,9 @@ class RunnerJob(Runner):
         if self.on_error:
             self.job_order["arv:on_error"] = self.on_error
 
+        if kwargs.get("debug"):
+            self.job_order["arv:debug"] = True
+
         return {
             "script": "cwl-runner",
             "script_version": "master",
@@ -311,8 +339,8 @@ class RunnerJob(Runner):
             }
         }
 
-    def run(self, *args, **kwargs):
-        job_spec = self.arvados_job_spec(*args, **kwargs)
+    def run(self, **kwargs):
+        job_spec = self.arvados_job_spec(**kwargs)
 
         job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
 
@@ -348,10 +376,7 @@ class RunnerJob(Runner):
             return
 
         self.uuid = job["uuid"]
-        self.arvrunner.processes[self.uuid] = self
-
-        if job["state"] in ("Complete", "Failed", "Cancelled"):
-            self.done(job)
+        self.arvrunner.process_submitted(self)
 
 
 class RunnerTemplate(object):
@@ -367,7 +392,7 @@ class RunnerTemplate(object):
     }
 
     def __init__(self, runner, tool, job_order, enable_reuse, uuid,
-                 submit_runner_ram=0, name=None):
+                 submit_runner_ram=0, name=None, merged_map=None):
         self.runner = runner
         self.tool = tool
         self.job = RunnerJob(
@@ -378,7 +403,8 @@ class RunnerTemplate(object):
             output_name=None,
             output_tags=None,
             submit_runner_ram=submit_runner_ram,
-            name=name)
+            name=name,
+            merged_map=merged_map)
         self.uuid = uuid
 
     def pipeline_component_spec(self):