From 19da21ab8e56154d7db15c2643524cb8348a7a8a Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Thu, 12 Apr 2018 20:54:57 -0400 Subject: [PATCH] 13108: crunch_script uses safeapi Also take workflow lock on update_pipeline_component to prevent update races. Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- sdk/cwl/arvados_cwl/arvjob.py | 39 ++++++++++++++-------------- sdk/cwl/arvados_cwl/crunch_script.py | 3 ++- sdk/cwl/arvados_cwl/runner.py | 3 +-- 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py index e222152a16..decd692931 100644 --- a/sdk/cwl/arvados_cwl/arvjob.py +++ b/sdk/cwl/arvados_cwl/arvjob.py @@ -184,27 +184,28 @@ class ArvadosJob(object): self.output_callback({}, "permanentFail") def update_pipeline_component(self, record): - if self.arvrunner.pipeline: - self.arvrunner.pipeline["components"][self.name] = {"job": record} - with Perf(metrics, "update_pipeline_component %s" % self.name): - self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update( - uuid=self.arvrunner.pipeline["uuid"], - body={ - "components": self.arvrunner.pipeline["components"] - }).execute(num_retries=self.arvrunner.num_retries) - if self.arvrunner.uuid: - try: - job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute() - if job: - components = job["components"] - components[self.name] = record["uuid"] - self.arvrunner.api.jobs().update( - uuid=self.arvrunner.uuid, + with self.arvrunner.workflow_eval_lock: + if self.arvrunner.pipeline: + self.arvrunner.pipeline["components"][self.name] = {"job": record} + with Perf(metrics, "update_pipeline_component %s" % self.name): + self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update( + uuid=self.arvrunner.pipeline["uuid"], body={ - "components": components + "components": self.arvrunner.pipeline["components"] }).execute(num_retries=self.arvrunner.num_retries) - except Exception as e: - logger.info("Error adding to components: %s", e) + if self.arvrunner.uuid: + try: + job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute() + if job: + components = job["components"] + components[self.name] = record["uuid"] + self.arvrunner.api.jobs().update( + uuid=self.arvrunner.uuid, + body={ + "components": components + }).execute(num_retries=self.arvrunner.num_retries) + except Exception as e: + logger.info("Error adding to components: %s", e) def done(self, record): try: diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py index aaeffea24b..bf940eca4b 100644 --- a/sdk/cwl/arvados_cwl/crunch_script.py +++ b/sdk/cwl/arvados_cwl/crunch_script.py @@ -97,7 +97,8 @@ def run(): debug = job_order_object["arv:debug"] del job_order_object["arv:debug"] - runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()), + runner = arvados_cwl.ArvCwlRunner(api_client=arvados.safeapi.ThreadSafeApiCache( + api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4}), output_name=output_name, output_tags=output_tags) make_fs_access = functools.partial(CollectionFsAccess, diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index 053c99502b..bf0eb08129 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -400,5 +400,4 @@ class Runner(object): else: self.arvrunner.output_callback(outputs, processStatus) finally: - if record["uuid"] in self.arvrunner.processes: - del self.arvrunner.processes[record["uuid"]] + self.arvrunner.process_done(record["uuid"]) -- 2.30.2