Merge branch '10172-crunch2-container-output' closes #10172
authorPeter Amstutz <peter.amstutz@curoverse.com>
Tue, 25 Oct 2016 17:00:23 +0000 (13:00 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Tue, 25 Oct 2016 17:00:23 +0000 (13:00 -0400)
1  2 
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py

index e11f6a12a679f0d5cf2b6bf6e6b1726d512637a8,936f445c50be22d1d3318bd5b82b20e8c600f720..7a951e93ca2f3fc75c8cca76bb8c50a5bc887d37
@@@ -30,7 -30,6 +30,7 @@@ from .arvworkflow import ArvadosWorkflo
  from .fsaccess import CollectionFsAccess
  from .perf import Perf
  from .pathmapper import FinalOutputPathMapper
 +from ._version import __version__
  
  from cwltool.pack import pack
  from cwltool.process import shortname, UnsupportedRequirement, getListing
@@@ -124,7 -123,8 +124,8 @@@ class ArvCwlRunner(object)
                      try:
                          self.cond.acquire()
                          j = self.processes[uuid]
-                         logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
+                         txt = self.work_api[0].upper() + self.work_api[1:-1]
+                         logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                          with Perf(metrics, "done %s" % j.name):
                              j.done(event["properties"]["new_attributes"])
                          self.cond.notify()
  
          self.final_output_collection = final
  
+     def set_crunch_output(self):
+         if self.work_api == "containers":
+             try:
+                 current = self.api.containers().current().execute(num_retries=self.num_retries)
+                 self.api.containers().update(uuid=current['uuid'],
+                                              body={
+                                                  'output': self.final_output_collection.portable_data_hash(),
+                                              }).execute(num_retries=self.num_retries)
+             except Exception as e:
+                 logger.info("Setting container output: %s", e)
+         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
+             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
+                                    body={
+                                        'output': self.final_output_collection.portable_data_hash(),
+                                        'success': self.final_status == "success",
+                                        'progress':1.0
+                                    }).execute(num_retries=self.num_retries)
      def arv_executor(self, tool, job_order, **kwargs):
          self.debug = kwargs.get("debug")
  
          if self.final_status == "UnsupportedRequirement":
              raise UnsupportedRequirement("Check log for details.")
  
-         if self.final_status != "success":
-             raise WorkflowException("Workflow failed.")
          if self.final_output is None:
              raise WorkflowException("Workflow did not return a result.")
  
              if self.output_name is None:
                  self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
              self.make_output_collection(self.output_name, self.final_output)
+             self.set_crunch_output()
+         if self.final_status != "success":
+             raise WorkflowException("Workflow failed.")
  
          if kwargs.get("compute_checksum"):
              adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
@@@ -390,7 -409,7 +410,7 @@@ def versionstring()
      arvpkg = pkg_resources.require("arvados-python-client")
      cwlpkg = pkg_resources.require("cwltool")
  
 -    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
 +    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                      "arvados-python-client", arvpkg[0].version,
                                      "cwltool", cwlpkg[0].version)
  
index 56f29c5cb1d1bc37db1e135e9e66778633a3047a,9f21b3fed560c4192222739a088cd931454f45be..1581d20d2f62920a5259fe936248900966947ccb
@@@ -11,7 -11,7 +11,7 @@@ import arvados.collectio
  
  from .arvdocker import arv_docker_get_image
  from . import done
 -from .runner import Runner
 +from .runner import Runner, arvados_jobs_image
  
  logger = logging.getLogger('arvados.cwl-runner')
  
@@@ -79,7 -79,7 +79,7 @@@ class ArvadosContainer(object)
  
          (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
          if not docker_req:
 -            docker_req = {"dockerImageId": "arvados/jobs"}
 +            docker_req = {"dockerImageId": arvados_jobs_image(self.arvrunner)}
  
          container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                       docker_req,
  
              self.arvrunner.processes[response["container_uuid"]] = self
  
-             logger.info("Container %s (%s) request state is %s", self.name, response["uuid"], response["state"])
+             container = self.arvrunner.api.containers().get(
+                 uuid=response["container_uuid"]
+             ).execute(num_retries=self.arvrunner.num_retries)
+             logger.info("Container request %s (%s) state is %s with container %s %s", self.name, response["uuid"], response["state"], container["uuid"], container["state"])
  
-             if response["state"] == "Final":
-                 self.done(response)
+             if container["state"] in ("Complete", "Cancelled"):
+                 self.done(container)
          except Exception as e:
              logger.error("Got error %s" % str(e))
              self.output_callback({}, "permanentFail")
@@@ -179,6 -183,11 +183,6 @@@ class RunnerContainer(Runner)
          workflowcollection = workflowcollection[5:workflowcollection.index('/')]
          jobpath = "/var/lib/cwl/job/cwl.input.json"
  
 -        container_image = arv_docker_get_image(self.arvrunner.api,
 -                                               {"dockerImageId": "arvados/jobs"},
 -                                               pull_image,
 -                                               self.arvrunner.project_uuid)
 -
          command = ["arvados-cwl-runner", "--local", "--api=containers"]
          if self.output_name:
              command.append("--output-name=" + self.output_name)
              "cwd": "/var/spool/cwl",
              "priority": 1,
              "state": "Committed",
 -            "container_image": container_image,
 +            "container_image": arvados_jobs_image(self.arvrunner),
              "mounts": {
                  "/var/lib/cwl/workflow": {
                      "kind": "collection",