+ rewrite_to_orig = {}
+ for k,v in rewrites.items():
+ rewrite_to_orig[v] = k
+
+ def visit(v, cur_id):
+ if isinstance(v, dict):
+ if v.get("class") in ("CommandLineTool", "Workflow"):
+ cur_id = rewrite_to_orig.get(v["id"], v["id"])
+ if "location" in v and not v["location"].startswith("keep:"):
+ v["location"] = merged_map[cur_id].resolved[v["location"]]
+ if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
+ v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+ for l in v:
+ visit(v[l], cur_id)
+ if isinstance(v, list):
+ for l in v:
+ visit(l, cur_id)
+ visit(packed, None)
+ return packed
+
+
def tag_git_version(packed):
    # Annotate the packed CWL document with the git commit hash of the
    # tool's source checkout, stored under "http://schema.org/version".
    #
    # NOTE(review): `tool` is a free variable here, not a parameter -- this
    # only works if a `tool` object is in scope (module or enclosing scope)
    # at call time; confirm against the callers.
    if tool.tool["id"].startswith("file://"):
        # Strip the 7-character "file://" scheme prefix to get a filesystem path.
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            # Best effort: path is not a git checkout or git is unavailable;
            # leave the document untagged rather than failing the run.
            pass
        else:
            # NOTE(review): on Python 3, check_output returns bytes; if
            # `packed` is later JSON-serialized this may need .decode().
            packed["http://schema.org/version"] = githash
+
+
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.

    :param arvrunner: runner object providing API/Keep clients
    :param name: human-readable label for the upload operation
    :param tool: tool object whose inputs schema drives secondaryFiles discovery
    :param job_order: input object (dict); mutated in place and returned
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    # Called for its side effect of uploading local files referenced by the
    # job order and rewriting their 'location' fields; the returned path
    # mapper is not needed here (the original bound it to an unused local).
    upload_dependencies(arvrunner,
                        name,
                        tool.doc_loader,
                        job_order,
                        job_order.get("id", "#"),
                        False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
+
+FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
+
def upload_workflow_deps(arvrunner, tool):
    """Upload the file dependencies of every tool/step in a workflow.

    Visits each embedded tool document, uploads its dependencies, and
    returns a dict mapping each document id to a FileUpdates tuple of
    (resolved location map, discovered secondaryFiles).
    """
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader
    merged_map = {}

    def upload_tool_deps(deptool):
        # Documents without an id have nothing to record.
        if "id" not in deptool:
            return
        discovered_secondaryfiles = {}
        pm = upload_dependencies(arvrunner,
                                 "%s dependencies" % (shortname(deptool["id"])),
                                 document_loader,
                                 deptool,
                                 deptool["id"],
                                 False,
                                 include_primary=False,
                                 discovered_secondaryfiles=discovered_secondaryfiles)
        # Register the (possibly rewritten) document back into the loader index.
        document_loader.idx[deptool["id"]] = deptool
        resolved = {ref: entry.resolved for ref, entry in pm.items()}
        merged_map[deptool["id"]] = FileUpdates(resolved, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map
+
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it.

    Returns the image name on success; raises Exception when the image
    cannot be made available.
    """
    docker_req = {"dockerPull": img}
    try:
        arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e) )
    return img
+
def upload_workflow_collection(arvrunner, name, packed):
    """Store the packed workflow document in a Keep collection.

    Reuses an existing collection with identical content and a matching
    name prefix (scoped to the project when one is set); otherwise saves
    a new collection.  Returns the collection's portable data hash.
    """
    col = arvados.collection.Collection(api_client=arvrunner.api,
                                        keep_client=arvrunner.keep_client,
                                        num_retries=arvrunner.num_retries)
    serialized = json.dumps(packed, indent=2, sort_keys=True, separators=(',',': '))
    with col.open("workflow.cwl", "w") as f:
        f.write(serialized)

    query = [["portable_data_hash", "=", col.portable_data_hash()],
             ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        query.append(["owner_uuid", "=", arvrunner.project_uuid])
    existing = arvrunner.api.collections().list(filters=query).execute(num_retries=arvrunner.num_retries)

    if not existing["items"]:
        col.save_new(name=name,
                     owner_uuid=arvrunner.project_uuid,
                     ensure_unique_name=True,
                     num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", col.manifest_locator())
    else:
        logger.info("Using collection %s", existing["items"][0]["uuid"])

    return col.portable_data_hash()