From d071c34ca20aa86a5a053abcffb7414dbd8f4933 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Thu, 16 Oct 2014 16:32:30 -0400 Subject: [PATCH] 3609: Further documentation improvements, --local now runs pipeline runner instead of just arv-crunch-job, added --no-reuse and --no-wait --- crunch_scripts/run-command | 5 ++++- doc/user/topics/arv-run.html.textile.liquid | 12 ++++++++++ .../topics/run-command.html.textile.liquid | 4 ++++ sdk/python/arvados/commands/run.py | 22 +++++++++++++++---- sdk/python/arvados/commands/ws.py | 14 +++++++++--- sdk/python/arvados/events.py | 12 +++++++++- 6 files changed, 60 insertions(+), 9 deletions(-) diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command index e21089ee80..6c27a94572 100755 --- a/crunch_scripts/run-command +++ b/crunch_scripts/run-command @@ -352,7 +352,10 @@ try: while len(pids) > 0: (pid, status) = os.wait() pids.discard(pid) - rcode[pid] = (status >> 8) + if not taskp.get("task.ignore_rcode"): + rcode[pid] = (status >> 8) + else: + rcode[pid] = 0 if sig.sig is not None: logger.critical("terminating on signal %s" % sig.sig) diff --git a/doc/user/topics/arv-run.html.textile.liquid b/doc/user/topics/arv-run.html.textile.liquid index dcf03ed850..0d7d8c1643 100644 --- a/doc/user/topics/arv-run.html.textile.liquid +++ b/doc/user/topics/arv-run.html.textile.liquid @@ -44,6 +44,18 @@ h2. Parallel tasks HWI-ST1027_129_D0THKACXX.1_1.fastq HWI-ST1027_129_D0THKACXX.1_2.fastq $ arv-run grep -H -n ATTGGAGGAAAGATGAGTGAC -- *.fastq Running pipeline qr1hi-d1hrv-mg3bju0u7r6w241 +[...] +Thu Oct 16 19:27:42 2014 qr1hi-8i9sb-r0n6w78aq0knsoj 2331 0 stderr run-command: parallelizing on input0 with items [u'/keep/3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq', u'/keep/3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_2.fastq'] +[...] +Thu Oct 16 19:27:45 2014 qr1hi-8i9sb-r0n6w78aq0knsoj 2331 1 stderr run-command: grep -H -n ATTGGAGGAAAGATGAGTGAC /keep/3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq +Thu Oct 16 19:27:46 2014 qr1hi-8i9sb-r0n6w78aq0knsoj 2331 2 stderr run-command: grep -H -n ATTGGAGGAAAGATGAGTGAC /keep/3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_2.fastq +[...] +Thu Oct 16 19:27:46 2014 qr1hi-8i9sb-r0n6w78aq0knsoj 2331 1 stderr /keep/3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq:14:TCTGGCCCCTGTTGTCTGCATGTAACTTAATACCACAACCAGGCATAGGGGAAAGATTGGAGGAAAGATGAGTGACAGCATCAACTTCTCTCCCAACCTA +Thu Oct 16 19:27:46 2014 qr1hi-8i9sb-r0n6w78aq0knsoj 2331 1 stderr /keep/3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq:18:AACCAGGCATAGGGGAAAGATTGGAGGAAAGATGAGTGACAGCATCAACTTCTCTCACAACCTAGGCCAGTAAGTAGTGCTTGTGCTCATCTCCTTGGCT +Thu Oct 16 19:27:46 2014 qr1hi-8i9sb-r0n6w78aq0knsoj 2331 1 stderr /keep/3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq:30:ATAGGGGAAAGATTGGAGGAAAGATGAGTGACAGCATCAACTTCTCTCACAACCTAGGCCAGTAAGTAGTGCTTGTGCTCATCTCCTTGGCTGTGATACG +Thu Oct 16 19:27:47 2014 qr1hi-8i9sb-r0n6w78aq0knsoj 2331 1 stderr run-command: completed with exit code 0 (success) +Thu Oct 16 19:27:47 2014 qr1hi-8i9sb-r0n6w78aq0knsoj 2331 2 stderr /keep/3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_2.fastq:34:CTGGCCCCTGTTGTCTGCATGTAACTTAATACCACAACCAGGCATAGGGGAAAGATTGGAGGAAAGATGAGTGACAGCATCAACTTCTCTCACAACCTAG +Thu Oct 16 19:27:47 2014 qr1hi-8i9sb-r0n6w78aq0knsoj 2331 2 stderr run-command: completed with exit code 0 (success) diff --git a/doc/user/topics/run-command.html.textile.liquid b/doc/user/topics/run-command.html.textile.liquid index c025008fb3..7d92876e96 100644 --- a/doc/user/topics/run-command.html.textile.liquid +++ b/doc/user/topics/run-command.html.textile.liquid @@ -180,6 +180,10 @@ h3. task.cwd This directive sets the initial current working directory that your command will run in. If @task.cwd@ is not specified, the default current working directory is @task.outdir@. +h3. task.ignore_rcode + +By Unix convention a task which exits with a non-zero return code is considered failed. However, some programs (such as @grep@) return non-zero codes for conditions that should not be considered fatal errors. Set @"task.ignore_rcode": true@ to indicate the task should always succeed. + h3. task.stdin and task.stdout Provide standard input and standard output redirection. diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py index 2375f5831e..7b0cb35bfc 100644 --- a/sdk/python/arvados/commands/run.py +++ b/sdk/python/arvados/commands/run.py @@ -18,6 +18,9 @@ arvrun_parser = argparse.ArgumentParser() arvrun_parser.add_argument('--dry-run', action="store_true", help="Print out the pipeline that would be submitted and exit") arvrun_parser.add_argument('--local', action="store_true", help="Run locally using arv-crunch-job") arvrun_parser.add_argument('--docker-image', type=str, default="arvados/jobs", help="Docker image to use, default arvados/jobs") +arvrun_parser.add_argument('--ignore-rcode', action="store_true", help="Set this to indicate commands that return non-zero return codes should not be considered failed.") +arvrun_parser.add_argument('--no-reuse', action="store_true", help="Do not reuse past jobs.") +arvrun_parser.add_argument('--no-wait', action="store_true", help="Do not wait and display logs after submitting command, just exit.") arvrun_parser.add_argument('--git-dir', type=str, default="", help="Git repository passed to arv-crunch-job when using --local") arvrun_parser.add_argument('--repository', type=str, default="arvados", help="repository field of component, default 'arvados'") arvrun_parser.add_argument('--script-version', type=str, default="master", help="script_version field of component, default 'master'") @@ -196,6 +199,8 @@ def main(arguments=None): component["script_parameters"]["task.foreach"] = task_foreach component["script_parameters"]["command"] = slots[2:] + if args.ignore_rcode: + component["script_parameters"]["task.ignore_rcode"] = args.ignore_rcode pipeline = { "name": " | ".join([s[0] for s in slots[2:]]), @@ -203,18 +208,27 @@ def main(arguments=None): "components": { "command": component }, - "state":"RunningOnServer" + "state": "RunningOnClient" if args.local else "RunningOnServer" } if args.dry_run: print(json.dumps(pipeline, indent=4)) - elif args.local: - subprocess.call(["arv-crunch-job", "--job", json.dumps(component), "--git-dir", args.git_dir]) else: api = arvados.api('v1') pi = api.pipeline_instances().create(body=pipeline).execute() print "Running pipeline %s" % pi["uuid"] - ws.main(["--pipeline", pi["uuid"]]) + + if args.local: + subprocess.call(["arv-run-pipeline-instance", "--instance", pi["uuid"], "--run-jobs-here"] + (["--no-reuse"] if args.no_reuse else [])) + elif not args.no_wait: + ws.main(["--pipeline", pi["uuid"]]) + + pi = api.pipeline_instances().get(uuid=pi["uuid"]).execute() + print "Pipeline is %s" % pi["state"] + if "output_uuid" in pi["components"]["command"]: + print "Output is %s" % pi["components"]["command"]["output_uuid"] + else: + print "No output" if __name__ == '__main__': main() diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py index 674daadcd9..04e3f6414b 100644 --- a/sdk/python/arvados/commands/ws.py +++ b/sdk/python/arvados/commands/ws.py @@ -71,6 +71,14 @@ def main(arguments=None): elif ev["event_type"] in ("create", "update"): if ev["object_kind"] == "arvados#pipelineInstance": update_subscribed_components(ev["properties"]["new_attributes"]["components"]) + + if ev["object_kind"] == "arvados#pipelineInstance" and args.pipeline: + if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Paused"): + ws.close() + + if ev["object_kind"] == "arvados#job" and args.job: + if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"): + ws.close() elif 'status' in ev and ev['status'] == 200: pass else: @@ -82,9 +90,9 @@ def main(arguments=None): if args.pipeline: c = api.pipeline_instances().get(uuid=args.pipeline).execute() update_subscribed_components(c["components"]) - - while True: - signal.pause() + if c["state"] in ("Complete", "Failed", "Paused"): + ws.close() + ws.run_forever() except KeyboardInterrupt: pass except Exception as e: diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py index e6038fcd7d..d1abc0f7de 100644 --- a/sdk/python/arvados/events.py +++ b/sdk/python/arvados/events.py @@ -78,9 +78,19 @@ class PollClient(threading.Thread): self.id = max_id self.stop.wait(self.poll_time) + def run_forever(self): + self.stop.wait() + def close(self): self.stop.set() - self.join() + try: + self.join() + except RuntimeError: + # "join() raises a RuntimeError if an attempt is made to join the + # current thread as that would cause a deadlock. It is also an + # error to join() a thread before it has been started and attempts + # to do so raises the same exception." + pass def subscribe(self, filters): self.on_event({'status': 200}) -- 2.39.5