8815: Rely on system-provided crunchrunner. Also use arvados/jobs by default if...
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index f587ad16b9b3538b783d0e3d7b210220690c60ee..e3fd1fccd372d75abf06445f32fe2a13f8a8e66c 100644 (file)
@@ -26,10 +26,6 @@ from arvados.api import OrderedJsonModel
 logger = logging.getLogger('arvados.cwl-runner')
 logger.setLevel(logging.INFO)
 
-crunchrunner_pdh = "ff6fc71e593081ef9733afacaeee15ea+140"
-crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner"
-certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt"
-
 tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
 outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
 keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
@@ -152,6 +148,8 @@ class ArvadosJob(object):
         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
         if docker_req and kwargs.get("use_container") is not False:
             runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
+        else:
+            runtime_constraints["docker_image"] = "arvados/jobs"
 
         resources = self.builder.resources
         if resources is not None:
@@ -166,7 +164,7 @@ class ArvadosJob(object):
                 "repository": "arvados",
                 "script_version": "master",
                 "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
-                "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"},
+                "script_parameters": {"tasks": [script_parameters]},
                 "runtime_constraints": runtime_constraints
             }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)
 
@@ -213,7 +211,15 @@ class ArvadosJob(object):
                     tmpdir = None
                     outdir = None
                     keepdir = None
-                    for l in log.readlines():
+                    for l in log:
+                        # Determine the tmpdir, outdir and keepdir paths from
+                        # the job run.  Unfortunately, we can't take the first
+                        # values we find (which are expected to be near the
+                        # top) and stop scanning because if the node fails and
+                        # the job restarts on a different node these values
+                        # will differ between runs, and we need to know about the
+                        # final run that actually produced output.
+
                         g = tmpdirre.match(l)
                         if g:
                             tmpdir = g.group(1)
@@ -224,41 +230,39 @@ class ArvadosJob(object):
                         if g:
                             keepdir = g.group(1)
 
-                        # It turns out if the job fails and restarts it can
-                        # come up on a different compute node, so we have to
-                        # read the log to the end to be sure instead of taking the
-                        # easy way out.
-                        #
-                        #if tmpdir and outdir and keepdir:
-                        #    break
+                    colname = "Output %s of %s" % (record["output"][0:7], self.name)
 
-                        # Create a collection located in the same project as the pipeline with the contents of the output.
+                    # check if collection already exists with same owner, name and content
+                    collection_exists = self.arvrunner.api.collections().list(
+                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+                                 ['portable_data_hash', '=', record["output"]],
+                                 ["name", "=", colname]]
+                    ).execute(num_retries=self.arvrunner.num_retries)
+
+                    if not collection_exists["items"]:
+                        # Create a collection located in the same project as the
+                        # pipeline with the contents of the output.
                         # First, get output record.
-                        collections = self.arvrunner.api.collections().list(limit=1,
-                                                                           filters=[['portable_data_hash', '=', record["output"]]],
-                                                                           select=["portable_data_hash", "manifest_text"]
+                        collections = self.arvrunner.api.collections().list(
+                            limit=1,
+                            filters=[['portable_data_hash', '=', record["output"]]],
+                            select=["manifest_text"]
                         ).execute(num_retries=self.arvrunner.num_retries)
 
-                        if collections["items"]:
-                            colname = "Output %s of %s" % (record["output"][0:7], self.name)
-
-                            # check if there is a name collision.
-                            name_collision = self.arvrunner.api.collections().list(filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
-                                                                                            ["name", "=", name]]
-                            ).execute(num_retries=self.arvrunner.num_retries)
-
-                            if not name_collision["items"]:
-                                # Create new collection in the parent project
-                                # with the output contents.
-                                self.arvrunner.api.collections().create(body={
-                                    "owner_uuid": self.arvrunner.project_uuid,
-                                    "name": colname,
-                                    "portable_data_hash": collections[0]["portable_data_hash"],
-                                    "manifest_text": collections[0]["manifest_text"]
-                                }, ensure_unique_name=True).execute(num_retries=self.arvrunner.num_retries)
-
-                            # else: there is already a collection with the same name and the
-                            # same contents, so nothing to do.
+                        if not collections["items"]:
+                            raise WorkflowException(
+                                "Job output '%s' cannot be found on API server" % (
+                                    record["output"]))
+
+                        # Create new collection in the parent project
+                        # with the output contents.
+                        self.arvrunner.api.collections().create(body={
+                            "owner_uuid": self.arvrunner.project_uuid,
+                            "name": colname,
+                            "portable_data_hash": record["output"],
+                            "manifest_text": collections["items"][0]["manifest_text"]
+                        }, ensure_unique_name=True).execute(
+                            num_retries=self.arvrunner.num_retries)
 
                     self.builder.outdir = outdir
                     self.builder.pathmapper.keepdir = keepdir
@@ -392,22 +396,6 @@ class ArvCwlRunner(object):
         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
 
         self.debug = args.debug
-
-        try:
-            self.api.collections().get(uuid=crunchrunner_pdh).execute()
-        except arvados.errors.ApiError as e:
-            import httplib2
-            h = httplib2.Http(ca_certs=arvados.util.ca_certs_path())
-            resp, content = h.request(crunchrunner_download, "GET")
-            resp2, content2 = h.request(certs_download, "GET")
-            with arvados.collection.Collection() as col:
-                with col.open("crunchrunner", "w") as f:
-                    f.write(content)
-                with col.open("ca-certificates.crt", "w") as f:
-                    f.write(content2)
-
-                col.save_new("crunchrunner binary", ensure_unique_name=True)
-
         self.fs_access = CollectionFsAccess(input_basedir)
 
         kwargs["fs_access"] = self.fs_access