#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
import re
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
import schema_salad
from schema_salad.sourceline import SourceLine

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

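        # Probe the API server's discovery document to see which work
        # submission APIs it offers, honoring an explicit work_api choice
        # if one was given.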
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
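        # Strip the trailing "s" from the API name, giving e.g. "[job foo]"
        # or "[container foo]".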
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

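                # Feed each polled record through on_message using the same
                # event shape as a websocket "update" notification.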
                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_features(self, obj):
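        """Recursively reject CWL features that this runner does not support."""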
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "CommandLineTool":
                if self.work_api == "containers":
                    if obj.get("stdin"):
                        raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not supported with --api=containers")
                    if obj.get("stderr"):
                        raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not supported with --api=containers")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    # TODO: can be supported by containers API, but not jobs API.
                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i, v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)

    def make_output_collection(self, name, tagsString, outputObj):
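        """Assemble the workflow outputs into a single named Keep collection.

        Returns the rewritten output object and the new collection.
        """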
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k, v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
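            # Locations look like "keep:<collection>/<path>"; strip the
            # "keep:" prefix to get the source collection identifier.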
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

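        # Point each File/Directory at its new location inside the output
        # collection; "basename", "listing" and "contents" are dropped because
        # they can be recomputed from the rewritten location.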
        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',', ': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
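            # Record the output on the container record by portable data hash,
            # then trash the temporary named collection (the data remains
            # reachable through the container's output).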
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                   body={
                                       'output': self.final_output_collection.portable_data_hash(),
                                       'success': self.final_status == "success",
                                       'progress': 1.0
                                   }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
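        """Execute the workflow, either by submitting a runner job/container
        or by coordinating the run from this process, and return the final
        output and status."""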
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"])
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

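        # Paths inside the work environment: the containers API uses fixed
        # mount points, while the jobs API substitutes crunch task variables.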
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    upload_dependencies(self,
                                        kwargs["name"],
                                        tool.doc_loader,
                                        tool.tool,
                                        tool.tool["id"],
                                        False)
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

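        # Use a separate API client for the polling thread; client objects
        # are not safe to share across threads.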
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return (self.final_output, self.final_status)


def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, defaults to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default).")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse.")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs will go to the home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

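# Register the Arvados CWL extension schema (http://arvados.org/cwl#...) so
# its names validate alongside the standard CWL v1.0 vocabulary.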
def add_arv_hints():
    cache = {}
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
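    """Parse arguments, set up an ArvCwlRunner, and hand off to cwltool.main."""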
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
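        # The infix of an Arvados UUID identifies the object type: "7fd4e"
        # is a workflow (containers API), "p5p6p" a pipeline template
        # (jobs API).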
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client,
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler)
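
# A minimal invocation sketch (hypothetical; the installed arvados-cwl-runner
# entry point calls main() in roughly this way, with credentials taken from
# the ARVADOS_API_HOST/ARVADOS_API_TOKEN environment):
#
#   if __name__ == "__main__":
#       sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))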