# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
import argparse
import logging
import os
import sys
import threading
import copy
import json

from functools import partial
import pkg_resources  # part of setuptools
from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow

from schema_salad.sourceline import SourceLine
import schema_salad.schema

import arvados
import arvados.collection
from arvados.keep import KeepClient
from arvados.errors import ApiError
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.
    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.uploaded = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.stop_polling = threading.Event()
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                        (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass
        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
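        """Factory for cwltool tool objects: map CommandLineTool and Workflow
        onto their Arvados-specific implementations, and fall back to the
        cwltool default for anything else."""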
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
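        """Record the final workflow output and status, and mark the pipeline
        instance Complete or Failed accordingly."""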
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
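        """Handle a state-change event for a tracked job or container: log the
        Running transition, and on a terminal state pass the new attributes to
        the process's done() handler."""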
123 if "object_uuid" in event:
124 if event["object_uuid"] in self.processes and event["event_type"] == "update":
125 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
126 uuid = event["object_uuid"]
128 j = self.processes[uuid]
129 logger.info("Job %s (%s) is Running", j.name, uuid)
131 j.update_pipeline_component(event["properties"]["new_attributes"])
132 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
133 uuid = event["object_uuid"]
136 j = self.processes[uuid]
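                    # Build a singular, capitalized label ("Job" or "Container")
                    # from work_api for the log message below.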
                    txt = self.work_api[0].upper() + self.work_api[1:-1]
                    logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                    with Perf(metrics, "done %s" % j.name):
                        j.done(event["properties"]["new_attributes"])

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                keys = self.processes.keys()

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.processes.clear()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
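        """Recursively reject InitialWorkDir entries marked 'writable: true',
        which this runner does not support."""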
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

    def make_output_collection(self, name, tagsString, outputObj):
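        """Assemble the final output into a new Keep collection: copy each
        referenced file or literal into it, write cwl.output.json, save the
        collection under the requested name, and apply any tags."""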
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)
        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
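        """When running inside a Crunch container or job task, record the
        final output collection on that container or task record."""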
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
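        """Top-level executor invoked by cwltool: register or update a stored
        workflow, submit a runner job or container, or run the workflow from
        this host, then collect and return the final output."""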
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        self.project_uuid = kwargs.get("project_uuid")
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return tmpl.uuid
            else:
                return upload_workflow(self, tool, job_order,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
                                       name=kwargs.get("name"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if not kwargs["name"]:
            del kwargs["name"]

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

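        # With --submit, the workflow itself runs on Arvados: wrap it in a
        # runner container (containers API) or runner job (jobs API); a bare
        # CommandLineTool on the containers API can be submitted directly.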
        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            for runnable in jobiter:
                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output

481 """Print version string of key packages for provenance and debugging."""
483 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
484 arvpkg = pkg_resources.require("arvados-python-client")
485 cwlpkg = pkg_resources.require("cwltool")
487 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
488 "arvados-python-client", arvpkg[0].version,
489 "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, defaults to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
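

# Register the Arvados CWL extension schema (http://arvados.org/cwl) with
# cwltool's schema names and document loader so Arvados-specific hints
# validate during workflow loading.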
cache = {}
res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
cache["http://arvados.org/cwl"] = res.read()

document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
_, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
for n in extnames.names:
    if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
        cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
    document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
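    """Parse command line arguments, construct an ArvCwlRunner, and delegate
    to cwltool.main.main with the Arvados-specific executor, tool factory,
    and Keep-aware fetchers."""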
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
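        # Arvados object UUIDs embed a type infix after the 5-character
        # cluster prefix: '-7fd4e-' identifies a workflow (containers API)
        # and '-p5p6p-' a pipeline template (jobs API), so the UUID tells us
        # which API the object being updated belongs to.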
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    return cwltool.main.main(args=arvargs,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client),
                             resolver=partial(collectionResolver, api_client))