# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
from functools import partial
import pkg_resources # part of setuptools
from cwltool.errors import WorkflowException
import cwltool.workflow
from arvados.keep import KeepClient
from arvados.errors import ApiError
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__
from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)
class ArvCwlRunner(object):
49 """Execute a CWL tool or workflow, submit work (using either jobs or
50 containers API), wait for them to complete, and report output.
54 def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
57 self.lock = threading.Lock()
58 self.cond = threading.Condition(self.lock)
59 self.final_output = None
60 self.final_status = None
62 self.num_retries = num_retries
64 self.stop_polling = threading.Event()
67 self.final_output_collection = None
68 self.output_name = output_name
69 self.output_tags = output_tags
70 self.project_uuid = None
72 if keep_client is not None:
73 self.keep_client = keep_client
75 self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
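        # Pick the work API: honor the caller's explicit choice if given, otherwise
        # probe the API server's discovery document for "jobs" or "containers" support.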
        expected_api = ["jobs", "containers"]
        for api in expected_api:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                        (work_api == api or work_api is None)):
                raise Exception("No supported APIs")
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
    def output_callback(self, out, processStatus):
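        """Receive the final output object and overall status from the executor,
        update the pipeline instance state accordingly, and store the result."""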
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
            logger.warn("Overall process status is %s", processStatus)
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
    def on_message(self, event):
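        """Handle a state-change event for a job or container tracked in self.processes."""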
122 if "object_uuid" in event:
123 if event["object_uuid"] in self.processes and event["event_type"] == "update":
124 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
125 uuid = event["object_uuid"]
127 j = self.processes[uuid]
128 logger.info("Job %s (%s) is Running", j.name, uuid)
130 j.update_pipeline_component(event["properties"]["new_attributes"])
131 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
132 uuid = event["object_uuid"]
135 j = self.processes[uuid]
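                        # Build a capitalized, singular label from the work API name
                        # ("jobs" -> "Job", "containers" -> "Container") for the log message.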
                        txt = self.work_api[0].upper() + self.work_api[1:-1]
                        logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.
        Runs in a separate thread.
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    keys = self.processes.keys()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                for p in proc_states["items"]:
                        "object_uuid": p["uuid"],
                        "event_type": "update",
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.processes.clear()
            self.stop_polling.set()
    def get_uploaded(self):
        return self.uploaded.copy()
    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair
    def check_writable(self, obj):
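        """Recursively check a CWL object and raise UnsupportedRequirement if any entry requests 'writable: true'."""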
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
                self.check_writable(v)
    def make_output_collection(self, name, tagsString, outputObj):
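        """Copy the workflow outputs into a new Keep collection, write cwl.output.json
        alongside them, save the collection under the given name and owner project,
        apply the comma-separated tags, and return (outputObj, collection) with file
        locations rewritten to point at the new collection."""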
        outputObj = copy.deepcopy(outputObj)
        def capture(fileobj):
            files.append(fileobj)
        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)
        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)
        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
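            # Everything that is not a literal must already be stored in Keep;
            # the source collection is identified by stripping the "keep:" prefix.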
            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
            reader = srccollections[srccollection]
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
                logger.warn("While preparing output collection: %s", e)
        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)
        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())
        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)
        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)
        return (outputObj, final)
    def set_crunch_output(self):
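        """If running inside a Crunch container or job task, record the final output
        collection on that container/task so Arvados reports it as this runner's output."""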
        if self.work_api == "containers":
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                self.api.containers().update(uuid=current['uuid'],
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                        }).execute(num_retries=self.num_retries)
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")
        tool.visit(self.check_writable)
        self.project_uuid = kwargs.get("project_uuid")
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                  keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])
        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))
                # cwltool.main will write our return value to stdout.
                return upload_workflow(self, tool, job_order,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
                                       name=kwargs.get("name"))
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"
        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
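        # With --submit, a single CommandLineTool on the containers API is submitted
        # directly; otherwise the workflow is wrapped in a RunnerContainer/RunnerJob
        # that executes it on the cluster.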
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"))
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))
        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid
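        # Run state polling in a background thread with its own API client;
        # poll_states() feeds job/container updates back through on_message().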
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()
            jobiter = iter((runnerjob,))
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.
            loopperf = Perf(metrics, "jobiter")
            for runnable in jobiter:
                if self.stop_polling.is_set():
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
            while self.processes:
        except UnsupportedRequirement:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
            self.stop_polling.set()
            self.polling_thread.join()
        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")
        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")
        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()
        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")
        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
        return self.final_output
    """Print version string of key packages for provenance and debugging."""
    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")
    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                       "arvados-python-client", arvpkg[0].version,
                                       "cwltool", cwlpkg[0].version)
def arg_parser(): # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped/provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")
    parser.add_argument("--eval-timeout",
496 help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
    parser.add_argument("--version", action="store_true", help="Print version and exit")
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs will go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")
    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")
    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}
def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()
    job_order_object = None
    arvargs = parser.parse_args(args)
        print versionstring()
    if arvargs.update_workflow:
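        # The five-character type infix in an Arvados UUID identifies the object:
        # "-7fd4e-" is a workflow (containers API), "-p5p6p-" a pipeline template (jobs API).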
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
        arvargs.work_api = want_api
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")
        if api_client is None:
            api_client=arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.setLevel(logging.DEBUG)
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    return cwltool.main.main(args=arvargs,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client),
                             resolver=partial(collectionResolver, api_client))