import arvados.collection
from .arvdocker import arv_docker_get_image
-from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing, upload_workflow_collection
from .pathmapper import InitialWorkDirPathMapper
from .perf import Perf
from . import done
self.update_pipeline_component(response)
- logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
-
if response["state"] in ("Complete", "Failed", "Cancelled"):
+ logger.info("%s reuse job %s", self.arvrunner.label(self), response["uuid"])
with Perf(metrics, "done %s" % self.name):
self.done(response)
+ else:
+ logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
except Exception as e:
logger.exception("%s error" % (self.arvrunner.label(self)))
self.output_callback({}, "permanentFail")
class RunnerJob(Runner):
"""Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
- def upload_workflow_collection(self, packed):
- collection = arvados.collection.Collection(api_client=self.arvrunner.api,
- keep_client=self.arvrunner.keep_client,
- num_retries=self.arvrunner.num_retries)
- with collection.open("workflow.cwl", "w") as f:
- f.write(yaml.round_trip_dump(packed))
-
- filters = [["portable_data_hash", "=", collection.portable_data_hash()],
- ["name", "like", self.name+"%"]]
- if self.arvrunner.project_uuid:
- filters.append(["owner_uuid", "=", self.arvrunner.project_uuid])
- exists = self.arvrunner.api.collections().list(filters=filters).execute(num_retries=self.arvrunner.num_retries)
-
- if exists["items"]:
- logger.info("Using collection %s", exists["items"][0]["uuid"])
- else:
- collection.save_new(name=self.name,
- owner_uuid=self.arvrunner.project_uuid,
- ensure_unique_name=True,
- num_retries=self.arvrunner.num_retries)
- logger.info("Uploaded to %s", collection.manifest_locator())
-
- return collection.portable_data_hash()
-
def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
"""Create an Arvados job specification for this workflow.
self.job_order["cwl:tool"] = self.tool.tool["id"][5:]
else:
packed = packed_workflow(self.arvrunner, self.tool)
- wf_pdh = self.upload_workflow_collection(packed)
+ wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed)
self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
adjustDirObjs(self.job_order, trim_listing)