+def upload_job_order(arvrunner, name, tool, job_order):
+    """Upload local files referenced in the input object and return the updated
+    input object, with each 'location' rewritten to the proper Keep reference.
+    """
+
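+    # Propagate secondaryFiles patterns declared on the tool's inputs onto the
+    # matching entries in the input object.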
+ discover_secondary_files(tool.tool["inputs"], job_order)
+
+    # upload_dependencies rewrites each File/Directory 'location' in job_order
+    # in place, so the returned path mapper is not needed here.
+    upload_dependencies(arvrunner,
+                        name,
+                        tool.doc_loader,
+                        job_order,
+                        job_order.get("id", "#"),
+                        False)
+
+ if "id" in job_order:
+ del job_order["id"]
+
+    # cwltool adds the "job_order" key when parameters are supplied on the
+    # command line; it is not part of the input object, so filter it out.
+    if "job_order" in job_order:
+        del job_order["job_order"]
+
+ return job_order
+
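+# Records, for one tool document, the mapping from original file references to
+# their resolved Keep locations, plus any secondary files discovered for them.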
+FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
+
+def upload_workflow_deps(arvrunner, tool):
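+    """Upload the file dependencies of every tool in the workflow and return a
+    map from tool id to the FileUpdates recorded for it."""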
+ # Ensure that Docker images needed by this workflow are available
+
+ upload_docker(arvrunner, tool)
+
+ document_loader = tool.doc_loader
+
+ merged_map = {}
+
+ def upload_tool_deps(deptool):
+ if "id" in deptool:
+ discovered_secondaryfiles = {}
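+            # Upload this tool's file dependencies; the tool document itself
+            # is excluded from the upload (include_primary=False).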
+ pm = upload_dependencies(arvrunner,
+ "%s dependencies" % (shortname(deptool["id"])),
+ document_loader,
+ deptool,
+ deptool["id"],
+ False,
+ include_primary=False,
+ discovered_secondaryfiles=discovered_secondaryfiles)
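+            # Refresh the loader index so subsequent lookups of this id get
+            # the updated tool document.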
+ document_loader.idx[deptool["id"]] = deptool
+            toolmap = {k: v.resolved for k, v in pm.items()}
+ merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+
+ tool.visit(upload_tool_deps)
+
+ return merged_map
+
+def arvados_jobs_image(arvrunner, img):
+ """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
+
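+    # arv_docker_get_image checks whether the image is already available in
+    # the project and, if not, attempts to pull it and upload it there.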
+ try:
+ arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
+ except Exception as e:
+        raise Exception("Docker image %s is not available\n%s" % (img, e))
+ return img
+
+def upload_workflow_collection(arvrunner, name, packed):
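+    """Save the packed workflow document as 'workflow.cwl' in a collection,
+    reusing an existing collection with identical content when one is found,
+    and return the collection's portable data hash."""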
+ collection = arvados.collection.Collection(api_client=arvrunner.api,
+ keep_client=arvrunner.keep_client,
+ num_retries=arvrunner.num_retries)
+ with collection.open("workflow.cwl", "w") as f:
+        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))
+
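+    # Look for an existing collection with the same content and a name starting
+    # with the requested name, restricted to the target project if one is set.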
+ filters = [["portable_data_hash", "=", collection.portable_data_hash()],
+ ["name", "like", name+"%"]]
+ if arvrunner.project_uuid:
+ filters.append(["owner_uuid", "=", arvrunner.project_uuid])
+ exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
+
+ if exists["items"]:
+ logger.info("Using collection %s", exists["items"][0]["uuid"])
+ else:
+ collection.save_new(name=name,
+ owner_uuid=arvrunner.project_uuid,
+ ensure_unique_name=True,
+ num_retries=arvrunner.num_retries)
+ logger.info("Uploaded to %s", collection.manifest_locator())
+
+ return collection.portable_data_hash()
+
+
+class Runner(object):
+ """Base class for runner processes, which submit an instance of
+ arvados-cwl-runner and wait for the final result."""
+
+ def __init__(self, runner, tool, job_order, enable_reuse,
+ output_name, output_tags, submit_runner_ram=0,
+ name=None, on_error=None, submit_runner_image=None,
+ intermediate_output_ttl=0, merged_map=None, priority=None,
+ secret_store=None):
+ self.arvrunner = runner
+ self.tool = tool
+ self.job_order = job_order
+ self.running = False
+ if enable_reuse:
+ # If reuse is permitted by command line arguments but
+ # disabled by the workflow itself, disable it.
+ reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
+ if reuse_req:
+ enable_reuse = reuse_req["enableReuse"]
+ self.enable_reuse = enable_reuse
+ self.uuid = None
+ self.final_output = None
+ self.output_name = output_name
+ self.output_tags = output_tags
+ self.name = name
+ self.on_error = on_error
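+        # Container image used to run arvados-cwl-runner on the cluster;
+        # default to the arvados/jobs image matching this package version.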
+        self.jobs_image = submit_runner_image or "arvados/jobs:" + __version__
+ self.intermediate_output_ttl = intermediate_output_ttl
+ self.priority = priority
+ self.secret_store = secret_store
+
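+        # Fall back to a 3000 MiB default when --submit-runner-ram is not
+        # given (RAM values here are assumed to be in MiB).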
+ if submit_runner_ram:
+ self.submit_runner_ram = submit_runner_ram
+ else:
+ self.submit_runner_ram = 3000
+
+ if self.submit_runner_ram <= 0:
+ raise Exception("Value of --submit-runner-ram must be greater than zero")
+
+ self.merged_map = merged_map or {}
+
+ def update_pipeline_component(self, record):
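+        """No-op in the base class; subclasses may override this to record the
+        submitted runner's state in a pipeline component."""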
+ pass