def upload_instance(arvrunner, name, tool, job_order):
    """Upload Docker images, workflow dependencies and job-order dependencies.

    Before uploading, every File value in ``job_order`` whose input
    parameter declares ``secondaryFiles`` (and which does not already carry
    its own ``secondaryFiles``) gets that list filled in by applying each
    declared pattern to the file's ``location``.

    ``job_order`` is modified in place; its ``"id"`` key is removed.

    Returns the mapper produced by ``upload_dependencies`` for the workflow
    document (not the one for the job order).
    """
    upload_docker(arvrunner, tool)

    def fill_secondary(value, patterns):
        # Expand the declared patterns for a File object; lists are walked
        # recursively so arrays of Files are handled too.
        if isinstance(value, dict) and value.get("class") == "File":
            if "secondaryFiles" not in value:
                value["secondaryFiles"] = [
                    {"location": substitute(value["location"], pattern), "class": "File"}
                    for pattern in patterns
                ]

        if isinstance(value, list):
            for item in value:
                fill_secondary(item, patterns)

    for param in tool.tool["inputs"]:
        key = shortname(param["id"])
        if key in job_order and param.get("secondaryFiles"):
            fill_secondary(job_order[key], param["secondaryFiles"])

    workflow_id = tool.tool["id"]
    workflowmapper = upload_dependencies(
        arvrunner, name, tool.doc_loader, tool.tool, workflow_id, True)

    # The job order is uploaded for its side effects only; the returned
    # mapper is not used by this function.
    job_order_id = job_order.get("id", "#")
    upload_dependencies(
        arvrunner, os.path.basename(job_order_id), tool.doc_loader,
        job_order, job_order_id, False)

    job_order.pop("id", None)

    return workflowmapper
-
-def arvados_jobs_image(arvrunner):
- img = "arvados/jobs:"+__version__
def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined
    into a single document.

    The document is fetched through ``tool.doc_loader`` using the tool's own
    id and handed to ``pack`` together with ``tool.metadata``.  ``arvrunner``
    is accepted for signature compatibility but is not used here.
    """
    loader = tool.doc_loader
    workflow_id = tool.tool["id"]
    document = loader.fetch(workflow_id)
    return pack(loader, document, workflow_id, tool.metadata)
+
def tag_git_version(packed, tool=None):
    """Record the git commit of the workflow's source directory in ``packed``.

    When ``tool.tool["id"]`` is a ``file://`` URI, ``git log`` is run in the
    directory containing that file and the resulting commit hash is stored
    under ``packed["http://schema.org/version"]``.

    This is best-effort: if no ``tool`` is given, the id is not a ``file://``
    URI, ``git`` cannot be executed, or the directory is not a git checkout,
    ``packed`` is left untouched and no exception is raised.

    Args:
        packed: the packed workflow document (a dict), modified in place.
        tool: object exposing the workflow as ``tool.tool`` (a dict with an
            ``"id"`` entry).  Optional so that existing one-argument callers
            keep working; previously the body referenced an undefined
            ``tool`` name and raised ``NameError``.
    """
    if tool is None:
        # Without a tool there is no document id to derive a checkout path from.
        return

    doc_id = tool.tool["id"]
    if not doc_id.startswith("file://"):
        return

    # Drop the 7-character "file://" scheme to get a filesystem path.
    path = os.path.dirname(doc_id[7:])
    try:
        githash = subprocess.check_output(
            ['git', 'log', '--first-parent', '--max-count=1', '--format=%H'],
            stderr=subprocess.STDOUT, cwd=path).strip()
    except (OSError, subprocess.CalledProcessError):
        # git missing, directory missing, or not a repository: skip tagging.
        return

    # NOTE(review): on Python 3 check_output returns bytes; confirm whether
    # consumers of this field need it decoded to text.
    packed["http://schema.org/version"] = githash
+
+
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.

    Before uploading, every File value whose input parameter declares
    ``secondaryFiles`` (and which does not already list its own) gets a
    ``secondaryFiles`` entry built by applying each declared pattern to the
    file's ``location``.  ``job_order`` is modified in place and is also the
    return value.
    """

    def fill_secondary(value, patterns):
        # Expand the declared patterns for a File object; lists are walked
        # recursively so arrays of Files are handled too.
        if isinstance(value, dict) and value.get("class") == "File":
            if "secondaryFiles" not in value:
                value["secondaryFiles"] = [
                    {"location": substitute(value["location"], pattern), "class": "File"}
                    for pattern in patterns
                ]

        if isinstance(value, list):
            for item in value:
                fill_secondary(item, patterns)

    for param in tool.tool["inputs"]:
        key = shortname(param["id"])
        if key in job_order and param.get("secondaryFiles"):
            fill_secondary(job_order[key], param["secondaryFiles"])

    # Called for its effect on job_order; the returned mapper is not needed.
    upload_dependencies(
        arvrunner, name, tool.doc_loader, job_order,
        job_order.get("id", "#"), False)

    job_order.pop("id", None)

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    job_order.pop("job_order", None)

    return job_order
+
def upload_workflow_deps(arvrunner, tool):
    """Upload Docker images and per-tool dependencies for a workflow.

    ``upload_docker`` is run first so the images the workflow needs are
    available.  Then ``tool.visit`` is used to call ``upload_dependencies``
    on every visited document that has an ``"id"``, after which that
    document is stored back into the loader's index under its id.
    """
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    loader = tool.doc_loader

    def visit_node(node):
        # Documents without an id are not uploaded or indexed.
        if "id" not in node:
            return
        label = "%s dependencies" % (shortname(node["id"]))
        upload_dependencies(
            arvrunner, label, loader, node, node["id"], False,
            include_primary=False)
        loader.idx[node["id"]] = node

    tool.visit(visit_node)
+
+def arvados_jobs_image(arvrunner, img):
+ """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
+