3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
14 from functools import partial
15 import pkg_resources # part of setuptools
17 from cwltool.errors import WorkflowException
19 import cwltool.workflow
24 from arvados.keep import KeepClient
25 from arvados.errors import ApiError
27 from .arvcontainer import ArvadosContainer, RunnerContainer
28 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
29 from. runner import Runner, upload_instance
30 from .arvtool import ArvadosCommandTool
31 from .arvworkflow import ArvadosWorkflow, upload_workflow
32 from .fsaccess import CollectionFsAccess, CollectionFetcher
33 from .perf import Perf
34 from .pathmapper import FinalOutputPathMapper
35 from ._version import __version__
37 from cwltool.pack import pack
38 from cwltool.process import shortname, UnsupportedRequirement, getListing
39 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
40 from cwltool.draft2tool import compute_checksums
41 from arvados.api import OrderedJsonModel
# Module-level loggers: `logger` carries normal progress/error messages,
# `metrics` carries timing data emitted via the Perf context manager.
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.
    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        """Set up client handles, synchronization primitives, and choose a work API.

        :param api_client: Arvados API client used for all server calls.
        :param work_api: "jobs", "containers", or None to auto-select.
        :param keep_client: optional KeepClient; one is created when absent.
        :param output_name: name for the final output collection, or None.
        :param output_tags: comma-separated tags for the final output collection.
        :param num_retries: retry count passed to API and Keep operations.
        """
        # NOTE(review): several attribute initializations (e.g. self.api,
        # self.processes, self.uploaded) appear elided from this view.
        self.lock = threading.Lock()
        # Condition used by on_message() to wake the waiting executor thread.
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        # Set to tell the background polling thread to exit.
        self.stop_polling = threading.Event()
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        # NOTE(review): 'else:' header appears elided — the next line creates a
        # KeepClient only when the caller did not supply one.
        self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        # Probe the server's discovery document to decide which work
        # submission API ("jobs" or "containers") is actually usable.
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            # NOTE(review): an enclosing try/except for servers lacking a
            # resource appears elided here.
            methods = self.api._rootDesc.get('resources')[api]['methods']
            if ('httpMethod' in methods['create'] and
                (work_api == api or work_api is None)):
                # NOTE(review): body that records the selected self.work_api
                # appears elided.
        # NOTE(review): the guards selecting between these two errors appear
        # elided; the first fires when no API matched, the second when the
        # caller requested an unsupported API.
        raise Exception("No supported APIs")
        raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        """Factory handed to cwltool: wrap tools and workflows in Arvados-aware classes."""
        kwargs["work_api"] = self.work_api
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        # NOTE(review): 'else:' header appears elided — anything else falls
        # back to cwltool's default factory.
        return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        """Record the overall result and reflect it in the pipeline instance state."""
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            # NOTE(review): a guard (presumably 'if self.pipeline:') appears elided.
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
        # NOTE(review): 'else:' header appears elided — failure path follows.
        logger.warn("Overall process status is %s", processStatus)
        # NOTE(review): a guard (presumably 'if self.pipeline:') appears elided.
        self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                             body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        """Handle a job/container state-change event (from websocket or poll_states)."""
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    # NOTE(review): locking ('with self.lock:') and the
                    # 'j.running = True' update appear elided here.
                    j = self.processes[uuid]
                    logger.info("Job %s (%s) is Running", j.name, uuid)
                    j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    # NOTE(review): locking / try lines appear elided here.
                    j = self.processes[uuid]
                    # Derive a log label from the work API, e.g. "containers"
                    # -> "Container", "jobs" -> "Job".
                    txt = self.work_api[0].upper() + self.work_api[1:-1]
                    logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                    with Perf(metrics, "done %s" % j.name):
                        j.done(event["properties"]["new_attributes"])

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
        # NOTE(review): the enclosing loop ('while True:') and 'try:' appear
        # elided from this view.
        # Wake every 15 seconds, or immediately when asked to stop.
        self.stop_polling.wait(15)
        if self.stop_polling.is_set():
            # NOTE(review): loop-exit statement appears elided.
        keys = self.processes.keys()
        # Pick the resource table matching the active work API.
        if self.work_api == "containers":
            table = self.poll_api.container_requests()
        elif self.work_api == "jobs":
            table = self.poll_api.jobs()
        # NOTE(review): enclosing 'try:' appears elided before the next line.
        proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
        except Exception as e:
            logger.warn("Error checking states on API server: %s", e)
        # Synthesize "update" events so on_message() processes each state.
        for p in proc_states["items"]:
            # NOTE(review): the surrounding 'self.on_message({...})' call is
            # only partially visible here.
            "object_uuid": p["uuid"],
            "event_type": "update",
        # NOTE(review): the 'except Exception:' handler header for the thread
        # body appears elided before the following lines.
        logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
        self.processes.clear()
        self.stop_polling.set()

    def get_uploaded(self):
        # Return a copy so callers cannot mutate the internal upload cache.
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        # Record that local path `src` was uploaded as `pair`.
        self.uploaded[src] = pair

    def check_writable(self, obj):
        """Recursively reject InitialWorkDir entries marked writable (not supported)."""
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            # NOTE(review): the 'for v in obj:' header appears elided.
            self.check_writable(v)

    def make_output_collection(self, name, tagsString, outputObj):
        """Copy all workflow outputs into one new collection.

        Returns a tuple (rewritten outputObj, final Collection).
        """
        # Work on a copy so the caller's structure is not mutated.
        outputObj = copy.deepcopy(outputObj)

        # Collect every File/Directory object referenced by the output.
        # NOTE(review): initialization of the 'files' list appears elided.
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        # Map all captured objects into one flat target layout.
        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        # NOTE(review): initialization of the 'srccollections' dict appears elided.
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                # Literal (non-keep) entries: directories are skipped,
                # CreateFile literals are written directly into the collection.
                if v.type == "Directory":
                    # NOTE(review): 'continue' appears elided.
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    # NOTE(review): 'continue' appears elided.

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            # NOTE(review): 'sp = k.split("/")' (or similar) appears elided.
            # Strip the "keep:" prefix to get the source collection id.
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                # Cache one CollectionReader per source collection.
                # NOTE(review): enclosing 'try:' and the api_client argument
                # of the constructor appear elided.
                srccollections[srccollection] = arvados.collection.CollectionReader(
                    keep_client=self.keep_client,
                    num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
            reader = srccollections[srccollection]
            # Path within the source collection ("." means the whole collection).
            srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
            final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            # NOTE(review): an 'except IOError...' handler header appears
            # elided before the next line.
            logger.warn("While preparing output collection: %s", e)

        # Point every output object at its location inside the new collection.
        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                # NOTE(review): body ('if k in fileobj: del fileobj[k]' or
                # similar) appears elided.

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        # Store the rewritten output object alongside the data.
        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        # Attach each requested tag to the collection via the links API.
        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        # NOTE(review): 'for tag in tags:' header appears elided.
        self.api.links().create(body={
            "head_uuid": final_uuid, "link_class": "tag", "name": tag
        }).execute(num_retries=self.num_retries)

        # Finally, qualify locations with the collection's portable data hash.
        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        """When running inside crunch, record the final output on our own container or task."""
        if self.work_api == "containers":
            # NOTE(review): enclosing 'try:' appears elided.
            current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                # NOTE(review): 'return' and a following 'try:' appear elided.
            self.api.containers().update(uuid=current['uuid'],
                                         # NOTE(review): 'body={' line appears elided.
                                         'output': self.final_output_collection.portable_data_hash(),
                                         }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        # NOTE(review): 'body={' line appears elided.
                                        'output': self.final_output_collection.portable_data_hash(),
                                        'success': self.final_status == "success",
                                        # NOTE(review): one body line appears elided.
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        """Main executor handed to cwltool: submit/run the workflow and return its output."""
        self.debug = kwargs.get("debug")

        # Reject unsupported 'writable' InitialWorkDir entries up front.
        tool.visit(self.check_writable)

        self.project_uuid = kwargs.get("project_uuid")
        # Filesystem access layer backed by Keep collections.
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 # NOTE(review): api_client argument appears elided.
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        # --create-workflow / --update-workflow: register the workflow and return.
        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      # NOTE(review): uuid argument appears elided.
                                      submit_runner_ram=kwargs.get("submit_runner_ram"))
                # NOTE(review): 'tmpl.save()' and the jobs-API return appear elided.
                # cwltool.main will write our return value to stdout.
            # NOTE(review): 'else:' (containers API) header and some arguments
            # of the call below appear elided.
            return upload_workflow(self, tool, job_order,
                                   submit_runner_ram=kwargs.get("submit_runner_ram"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        # Normalize / force the cwltool execution options we depend on.
        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            # Containers use fixed in-container paths.
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            # Jobs use crunch task substitution variables.
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        # Upload the workflow definition and dependencies to Keep.
        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        # NOTE(review): 'runnerjob = None' initialization appears elided.
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         # NOTE(review): remaining call arguments appear elided.
                # NOTE(review): 'else:' header appears elided.
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))
            # NOTE(review): 'else:' (jobs API) header appears elided.
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                # NOTE(review): 'body={' and a components entry appear elided.
                "owner_uuid": self.project_uuid,
                "name": shortname(tool.tool["id"]),
                "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        # --no-wait: submit the runner and return its UUID without waiting.
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        # Start a background thread that polls job/container states.
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        # NOTE(review): a guard (presumably 'if runnerjob:') appears elided.
        jobiter = iter((runnerjob,))
        # NOTE(review): 'else:' header appears elided before the local-run path.
        if "cwl_runner_job" in kwargs:
            self.uuid = kwargs.get("cwl_runner_job").get('uuid')
        jobiter = tool.job(job_order,
                           self.output_callback,
                           # NOTE(review): remaining call arguments appear elided.

        # NOTE(review): 'try:' and 'self.cond.acquire()' appear elided here.
        # Will continue to hold the lock for the duration of this code
        # except when in cond.wait(), at which point on_message can update
        # job state and process output callbacks.

        loopperf = Perf(metrics, "jobiter")
        # NOTE(review): 'loopperf.__enter__()' appears elided.
        for runnable in jobiter:
            # NOTE(review): several loop-body lines appear elided here.
            if self.stop_polling.is_set():
                # NOTE(review): loop-exit statement appears elided.
            with Perf(metrics, "run"):
                runnable.run(**kwargs)
            # NOTE(review): the deadlock-detection guard appears elided before
            # the next line.
            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")

        # Wait for all outstanding processes to finish.
        while self.processes:
            # NOTE(review): 'self.cond.wait(...)' body appears elided.
        except UnsupportedRequirement:
            # NOTE(review): 'raise' appears elided.
        # NOTE(review): a broad exception handler header appears elided before
        # the following lines.
        if sys.exc_info()[0] is KeyboardInterrupt:
            logger.error("Interrupted, marking pipeline as failed")
        # NOTE(review): 'else:' header appears elided.
        logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
        # NOTE(review): a guard (presumably 'if self.pipeline:') appears elided.
        self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                             body={"state": "Failed"}).execute(num_retries=self.num_retries)
        if runnerjob and runnerjob.uuid and self.work_api == "containers":
            # Cancel the runner container request on failure.
            self.api.container_requests().update(uuid=runnerjob.uuid,
                                                 body={"priority": "0"}).execute(num_retries=self.num_retries)
        # NOTE(review): 'finally:' / lock release appear elided; the shutdown
        # below stops the polling thread regardless of outcome.
        self.stop_polling.set()
        self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        # NOTE(review): 'else:' header appears elided — the local-run path
        # gathers outputs into a new collection itself.
        if self.output_name is None:
            self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
        if self.output_tags is None:
            self.output_tags = ""
        self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
        self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if kwargs.get("compute_checksum"):
            # Fill in directory listings and file checksums on request.
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output
# NOTE(review): the enclosing 'def versionstring():' line appears elided from
# this view; the indented body below belongs to that function.
    """Print version string of key packages for provenance and debugging."""
    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    # Report our own version plus the versions of the two key dependencies.
    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                       "arvados-python-client", arvpkg[0].version,
                                       "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command-line argument parser for arvados-cwl-runner."""
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        # NOTE(review): remaining keyword arguments (type/default) appear elided.

    parser.add_argument("--version", action="store_true", help="Print version and exit")

    # Logging verbosity: exactly one of --verbose/--quiet/--debug.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Job reuse: mutually exclusive, defaults to enabled.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         # NOTE(review): help string appears elided.
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         # NOTE(review): help string appears elided.

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        # NOTE(review): default argument appears elided.

    # Execution mode: submit to Arvados, run locally, or register a workflow.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        # NOTE(review): default argument appears elided.

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
    # NOTE(review): the trailing 'return parser' appears elided from this view.
# NOTE(review): the enclosing function definition (presumably
# 'def add_arv_hints():') and the 'cache = {}' initialization appear elided
# from this view; the indented body below belongs to that function.
    # Load the Arvados CWL extension schema bundled with this package and
    # pre-seed the schema-salad cache so no network fetch is needed.
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    # Merge the extension names into the core CWL v1.0 name set so documents
    # may use http://arvados.org/cwl# hints.
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        # Pre-populate the loader index so references to the extension resolve.
        document_loader.idx["http://arvados.org/cwl#"+n] = {}
def main(args, stdout, stderr, api_client=None, keep_client=None):
    """Command-line entry point: parse arguments, build an ArvCwlRunner,
    and delegate execution to cwltool.main with Arvados-specific hooks.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.update_workflow:
        # Infer the work API from the UUID type infix: '-7fd4e-' is a
        # workflow (containers API), '-p5p6p-' is a pipeline template (jobs API).
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            # NOTE(review): assignment (presumably want_api = 'jobs') and a
            # fallback branch appear elided.
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            # The UUID type contradicts the explicitly requested API.
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            # NOTE(review): error return appears elided.
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        # Registering a workflow needs no job inputs.
        job_order_object = ({}, "")

    # NOTE(review): enclosing 'try:' appears elided before the client setup.
    if api_client is None:
        api_client=arvados.api('v1', model=OrderedJsonModel())
    if keep_client is None:
        keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
    runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                          num_retries=4, output_name=arvargs.output_name,
                          output_tags=arvargs.output_tags)
    except Exception as e:
        # NOTE(review): handler body (log and error return) appears elided.

    # Configure log levels from the parsed verbosity flags.
    # NOTE(review): guard (presumably 'if arvargs.debug:') appears elided.
    logger.setLevel(logging.DEBUG)
    # NOTE(review): guard (presumably 'if arvargs.quiet:') appears elided.
    logger.setLevel(logging.WARN)
    logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
    # NOTE(review): guard (presumably 'if arvargs.metrics:') appears elided.
    metrics.setLevel(logging.DEBUG)
    logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    # Options cwltool.main expects; we always run this way.
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True

    return cwltool.main.main(args=arvargs,
                             # NOTE(review): stdout/stderr keyword arguments appear elided.
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client))