# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import copy
import json
import logging
import os
import sys
import threading
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import schema_salad

import arvados
import arvados.collection
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)
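
# Typical command-line use, for orientation (a sketch: the project UUID below
# is a made-up placeholder, and arg_parser() further down is the authoritative
# list of flags):
#
#   arvados-cwl-runner --api=containers \
#       --project-uuid zzzzz-j7d0g-0123456789abcde \
#       workflow.cwl job-inputs.yml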

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        # Choose the first API advertised by the server that matches the
        # caller's request (or either one, if work_api is None).
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
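
    # Tool factory passed to cwltool: CommandLineTool and Workflow documents
    # get Arvados-aware implementations; anything else falls through to
    # cwltool's default.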
    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
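
    # Callback invoked by cwltool with the final output object and overall
    # status; when running a local pipeline instance (jobs API), also mirror
    # the status into its record.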
    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
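
    # Handle a state-change event for one of our submitted processes. Events
    # are synthesized by poll_states() below, in the same shape as Arvados
    # event-bus messages.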
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        txt = self.work_api[0].upper() + self.work_api[1:-1]
                        logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()
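
    # Cache of dependencies already uploaded to Keep during this run, keyed by
    # source path; used by the upload helpers in .runner so the same file is
    # not uploaded twice.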
    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)
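
    # Assemble the final output collection: copy each output file out of its
    # source collection in Keep (writing file literals directly), save the new
    # collection, then rewrite the output object's locations to point into it.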
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
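
    # If this runner is itself executing inside a Crunch container (or a
    # Crunch job task), record the final output collection on that record so
    # the parent container or job reports the workflow's output.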
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)
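
    # Main execution entry point called by cwltool: handles the
    # --create/--update-workflow short circuit, optionally submits a workflow
    # runner job or container, then drives the job iterator until all work is
    # done and the final output has been collected.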
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return tmpl.uuid
            else:
                return upload_workflow(self, tool, job_order,
                                       self.project_uuid,
                                       uuid=existing_uuid,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
                                       name=kwargs.get("name"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if not kwargs["name"]:
            del kwargs["name"]

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output
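
# A sketch of programmatic use (main() below is the supported entry point;
# this assumes ARVADOS_API_HOST and ARVADOS_API_TOKEN are set in the
# environment):
#
#   api = arvados.api('v1', model=OrderedJsonModel())
#   runner = ArvCwlRunner(api, work_api="containers")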
480 """Print version string of key packages for provenance and debugging."""
482 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
483 arvpkg = pkg_resources.require("arvados-python-client")
484 cwlpkg = pkg_resources.require("cwltool")
486 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
487 "arvados-python-client", arvpkg[0].version,
488 "cwltool", cwlpkg[0].version)

def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}
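
# Command-line entry point: parse arguments, build the Arvados API and Keep
# clients, then hand control to cwltool.main.main() with the Arvados-aware
# executor, tool factory, and file access plugged in.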
def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client),
                             resolver=partial(collectionResolver, api_client))