11002: Merge branch 'master' into 11002-arvput-crash-fix
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
5
6 import argparse
7 import logging
8 import os
9 import sys
10 import threading
11 import hashlib
12 import copy
13 import json
14 from functools import partial
15 import pkg_resources  # part of setuptools
16
17 from cwltool.errors import WorkflowException
18 import cwltool.main
19 import cwltool.workflow
20 import cwltool.process
21 import schema_salad
22 from schema_salad.sourceline import SourceLine
23
24 import arvados
25 import arvados.config
26 from arvados.keep import KeepClient
27 from arvados.errors import ApiError
28
29 from .arvcontainer import ArvadosContainer, RunnerContainer
30 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
31 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
32 from .arvtool import ArvadosCommandTool
33 from .arvworkflow import ArvadosWorkflow, upload_workflow
34 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
35 from .perf import Perf
36 from .pathmapper import NoFollowPathMapper
37 from ._version import __version__
38
39 from cwltool.pack import pack
40 from cwltool.process import shortname, UnsupportedRequirement, getListing
41 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
42 from cwltool.draft2tool import compute_checksums
43 from arvados.api import OrderedJsonModel
44
45 logger = logging.getLogger('arvados.cwl-runner')
46 metrics = logging.getLogger('arvados.cwl-runner.metrics')
47 logger.setLevel(logging.INFO)
48
49 arvados.log_handler.setFormatter(logging.Formatter(
50         '%(asctime)s %(name)s %(levelname)s: %(message)s',
51         '%Y-%m-%d %H:%M:%S'))
52
53 class ArvCwlRunner(object):
54     """Execute a CWL tool or workflow, submit work (using either jobs or
55     containers API), wait for them to complete, and report output.
56
57     """
58
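    # How this class is typically wired up (see main() below): an ArvCwlRunner is
    # constructed with an Arvados API client, then handed to cwltool.main.main()
    # as executor=runner.arv_executor and makeTool=runner.arv_make_tool, which
    # drive the actual workflow run.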
59     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
60         self.api = api_client
61         self.processes = {}
62         self.lock = threading.Lock()
63         self.cond = threading.Condition(self.lock)
64         self.final_output = None
65         self.final_status = None
66         self.uploaded = {}
67         self.num_retries = num_retries
68         self.uuid = None
69         self.stop_polling = threading.Event()
70         self.poll_api = None
71         self.pipeline = None
72         self.final_output_collection = None
73         self.output_name = output_name
74         self.output_tags = output_tags
75         self.project_uuid = None
76
77         if keep_client is not None:
78             self.keep_client = keep_client
79         else:
80             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
81
82         self.work_api = None
83         expected_api = ["jobs", "containers"]
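        # Pick the work submission API: probe the API server's discovery document
        # and use the first entry in expected_api that it actually offers, unless
        # the caller asked for a specific API via work_api.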
84         for api in expected_api:
85             try:
86                 methods = self.api._rootDesc.get('resources')[api]['methods']
87                 if ('httpMethod' in methods['create'] and
88                     (work_api == api or work_api is None)):
89                     self.work_api = api
90                     break
91             except KeyError:
92                 pass
93
94         if not self.work_api:
95             if work_api is None:
96                 raise Exception("No supported APIs")
97             else:
98                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
99
100     def arv_make_tool(self, toolpath_object, **kwargs):
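        """Construct the tool object for a CWL document: ArvadosCommandTool for a
        CommandLineTool, ArvadosWorkflow for a Workflow, otherwise fall back to
        cwltool's default makeTool."""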
101         kwargs["work_api"] = self.work_api
102         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
103                                                 api_client=self.api,
104                                                 keep_client=self.keep_client)
105         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
106             return ArvadosCommandTool(self, toolpath_object, **kwargs)
107         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
108             return ArvadosWorkflow(self, toolpath_object, **kwargs)
109         else:
110             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
111
112     def output_callback(self, out, processStatus):
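        """Record the final output and overall status reported by cwltool, and
        mark the pipeline instance (jobs API) Complete or Failed if one exists."""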
113         if processStatus == "success":
114             logger.info("Overall process status is %s", processStatus)
115             if self.pipeline:
116                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
117                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
118         else:
119             logger.warn("Overall process status is %s", processStatus)
120             if self.pipeline:
121                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
122                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
123         self.final_status = processStatus
124         self.final_output = out
125
126     def on_message(self, event):
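        """Handle a state-change event for a tracked job or container request:
        log Running transitions and finalize processes that reach a terminal
        state."""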
127         if "object_uuid" in event:
128             if event["object_uuid"] in self.processes and event["event_type"] == "update":
129                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
130                     uuid = event["object_uuid"]
131                     with self.lock:
132                         j = self.processes[uuid]
133                         logger.info("%s %s is Running", self.label(j), uuid)
134                         j.running = True
135                         j.update_pipeline_component(event["properties"]["new_attributes"])
136                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
137                     uuid = event["object_uuid"]
138                     try:
139                         self.cond.acquire()
140                         j = self.processes[uuid]
141                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
142                         with Perf(metrics, "done %s" % j.name):
143                             j.done(event["properties"]["new_attributes"])
144                         self.cond.notify()
145                     finally:
146                         self.cond.release()
147
148     def label(self, obj):
149         return "[%s %s]" % (self.work_api[0:-1], obj.name)
150
151     def poll_states(self):
152         """Poll status of jobs or containers listed in the processes dict.
153
154         Runs in a separate thread.
155         """
156
157         try:
158             while True:
159                 self.stop_polling.wait(15)
160                 if self.stop_polling.is_set():
161                     break
162                 with self.lock:
163                     keys = self.processes.keys()
164                 if not keys:
165                     continue
166
167                 if self.work_api == "containers":
168                     table = self.poll_api.container_requests()
169                 elif self.work_api == "jobs":
170                     table = self.poll_api.jobs()
171
172                 try:
173                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
174                 except Exception as e:
175                     logger.warn("Error checking states on API server: %s", e)
176                     continue
177
178                 for p in proc_states["items"]:
179                     self.on_message({
180                         "object_uuid": p["uuid"],
181                         "event_type": "update",
182                         "properties": {
183                             "new_attributes": p
184                         }
185                     })
186         except:
187             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
188             self.cond.acquire()
189             self.processes.clear()
190             self.cond.notify()
191             self.cond.release()
192         finally:
193             self.stop_polling.set()
194
195     def get_uploaded(self):
196         return self.uploaded.copy()
197
198     def add_uploaded(self, src, pair):
199         self.uploaded[src] = pair
200
201     def check_features(self, obj):
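        """Recursively check a tool document for CWL features this runner cannot
        execute, raising UnsupportedRequirement with a source line reference."""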
202         if isinstance(obj, dict):
203             if obj.get("writable"):
204                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
205             if obj.get("class") == "CommandLineTool":
206                 if self.work_api == "containers":
207                     if obj.get("stdin"):
208                         raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not supported with --api=containers")
209                     if obj.get("stderr"):
210                         raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not supported with --api=containers")
211             if obj.get("class") == "DockerRequirement":
212                 if obj.get("dockerOutputDirectory"):
213                     # TODO: can be supported by containers API, but not jobs API.
214                     raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
215                         "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
216             for v in obj.itervalues():
217                 self.check_features(v)
218         elif isinstance(obj, list):
219             for i,v in enumerate(obj):
220                 with SourceLine(obj, i, UnsupportedRequirement):
221                     self.check_features(v)
222
223     def make_output_collection(self, name, tagsString, outputObj):
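        """Build the final output collection in Keep: copy or write each output
        file into a new collection, save and tag it, and rewrite the output
        object's locations to point at the new collection."""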
224         outputObj = copy.deepcopy(outputObj)
225
226         files = []
227         def capture(fileobj):
228             files.append(fileobj)
229
230         adjustDirObjs(outputObj, capture)
231         adjustFileObjs(outputObj, capture)
232
233         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
234
235         final = arvados.collection.Collection(api_client=self.api,
236                                               keep_client=self.keep_client,
237                                               num_retries=self.num_retries)
238
239         srccollections = {}
240         for k,v in generatemapper.items():
241             if k.startswith("_:"):
242                 if v.type == "Directory":
243                     continue
244                 if v.type == "CreateFile":
245                     with final.open(v.target, "wb") as f:
246                         f.write(v.resolved.encode("utf-8"))
247                     continue
248
249             if not k.startswith("keep:"):
250                 raise Exception("Output source is not in keep or a literal")
251             sp = k.split("/")
252             srccollection = sp[0][5:]
253             if srccollection not in srccollections:
254                 try:
255                     srccollections[srccollection] = arvados.collection.CollectionReader(
256                         srccollection,
257                         api_client=self.api,
258                         keep_client=self.keep_client,
259                         num_retries=self.num_retries)
260                 except arvados.errors.ArgumentError as e:
261                     logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
262                     raise
263             reader = srccollections[srccollection]
264             try:
265                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
266                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
267             except IOError as e:
268                 logger.warn("While preparing output collection: %s", e)
269
270         def rewrite(fileobj):
271             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
272             for k in ("basename", "listing", "contents"):
273                 if k in fileobj:
274                     del fileobj[k]
275
276         adjustDirObjs(outputObj, rewrite)
277         adjustFileObjs(outputObj, rewrite)
278
279         with final.open("cwl.output.json", "w") as f:
280             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
281
282         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
283
284         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
285                     final.api_response()["name"],
286                     final.manifest_locator())
287
288         final_uuid = final.manifest_locator()
289         tags = [t for t in tagsString.split(',') if t]
290         for tag in tags:
291             self.api.links().create(body={
292                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
293                 }).execute(num_retries=self.num_retries)
294
295         def finalcollection(fileobj):
296             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
297
298         adjustDirObjs(outputObj, finalcollection)
299         adjustFileObjs(outputObj, finalcollection)
300
301         return (outputObj, final)
302
303     def set_crunch_output(self):
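        """If running inside a Crunch container (containers API) or job task
        (jobs API), record the final output collection on that record."""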
304         if self.work_api == "containers":
305             try:
306                 current = self.api.containers().current().execute(num_retries=self.num_retries)
307             except ApiError as e:
308                 # Status code 404 just means we're not running in a container.
309                 if e.resp.status != 404:
310                     logger.info("Getting current container: %s", e)
311                 return
312             try:
313                 self.api.containers().update(uuid=current['uuid'],
314                                              body={
315                                                  'output': self.final_output_collection.portable_data_hash(),
316                                              }).execute(num_retries=self.num_retries)
317             except Exception as e:
318                 logger.info("Setting container output: %s", e)
319         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
320             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
321                                    body={
322                                        'output': self.final_output_collection.portable_data_hash(),
323                                        'success': self.final_status == "success",
324                                        'progress':1.0
325                                    }).execute(num_retries=self.num_retries)
326
327     def arv_executor(self, tool, job_order, **kwargs):
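        """Top-level executor hook for cwltool: upload dependencies and the job
        order, handle --create-workflow/--update-workflow, submit or run the
        work, wait for completion, and return (output, status)."""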
328         self.debug = kwargs.get("debug")
329
330         tool.visit(self.check_features)
331
332         self.project_uuid = kwargs.get("project_uuid")
333         self.pipeline = None
334         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
335                                                                  api_client=self.api,
336                                                                  keep_client=self.keep_client)
337         self.fs_access = make_fs_access(kwargs["basedir"])
338
339         if not kwargs.get("name"):
340             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
341
342         # Upload direct dependencies of workflow steps and get back a mapping of
343         # files to Keep references.  Also uploads Docker images.
344         upload_workflow_deps(self, tool)
345
346         # Reload tool object which may have been updated by
347         # upload_workflow_deps
348         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
349                                   makeTool=self.arv_make_tool,
350                                   loader=tool.doc_loader,
351                                   avsc_names=tool.doc_schema,
352                                   metadata=tool.metadata)
353
354         # Upload local file references in the job order.
355         job_order = upload_job_order(self, "%s input" % kwargs["name"],
356                                      tool, job_order)
357
358         existing_uuid = kwargs.get("update_workflow")
359         if existing_uuid or kwargs.get("create_workflow"):
360             # Create a pipeline template or workflow record and exit.
361             if self.work_api == "jobs":
362                 tmpl = RunnerTemplate(self, tool, job_order,
363                                       kwargs.get("enable_reuse"),
364                                       uuid=existing_uuid,
365                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
366                                       name=kwargs["name"])
367                 tmpl.save()
368                 # cwltool.main will write our return value to stdout.
369                 return (tmpl.uuid, "success")
370             elif self.work_api == "containers":
371                 return (upload_workflow(self, tool, job_order,
372                                         self.project_uuid,
373                                         uuid=existing_uuid,
374                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
375                                         name=kwargs["name"]),
376                         "success")
377
378         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
379
380         kwargs["make_fs_access"] = make_fs_access
381         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
382         kwargs["use_container"] = True
383         kwargs["tmpdir_prefix"] = "tmp"
384         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
385
386         if self.work_api == "containers":
387             kwargs["outdir"] = "/var/spool/cwl"
388             kwargs["docker_outdir"] = "/var/spool/cwl"
389             kwargs["tmpdir"] = "/tmp"
390             kwargs["docker_tmpdir"] = "/tmp"
391         elif self.work_api == "jobs":
392             kwargs["outdir"] = "$(task.outdir)"
393             kwargs["docker_outdir"] = "$(task.outdir)"
394             kwargs["tmpdir"] = "$(task.tmpdir)"
395
396         runnerjob = None
397         if kwargs.get("submit"):
398             # Submit a runner job to run the workflow for us.
399             if self.work_api == "containers":
400                 if tool.tool["class"] == "CommandLineTool":
401                     kwargs["runnerjob"] = tool.tool["id"]
402                     upload_dependencies(self,
403                                         kwargs["name"],
404                                         tool.doc_loader,
405                                         tool.tool,
406                                         tool.tool["id"],
407                                         False)
408                     runnerjob = tool.job(job_order,
409                                          self.output_callback,
410                                          **kwargs).next()
411                 else:
412                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
413                                                 self.output_name,
414                                                 self.output_tags,
415                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
416                                                 name=kwargs.get("name"),
417                                                 on_error=kwargs.get("on_error"),
418                                                 submit_runner_image=kwargs.get("submit_runner_image"))
419             elif self.work_api == "jobs":
420                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
421                                       self.output_name,
422                                       self.output_tags,
423                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
424                                       name=kwargs.get("name"),
425                                       on_error=kwargs.get("on_error"),
426                                       submit_runner_image=kwargs.get("submit_runner_image"))
427
428         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
429             # Create pipeline for local run
430             self.pipeline = self.api.pipeline_instances().create(
431                 body={
432                     "owner_uuid": self.project_uuid,
433                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
434                     "components": {},
435                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
436             logger.info("Pipeline instance %s", self.pipeline["uuid"])
437
438         if runnerjob and not kwargs.get("wait"):
439             runnerjob.run(wait=kwargs.get("wait"))
440             return (runnerjob.uuid, "success")
441
442         self.poll_api = arvados.api('v1')
443         self.polling_thread = threading.Thread(target=self.poll_states)
444         self.polling_thread.start()
445
446         if runnerjob:
447             jobiter = iter((runnerjob,))
448         else:
449             if "cwl_runner_job" in kwargs:
450                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
451             jobiter = tool.job(job_order,
452                                self.output_callback,
453                                **kwargs)
454
455         try:
456             self.cond.acquire()
457             # Will continue to hold the lock for the duration of this code
458             # except when in cond.wait(), at which point on_message can update
459             # job state and process output callbacks.
460
461             loopperf = Perf(metrics, "jobiter")
462             loopperf.__enter__()
463             for runnable in jobiter:
464                 loopperf.__exit__()
465
466                 if self.stop_polling.is_set():
467                     break
468
469                 if runnable:
470                     with Perf(metrics, "run"):
471                         runnable.run(**kwargs)
472                 else:
473                     if self.processes:
474                         self.cond.wait(1)
475                     else:
476                         logger.error("Workflow is deadlocked: no runnable jobs and not waiting on any pending jobs.")
477                         break
478                 loopperf.__enter__()
479             loopperf.__exit__()
480
481             while self.processes:
482                 self.cond.wait(1)
483
484         except UnsupportedRequirement:
485             raise
486         except:
487             if sys.exc_info()[0] is KeyboardInterrupt:
488                 logger.error("Interrupted, marking pipeline as failed")
489             else:
490                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
491             if self.pipeline:
492                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
493                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
494             if runnerjob and runnerjob.uuid and self.work_api == "containers":
495                 self.api.container_requests().update(uuid=runnerjob.uuid,
496                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
497         finally:
498             self.cond.release()
499             self.stop_polling.set()
500             self.polling_thread.join()
501
502         if self.final_status == "UnsupportedRequirement":
503             raise UnsupportedRequirement("Check log for details.")
504
505         if self.final_output is None:
506             raise WorkflowException("Workflow did not return a result.")
507
508         if kwargs.get("submit") and isinstance(runnerjob, Runner):
509             logger.info("Final output collection %s", runnerjob.final_output)
510         else:
511             if self.output_name is None:
512                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
513             if self.output_tags is None:
514                 self.output_tags = ""
515             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
516             self.set_crunch_output()
517
518         if kwargs.get("compute_checksum"):
519             adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
520             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
521
522         return (self.final_output, self.final_status)
523
524
525 def versionstring():
526     """Return a version string for key packages, for provenance and debugging."""
527
528     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
529     arvpkg = pkg_resources.require("arvados-python-client")
530     cwlpkg = pkg_resources.require("cwltool")
531
532     return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
533                                     "arvados-python-client", arvpkg[0].version,
534                                     "cwltool", cwlpkg[0].version)
535
536
537 def arg_parser():  # type: () -> argparse.ArgumentParser
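    """Build the argparse parser for the arvados-cwl-runner command line."""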
538     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
539
540     parser.add_argument("--basedir", type=str,
541                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or to the current directory if inputs are piped or given on the command line.")
542     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
543                         help="Output directory, default current directory")
544
545     parser.add_argument("--eval-timeout",
546                         help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
547                         type=float,
548                         default=20)
549     parser.add_argument("--version", action="store_true", help="Print version and exit")
550
551     exgroup = parser.add_mutually_exclusive_group()
552     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
553     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
554     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
555
556     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
557
558     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
559
560     exgroup = parser.add_mutually_exclusive_group()
561     exgroup.add_argument("--enable-reuse", action="store_true",
562                         default=True, dest="enable_reuse",
563                         help="Enable job or container reuse (default).")
564     exgroup.add_argument("--disable-reuse", action="store_false",
565                         default=True, dest="enable_reuse",
566                         help="Disable job or container reuse.")
567
568     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs will go to your home project.")
569     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
570     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
571     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
572                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
573                         default=False)
574
575     exgroup = parser.add_mutually_exclusive_group()
576     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
577                         default=True, dest="submit")
578     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
579                         default=True, dest="submit")
580     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
581                          dest="create_workflow")
582     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
583     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
584
585     exgroup = parser.add_mutually_exclusive_group()
586     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
587                         default=True, dest="wait")
588     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
589                         default=True, dest="wait")
590
591     exgroup = parser.add_mutually_exclusive_group()
592     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
593                         default=True, dest="log_timestamps")
594     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
595                         default=True, dest="log_timestamps")
596
597     parser.add_argument("--api", type=str,
598                         default=None, dest="work_api",
599                         help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
600
601     parser.add_argument("--compute-checksum", action="store_true", default=False,
602                         help="Compute checksum of contents while collecting outputs",
603                         dest="compute_checksum")
604
605     parser.add_argument("--submit-runner-ram", type=int,
606                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
607                         default=1024)
608
609     parser.add_argument("--submit-runner-image", type=str,
610                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
611                         default=None)
612
613     parser.add_argument("--name", type=str,
614                         help="Name to use for workflow execution instance.",
615                         default=None)
616
617     parser.add_argument("--on-error", type=str,
618                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
619                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
620
621     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
622     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
623
624     return parser
625
626 def add_arv_hints():
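    """Load the Arvados CWL extension schema (arv-cwl-schema.yml) and register
    its names with the cwltool v1.0 schema so Arvados-specific hints validate."""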
627     cache = {}
628     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
629     cache["http://arvados.org/cwl"] = res.read()
630     res.close()
631     document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
632     _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
633     for n in extnames.names:
634         if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
635             cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
636         document_loader.idx["http://arvados.org/cwl#"+n] = {}
637
638 def main(args, stdout, stderr, api_client=None, keep_client=None):
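    """Command-line entry point: parse arguments, set up logging and API clients,
    construct an ArvCwlRunner, and delegate to cwltool.main.main with
    Arvados-specific hooks."""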
639     parser = arg_parser()
640
641     job_order_object = None
642     arvargs = parser.parse_args(args)
643
644     if arvargs.version:
645         print versionstring()
646         return
647
648     if arvargs.update_workflow:
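        # The five-character infix in an Arvados UUID identifies the object type:
        # '7fd4e' is a workflow record (containers API), 'p5p6p' a pipeline
        # template (jobs API).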
649         if arvargs.update_workflow.find('-7fd4e-') == 5:
650             want_api = 'containers'
651         elif arvargs.update_workflow.find('-p5p6p-') == 5:
652             want_api = 'jobs'
653         else:
654             want_api = None
655         if want_api and arvargs.work_api and want_api != arvargs.work_api:
656             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
657                 arvargs.update_workflow, want_api, arvargs.work_api))
658             return 1
659         arvargs.work_api = want_api
660
661     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
662         job_order_object = ({}, "")
663
664     add_arv_hints()
665
666     try:
667         if api_client is None:
668             api_client=arvados.api('v1', model=OrderedJsonModel())
669         if keep_client is None:
670             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
671         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
672                               num_retries=4, output_name=arvargs.output_name,
673                               output_tags=arvargs.output_tags)
674     except Exception as e:
675         logger.error(e)
676         return 1
677
678     if arvargs.debug:
679         logger.setLevel(logging.DEBUG)
680         logging.getLogger('arvados').setLevel(logging.DEBUG)
681
682     if arvargs.quiet:
683         logger.setLevel(logging.WARN)
684         logging.getLogger('arvados').setLevel(logging.WARN)
685         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
686
687     if arvargs.metrics:
688         metrics.setLevel(logging.DEBUG)
689         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
690
691     if arvargs.log_timestamps:
692         arvados.log_handler.setFormatter(logging.Formatter(
693             '%(asctime)s %(name)s %(levelname)s: %(message)s',
694             '%Y-%m-%d %H:%M:%S'))
695     else:
696         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
697
698     arvargs.conformance_test = None
699     arvargs.use_container = True
700     arvargs.relax_path_checks = True
701     arvargs.validate = None
702
703     return cwltool.main.main(args=arvargs,
704                              stdout=stdout,
705                              stderr=stderr,
706                              executor=runner.arv_executor,
707                              makeTool=runner.arv_make_tool,
708                              versionfunc=versionstring,
709                              job_order_object=job_order_object,
710                              make_fs_access=partial(CollectionFsAccess,
711                                                     api_client=api_client,
712                                                     keep_client=keep_client),
713                              fetcher_constructor=partial(CollectionFetcher,
714                                                          api_client=api_client,
715                                                          keep_client=keep_client,
716                                                          num_retries=runner.num_retries),
717                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
718                              logger_handler=arvados.log_handler)