3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
14 from functools import partial
15 import pkg_resources # part of setuptools
17 from cwltool.errors import WorkflowException
19 import cwltool.workflow
24 from arvados.keep import KeepClient
25 from arvados.errors import ApiError
27 from .arvcontainer import ArvadosContainer, RunnerContainer
28 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
29 from. runner import Runner, upload_instance
30 from .arvtool import ArvadosCommandTool
31 from .arvworkflow import ArvadosWorkflow, upload_workflow
32 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
33 from .perf import Perf
34 from .pathmapper import FinalOutputPathMapper
35 from ._version import __version__
37 from cwltool.pack import pack
38 from cwltool.process import shortname, UnsupportedRequirement, getListing
39 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
40 from cwltool.draft2tool import compute_checksums
41 from arvados.api import OrderedJsonModel
# Module-level loggers: `logger` carries general runner messages, `metrics`
# carries timing data emitted via the Perf helper imported from .perf.
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.
    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        # Lock/condition pair protects shared run state between the main
        # thread and the polling/event threads.
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None   # set by output_callback on completion
        self.final_status = None   # "success" or a failure status string
        self.num_retries = num_retries  # retry count for Arvados API calls
        self.stop_polling = threading.Event()  # signals poll_states() to exit
        self.final_output_collection = None
        self.output_name = output_name  # name for the final output collection
        self.output_tags = output_tags  # comma-separated tags for the output
        self.project_uuid = None   # filled in later from --project-uuid

        # Use the caller-supplied Keep client when given.
        if keep_client is not None:
            self.keep_client = keep_client
            # NOTE(review): the next assignment appears to belong to an elided
            # `else:` branch (build a default KeepClient) — confirm against
            # the full source.
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        # Probe the API server's discovery document to pick between the
        # "jobs" and "containers" work APIs (honoring an explicit work_api).
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            methods = self.api._rootDesc.get('resources')[api]['methods']
            if ('httpMethod' in methods['create'] and
                (work_api == api or work_api is None)):
        # NOTE(review): the branch bodies and the surrounding if/else that
        # select between these two errors appear elided from this excerpt.
        raise Exception("No supported APIs")
        raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
    def arv_make_tool(self, toolpath_object, **kwargs):
        """Tool factory mapping CWL process objects onto Arvados-aware classes.

        CommandLineTool -> ArvadosCommandTool, Workflow -> ArvadosWorkflow;
        anything else falls through to cwltool's default factory.
        """
        kwargs["work_api"] = self.work_api
        # Resolve document/file references through Keep collections.
        # NOTE(review): CollectionFetcher is likely also passed an api_client
        # argument on an elided continuation line — confirm.
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        # Fallback for any other process class.
        return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
    def output_callback(self, out, processStatus):
        """Record the overall workflow result; sync the pipeline record.

        Called by cwltool when the whole process finishes.  Updates the
        jobs-API pipeline_instance state to Complete/Failed accordingly.
        """
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            # NOTE(review): an `if self.pipeline:` guard appears elided here.
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
            # NOTE(review): the `else:` branch header (and its pipeline guard)
            # for the failure path appears elided before the lines below.
            logger.warn("Overall process status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
    def on_message(self, event):
        """Handle an "update" event for a job/container tracked in self.processes.

        Receives both real websocket events and the synthetic events built by
        poll_states().  NOTE(review): lock/condition acquisition lines around
        the process-table accesses appear elided from this excerpt.
        """
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    logger.info("Job %s (%s) is Running", j.name, uuid)
                    # Reflect the new state in the pipeline component record.
                    j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    # Capitalized singular of the work API name for logging,
                    # e.g. "containers" -> "Container", "jobs" -> "Job".
                    txt = self.work_api[0].upper() + self.work_api[1:-1]
                    logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                    # Time the completion handler; j.done() processes the
                    # finished record (outputs, callbacks).
                    with Perf(metrics, "done %s" % j.name):
                        j.done(event["properties"]["new_attributes"])
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
        # NOTE(review): the enclosing try/while-loop scaffolding appears
        # elided from this excerpt; indentation below follows the apparent
        # loop body.
                # Sleep up to 15s between polls; stop_polling doubles as the
                # shutdown signal.
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():

                # Snapshot the UUIDs currently being tracked.
                keys = self.processes.keys()

                # Choose the API table matching the active work API.
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                # NOTE(review): a `try:` header for this API call appears elided.
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    # Transient API failure: log and keep polling.
                    logger.warn("Error checking states on API server: %s", e)

                # Feed each updated record through the same handler the
                # websocket path uses (synthetic "update" events).
                for p in proc_states["items"]:
                        "object_uuid": p["uuid"],
                        "event_type": "update",
            # Unexpected failure: include a traceback only in debug mode.
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.processes.clear()
            # Ensure the main loop sees that polling has stopped.
            self.stop_polling.set()
188 def get_uploaded(self):
189 return self.uploaded.copy()
191 def add_uploaded(self, src, pair):
192 self.uploaded[src] = pair
    def check_writable(self, obj):
        """Recursively reject InitialWorkDir entries marked writable.

        Raises UnsupportedRequirement if any dict in *obj* has a truthy
        "writable" key; recurses into dict values and list items.
        """
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            # Python 2 idiom: iterate values without building a list.
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            # NOTE(review): the `for v in obj:` header appears elided here.
                self.check_writable(v)
    def make_output_collection(self, name, tagsString, outputObj):
        """Copy all workflow outputs into one new Keep collection.

        Returns (rewritten outputObj, final Collection).  The returned
        object's File/Directory locations point into the new collection,
        which is saved under *name* (tagged with *tagsString* entries) in
        self.project_uuid.
        """
        # Work on a copy so the caller's structure is not mutated while
        # locations are rewritten below.
        outputObj = copy.deepcopy(outputObj)

        # Collect every File/Directory object referenced by the output.
        # NOTE(review): the `files = []` initialization appears elided.
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        # Map each captured location onto a target path inside one flat
        # output collection.
        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                # Literal entries not backed by Keep: directories need no
                # copy; CreateFile literals are written straight in.
                if v.type == "Directory":
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))

            # Everything else must already live in Keep.
            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")

            # k looks like "keep:<pdh>/path"; `sp` is presumably k split on
            # "/" (its assignment is elided from this excerpt) — confirm.
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                # Cache one CollectionReader per source collection.
                # NOTE(review): a `try:` header appears elided here.
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
            reader = srccollections[srccollection]
            # Copy the referenced path (or the whole collection when the
            # source has no sub-path) into its target location.
            srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
            final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            # NOTE(review): this warning belongs to an elided except clause.
            logger.warn("While preparing output collection: %s", e)

        # Rewrite locations in the output object to the in-collection paths
        # and drop fields invalidated by the move.
        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        # Persist the CWL output document alongside the data.
        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        # Apply each comma-separated tag as a "tag" link on the collection.
        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        # NOTE(review): a `for tag in tags:` header appears elided here.
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        # Finally, re-point locations at the published collection PDH.
        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
    def set_crunch_output(self):
        """Record the final output PDH on the runner's own container/task.

        Best-effort: failures are logged at info level, never raised.
        """
        if self.work_api == "containers":
            # NOTE(review): a `try:` header appears elided here.
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
            # NOTE(review): a second `try:` and the `body={` line for the
            # update call appear elided.
                self.api.containers().update(uuid=current['uuid'],
                    'output': self.final_output_collection.portable_data_hash(),
                    }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            # Jobs API: running inside a crunch task — record output plus
            # success flag.  NOTE(review): the `body={` line appears elided.
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                'output': self.final_output_collection.portable_data_hash(),
                'success': self.final_status == "success",
                }).execute(num_retries=self.num_retries)
    def arv_executor(self, tool, job_order, **kwargs):
        """Main executor used by cwltool: run or submit *tool*, return output.

        Handles --create/--update-workflow registration, --submit runner
        wrapping, local execution with state polling, and final output
        collection assembly.  NOTE(review): this excerpt has numerous elided
        lines; control-flow headers flagged below should be confirmed
        against the full source.
        """
        self.debug = kwargs.get("debug")

        # Refuse workflows that require writable InitialWorkDir entries.
        tool.visit(self.check_writable)

        self.project_uuid = kwargs.get("project_uuid")
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        # --create-workflow / --update-workflow: register the definition
        # instead of executing it.
        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                # Jobs API stores workflow definitions as pipeline templates.
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      submit_runner_ram=kwargs.get("submit_runner_ram"))
                # cwltool.main will write our return value to stdout.
                # NOTE(review): the containers-API `else:` branch header
                # appears elided before the return below.
                return upload_workflow(self, tool, job_order,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        # Common cwltool execution settings.
        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        # Per-API working-directory conventions: fixed container paths for
        # the containers API, crunch substitution variables for jobs.
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        # Upload the tool/job-order dependencies into Keep.
        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        # --submit: wrap the workflow in a runner job/container that will
        # execute it on the cluster.
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                    # NOTE(review): remaining tool.job() arguments and the
                    # `else:` creating the RunnerContainer appear elided.
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))
                # NOTE(review): jobs-API `else:` branch header appears elided.
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))

        # Local (non-submit) jobs-API runs get a pipeline instance record so
        # progress is visible server-side.
        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                # NOTE(review): the `body={` / "components" lines appear elided.
                "owner_uuid": self.project_uuid,
                "name": shortname(tool.tool["id"]),
                "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        # --no-wait: start the runner and return its UUID immediately.
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        # Background thread that polls job/container state (used instead of
        # or alongside websocket events).
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        # When submitting, the only top-level runnable is the runner itself;
        # otherwise iterate jobs produced by the tool.
        # NOTE(review): the `if runnerjob:` / `else:` headers appear elided.
            jobiter = iter((runnerjob,))

            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,

            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            for runnable in jobiter:

                # Polling thread died: abort rather than hang forever.
                if self.stop_polling.is_set():

                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)

                        # No runnable jobs but work is still pending.
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")

            # Wait for all in-flight processes to finish.
            while self.processes:

        except UnsupportedRequirement:
            # NOTE(review): a bare `except:` and associated re-raise handling
            # appear elided around here.
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            # Mark the pipeline failed and cancel the runner container.
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
            # Shut down the polling thread before returning/raising.
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
            # NOTE(review): an `else:` header appears elided; the block below
            # assembles the output collection for locally-run workflows.
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        # Optionally fill in directory listings and file checksums.
        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output
472 """Print version string of key packages for provenance and debugging."""
474 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
475 arvpkg = pkg_resources.require("arvados-python-client")
476 cwlpkg = pkg_resources.require("cwltool")
478 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
479 "arvados-python-client", arvpkg[0].version,
480 "cwltool", cwlpkg[0].version)
def arg_parser(): # type: () -> argparse.ArgumentParser
    """Build the argparse parser for the arvados-cwl-runner command line.

    NOTE(review): several add_argument calls below are missing keyword
    arguments (and the trailing `return parser`) elided from this excerpt.
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    # Input/output locations.
    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        # NOTE(review): type/default kwargs appear elided.

    parser.add_argument("--version", action="store_true", help="Print version and exit")

    # Mutually exclusive logging verbosity levels.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Job-reuse toggle (enabled by default; both flags share dest).
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",

    # Run mode: submit to cluster vs. run locally; create vs. update a
    # stored workflow definition.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    # Whether to block on completion after submitting.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",

    # Positional arguments: the workflow document plus its input object /
    # extra command-line inputs.
    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
    # NOTE(review): these lines appear to be the body of a schema-setup
    # helper whose `def` header is elided from this excerpt.
    # Load the Arvados CWL extension schema shipped with this package and
    # seed the schema-salad cache with it.
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    # Merge the Arvados extension names into the core CWL v1.0 schema so
    # documents can use http://arvados.org/cwl# hints without errors.
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}
def main(args, stdout, stderr, api_client=None, keep_client=None):
    """Command-line entry point: parse args, build the runner, delegate to cwltool.

    NOTE(review): several conditionals (--version check, logging-level
    selection, the try around client construction) have elided header lines
    in this excerpt.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    # Python 2 print statement; presumably guarded by an elided
    # `if arvargs.version:` check — confirm.
    print versionstring()

    # Infer the work API from the UUID infix of --update-workflow:
    # "-7fd4e-" marks a container workflow, "-p5p6p-" a pipeline template.
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
        # Reject a conflicting explicit --api choice.
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
        arvargs.work_api = want_api

    # Creating/updating a workflow definition needs no real job order;
    # substitute an empty one.
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    # Build default API/Keep clients when the caller did not inject them
    # (tests pass their own).  NOTE(review): the enclosing `try:` header
    # appears elided.
        if api_client is None:
            api_client=arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:

    # Logging level knobs; the --debug/--quiet/--metrics conditionals
    # appear elided around the following lines.
        logger.setLevel(logging.DEBUG)

        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    # Fixed cwltool settings for the Arvados executor.
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True

    # Hand off to cwltool's main loop with Arvados-specific factories:
    # our executor/tool-maker, Keep-backed fs access and fetching, and the
    # keep:/arvwf: reference resolver.
    return cwltool.main.main(args=arvargs,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client),
                             resolver=partial(collectionResolver, api_client))