Merge branch '19109-upload-secondary' refs #19109
Author:     Peter Amstutz <peter.amstutz@curii.com>
AuthorDate: Fri, 13 May 2022 15:08:31 +0000 (11:08 -0400)
Commit:     Peter Amstutz <peter.amstutz@curii.com>
CommitDate: Fri, 13 May 2022 15:08:31 +0000 (11:08 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/cwl/arvados_cwl/runner.py

index 50b3bb94d8934c1234220c25a3aa6bfb1a3075a0,c8668afcac2be23c6ccb38b15f6a5b77b7327839..f232178c5d5400ab90aad300d2f1d2d70909b98d
@@@ -39,7 -39,6 +39,7 @@@ from cwltool.builder import Builde
  import schema_salad.validate as validate
  
  import arvados.collection
 +import arvados.util
  from .util import collectionUUID
  from ruamel.yaml import YAML
  from ruamel.yaml.comments import CommentedMap, CommentedSeq
@@@ -129,6 -128,9 +129,9 @@@ def set_secondary(fsaccess, builder, in
              set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
          return
  
+     if inputschema == "File":
+         inputschema = {"type": "File"}
      if isinstance(inputschema, basestring):
          sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
          if sd:
              set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
  
      elif (inputschema["type"] == "File" and
-           secondaryspec and
            isinstance(primary, Mapping) and
-           primary.get("class") == "File" and
-           "secondaryFiles" not in primary):
+           primary.get("class") == "File"):
+         if "secondaryFiles" in primary or not secondaryspec:
+             # Nothing to do.
+             return
          #
          # Found a file, check for secondaryFiles
          #
          primary["secondaryFiles"] = secondaryspec
          for i, sf in enumerate(aslist(secondaryspec)):
              if builder.cwlVersion == "v1.0":
-                 pattern = builder.do_eval(sf, context=primary)
+                 pattern = sf
              else:
-                 pattern = builder.do_eval(sf["pattern"], context=primary)
+                 pattern = sf["pattern"]
              if pattern is None:
                  continue
              if isinstance(pattern, list):
                      "Expression must return list, object, string or null")
  
              if pattern is not None:
-                 sfpath = substitute(primary["location"], pattern)
+                 if "${" in pattern or "$(" in pattern:
+                     sfname = builder.do_eval(pattern, context=primary)
+                 else:
+                     sfname = substitute(primary["basename"], pattern)
+                 if sfname is None:
+                     continue
+                 p_location = primary["location"]
+                 if "/" in p_location:
+                     sfpath = (
+                         p_location[0 : p_location.rindex("/") + 1]
+                         + sfname
+                     )
  
              required = builder.do_eval(required, context=primary)
  
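For reference, the non-expression branch above leans on cwltool's substitute()
semantics: each leading "^" in the pattern strips one extension from the
basename before the remainder is appended. A minimal sketch of that behavior
(illustrative, not the vendored implementation):

    def substitute(value, replace):
        # Each leading "^" strips one ".ext" suffix from value before appending.
        if replace.startswith("^"):
            try:
                return substitute(value[0:value.rindex(".")], replace[1:])
            except ValueError:
                # No extension left to strip; append the remainder as-is.
                return value + replace.lstrip("^")
        return value + replace

    # substitute("reads.bam", ".bai")  -> "reads.bam.bai"
    # substitute("reads.bam", "^.bai") -> "reads.bai"
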
@@@ -240,7 -258,7 +259,7 @@@ def discover_secondary_files(fsaccess, 
              set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
  
  def upload_dependencies(arvrunner, name, document_loader,
 -                        workflowobj, uri, loadref_run,
 +                        workflowobj, uri, loadref_run, runtimeContext,
                          include_primary=True, discovered_secondaryfiles=None):
      """Upload the dependencies of the workflowobj document to Keep.
  
                             single_collection=True,
                             optional_deps=optional_deps)
  
 +    keeprefs = set()
 +    def addkeepref(k):
 +        if k.startswith("keep:"):
 +            keeprefs.add(collection_pdh_pattern.match(k).group(1))
 +
      def setloc(p):
          loc = p.get("location")
          if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
              p["location"] = mapper.mapper(p["location"]).resolved
 +            addkeepref(p["location"])
              return
  
          if not loc:
  
          gp = collection_uuid_pattern.match(loc)
          if not gp:
 +            # Not a uuid pattern (must be a pdh pattern)
 +            addkeepref(p["location"])
              return
 +
          uuid = gp.groups()[0]
          if uuid not in uuid_map:
              raise SourceLine(p, "location", validate.ValidationException).makeError(
          for d in discovered:
              discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
  
 +    if runtimeContext.copy_deps:
 +        # Find referenced collections and copy them into the
 +        # destination project, for easy sharing.
 +        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
 +                                     filters=[["portable_data_hash", "in", list(keeprefs)],
 +                                              ["owner_uuid", "=", runtimeContext.project_uuid]],
 +                                     select=["uuid", "portable_data_hash", "created_at"]))
 +
 +        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
 +        for kr in keeprefs:
 +            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
 +                                                   order="created_at desc",
 +                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
 +                                                   limit=1).execute()
 +            if len(col["items"]) == 0:
 +                logger.warning("Cannot find collection with portable data hash %s", kr)
 +                continue
 +            col = col["items"][0]
 +            try:
 +                arvrunner.api.collections().create(body={"collection": {
 +                    "owner_uuid": runtimeContext.project_uuid,
 +                    "name": col["name"],
 +                    "description": col["description"],
 +                    "properties": col["properties"],
 +                    "portable_data_hash": col["portable_data_hash"],
 +                    "manifest_text": col["manifest_text"],
 +                    "storage_classes_desired": col["storage_classes_desired"],
 +                    "trash_at": col["trash_at"]
 +                }}, ensure_unique_name=True).execute()
 +            except Exception as e:
 +                logger.warning("Unable to copy collection to destination: %s", e)
 +
      if "$schemas" in workflowobj:
          sch = CommentedSeq()
          for s in workflowobj["$schemas"]:
      return mapper
  
  
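The copy_deps block above reads as a self-contained routine: list the
collections the destination project already owns via
arvados.util.keyset_list_all, then clone the newest match for each remaining
portable data hash. A trimmed sketch using the same API calls as the diff (the
copy_collections name is hypothetical; the extra metadata fields and error
handling are omitted):

    import arvados.util

    def copy_collections(api, keeprefs, project_uuid):
        # Skip portable data hashes the destination project already owns.
        present = arvados.util.keyset_list_all(
            api.collections().list,
            filters=[["portable_data_hash", "in", list(keeprefs)],
                     ["owner_uuid", "=", project_uuid]],
            select=["uuid", "portable_data_hash", "created_at"])
        missing = keeprefs - set(c["portable_data_hash"] for c in present)
        for pdh in missing:
            # Clone the most recently created collection bearing this hash.
            src = api.collections().list(
                filters=[["portable_data_hash", "=", pdh]],
                order="created_at desc", limit=1,
                select=["name", "manifest_text"]).execute()
            if src["items"]:
                api.collections().create(body={"collection": {
                    "owner_uuid": project_uuid,
                    "name": src["items"][0]["name"],
                    "manifest_text": src["items"][0]["manifest_text"],
                }}, ensure_unique_name=True).execute()
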
 -def upload_docker(arvrunner, tool):
 +def upload_docker(arvrunner, tool, runtimeContext):
      """Uploads Docker images used in CommandLineTool objects."""
  
      if isinstance(tool, CommandLineTool):
          (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
          if docker_req:
              if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
 -                # TODO: can be supported by containers API, but not jobs API.
                  raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                      "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
 -            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
 -                                                       arvrunner.runtimeContext.force_docker_pull,
 -                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
 -                                                       arvrunner.runtimeContext.match_local_docker)
 +
 +            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
 +                                                       runtimeContext.project_uuid,
 +                                                       runtimeContext.force_docker_pull,
 +                                                       runtimeContext.tmp_outdir_prefix,
 +                                                       runtimeContext.match_local_docker,
 +                                                       runtimeContext.copy_deps)
          else:
              arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
 -                                                       True, arvrunner.project_uuid,
 -                                                       arvrunner.runtimeContext.force_docker_pull,
 -                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
 -                                                       arvrunner.runtimeContext.match_local_docker)
 +                                                       True,
 +                                                       runtimeContext.project_uuid,
 +                                                       runtimeContext.force_docker_pull,
 +                                                       runtimeContext.tmp_outdir_prefix,
 +                                                       runtimeContext.match_local_docker,
 +                                                       runtimeContext.copy_deps)
      elif isinstance(tool, cwltool.workflow.Workflow):
          for s in tool.steps:
 -            upload_docker(arvrunner, s.embedded_tool)
 +            upload_docker(arvrunner, s.embedded_tool, runtimeContext)
  
  
 -def packed_workflow(arvrunner, tool, merged_map):
 +def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
      """Create a packed workflow.
  
      A "packed" workflow is one where all the components have been combined into a single document."""
                      v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
              if v.get("class") == "DockerRequirement":
                  v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
 -                                                                                                             arvrunner.project_uuid,
 -                                                                                                             arvrunner.runtimeContext.force_docker_pull,
 -                                                                                                             arvrunner.runtimeContext.tmp_outdir_prefix,
 -                                                                                                             arvrunner.runtimeContext.match_local_docker)
 +                                                                                                             runtimeContext.project_uuid,
 +                                                                                                             runtimeContext.force_docker_pull,
 +                                                                                                             runtimeContext.tmp_outdir_prefix,
 +                                                                                                             runtimeContext.match_local_docker,
 +                                                                                                             runtimeContext.copy_deps)
              for l in v:
                  visit(v[l], cur_id)
          if isinstance(v, list):
@@@ -574,7 -546,7 +593,7 @@@ def tag_git_version(packed)
              packed["http://schema.org/version"] = githash
  
  
 -def upload_job_order(arvrunner, name, tool, job_order):
 +def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
      """Upload local files referenced in the input object and return updated input
      object with 'location' updated to the proper keep references.
      """
                                      tool.doc_loader,
                                      job_order,
                                      job_order.get("id", "#"),
 -                                    False)
 +                                    False,
 +                                    runtimeContext)
  
      if "id" in job_order:
          del job_order["id"]
  
  FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
  
 -def upload_workflow_deps(arvrunner, tool):
 +def upload_workflow_deps(arvrunner, tool, runtimeContext):
      # Ensure that Docker images needed by this workflow are available
  
 -    upload_docker(arvrunner, tool)
 +    upload_docker(arvrunner, tool, runtimeContext)
  
      document_loader = tool.doc_loader
  
                                       deptool,
                                       deptool["id"],
                                       False,
 +                                     runtimeContext,
                                       include_primary=False,
                                       discovered_secondaryfiles=discovered_secondaryfiles)
              document_loader.idx[deptool["id"]] = deptool
  
      return merged_map
  
 -def arvados_jobs_image(arvrunner, img):
 +def arvados_jobs_image(arvrunner, img, runtimeContext):
      """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
  
      try:
 -        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
 -                                                          arvrunner.runtimeContext.force_docker_pull,
 -                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
 -                                                          arvrunner.runtimeContext.match_local_docker)
 +        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
 +                                                          True,
 +                                                          runtimeContext.project_uuid,
 +                                                          runtimeContext.force_docker_pull,
 +                                                          runtimeContext.tmp_outdir_prefix,
 +                                                          runtimeContext.match_local_docker,
 +                                                          runtimeContext.copy_deps)
      except Exception as e:
          raise Exception("Docker image %s is not available\n%s" % (img, e) )
  
  
 -def upload_workflow_collection(arvrunner, name, packed):
 +def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
      collection = arvados.collection.Collection(api_client=arvrunner.api,
                                                 keep_client=arvrunner.keep_client,
                                                 num_retries=arvrunner.num_retries)
  
      filters = [["portable_data_hash", "=", collection.portable_data_hash()],
                 ["name", "like", name+"%"]]
 -    if arvrunner.project_uuid:
 -        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
 +    if runtimeContext.project_uuid:
 +        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
      exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
  
      if exists["items"]:
          logger.info("Using collection %s", exists["items"][0]["uuid"])
      else:
          collection.save_new(name=name,
 -                            owner_uuid=arvrunner.project_uuid,
 +                            owner_uuid=runtimeContext.project_uuid,
                              ensure_unique_name=True,
                              num_retries=arvrunner.num_retries)
          logger.info("Uploaded to %s", collection.manifest_locator())