10812: Use packed workflows for all run modes.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
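#
# Example invocation (illustrative only; assumes the arvados-cwl-runner entry
# point installed by this package and ARVADOS_API_HOST/ARVADOS_API_TOKEN set in
# the environment):
#
#   arvados-cwl-runner --api=containers --project-uuid=<project uuid> workflow.cwl inputs.yml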
5
6 import argparse
7 import logging
8 import os
9 import sys
10 import threading
11 import hashlib
12 import copy
13 import json
14 from functools import partial
15 import pkg_resources  # part of setuptools
16
17 from cwltool.errors import WorkflowException
18 import cwltool.main
19 import cwltool.workflow
20 import cwltool.process
21 import schema_salad
22 from schema_salad.sourceline import SourceLine
23
24 import arvados
25 import arvados.config
26 from arvados.keep import KeepClient
27 from arvados.errors import ApiError
28
29 from .arvcontainer import ArvadosContainer, RunnerContainer
30 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
31 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
32 from .arvtool import ArvadosCommandTool
33 from .arvworkflow import ArvadosWorkflow, upload_workflow
34 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
35 from .perf import Perf
36 from .pathmapper import FinalOutputPathMapper
37 from ._version import __version__
38
39 from cwltool.pack import pack
40 from cwltool.process import shortname, UnsupportedRequirement, getListing
41 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
42 from cwltool.draft2tool import compute_checksums
43 from arvados.api import OrderedJsonModel
44
45 logger = logging.getLogger('arvados.cwl-runner')
46 metrics = logging.getLogger('arvados.cwl-runner.metrics')
47 logger.setLevel(logging.INFO)
48
49 arvados.log_handler.setFormatter(logging.Formatter(
50         '%(asctime)s %(name)s %(levelname)s: %(message)s',
51         '%Y-%m-%d %H:%M:%S'))
52
53 class ArvCwlRunner(object):
54     """Execute a CWL tool or workflow, submit work (using either jobs or
55     containers API), wait for them to complete, and report output.
56
57     """
58
59     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
60         self.api = api_client
61         self.processes = {}
62         self.lock = threading.Lock()
63         self.cond = threading.Condition(self.lock)
64         self.final_output = None
65         self.final_status = None
66         self.uploaded = {}
67         self.num_retries = num_retries
68         self.uuid = None
69         self.stop_polling = threading.Event()
70         self.poll_api = None
71         self.pipeline = None
72         self.final_output_collection = None
73         self.output_name = output_name
74         self.output_tags = output_tags
75         self.project_uuid = None
76
77         if keep_client is not None:
78             self.keep_client = keep_client
79         else:
80             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
81
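        # Pick the work submission API ("jobs" or "containers"): use the API
        # requested via work_api if the server advertises it, otherwise fall
        # back to the first advertised API in expected_api order.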
82         self.work_api = None
83         expected_api = ["jobs", "containers"]
84         for api in expected_api:
85             try:
86                 methods = self.api._rootDesc.get('resources')[api]['methods']
87                 if ('httpMethod' in methods['create'] and
88                     (work_api == api or work_api is None)):
89                     self.work_api = api
90                     break
91             except KeyError:
92                 pass
93
94         if not self.work_api:
95             if work_api is None:
96                 raise Exception("No supported APIs")
97             else:
98                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
99
100     def arv_make_tool(self, toolpath_object, **kwargs):
101         kwargs["work_api"] = self.work_api
102         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
103                                                 api_client=self.api,
104                                                 keep_client=self.keep_client)
105         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
106             return ArvadosCommandTool(self, toolpath_object, **kwargs)
107         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
108             return ArvadosWorkflow(self, toolpath_object, **kwargs)
109         else:
110             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
111
112     def output_callback(self, out, processStatus):
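        # Final-output callback handed to cwltool: record overall status and
        # output object, and update the pipeline instance state when running
        # with the jobs API.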
113         if processStatus == "success":
114             logger.info("Overall process status is %s", processStatus)
115             if self.pipeline:
116                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
117                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
118         else:
119             logger.warn("Overall process status is %s", processStatus)
120             if self.pipeline:
121                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
122                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
123         self.final_status = processStatus
124         self.final_output = out
125
126     def on_message(self, event):
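        # Handle state-change events (delivered by the polling thread) for
        # submitted jobs/containers: mark them running, or finalize them via
        # done() and notify the main loop waiting on self.cond.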
127         if "object_uuid" in event:
128             if event["object_uuid"] in self.processes and event["event_type"] == "update":
129                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
130                     uuid = event["object_uuid"]
131                     with self.lock:
132                         j = self.processes[uuid]
133                         logger.info("%s %s is Running", self.label(j), uuid)
134                         j.running = True
135                         j.update_pipeline_component(event["properties"]["new_attributes"])
136                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
137                     uuid = event["object_uuid"]
138                     try:
139                         self.cond.acquire()
140                         j = self.processes[uuid]
141                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
142                         with Perf(metrics, "done %s" % j.name):
143                             j.done(event["properties"]["new_attributes"])
144                         self.cond.notify()
145                     finally:
146                         self.cond.release()
147
148     def label(self, obj):
149         return "[%s %s]" % (self.work_api[0:-1], obj.name)
150
151     def poll_states(self):
152         """Poll status of jobs or containers listed in the processes dict.
153
154         Runs in a separate thread.
155         """
156
157         try:
158             while True:
159                 self.stop_polling.wait(15)
160                 if self.stop_polling.is_set():
161                     break
162                 with self.lock:
163                     keys = self.processes.keys()
164                 if not keys:
165                     continue
166
167                 if self.work_api == "containers":
168                     table = self.poll_api.container_requests()
169                 elif self.work_api == "jobs":
170                     table = self.poll_api.jobs()
171
172                 try:
173                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
174                 except Exception as e:
175                     logger.warn("Error checking states on API server: %s", e)
176                     continue
177
178                 for p in proc_states["items"]:
179                     self.on_message({
180                         "object_uuid": p["uuid"],
181                         "event_type": "update",
182                         "properties": {
183                             "new_attributes": p
184                         }
185                     })
186         except:
187             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
188             self.cond.acquire()
189             self.processes.clear()
190             self.cond.notify()
191             self.cond.release()
192         finally:
193             self.stop_polling.set()
194
195     def get_uploaded(self):
196         return self.uploaded.copy()
197
198     def add_uploaded(self, src, pair):
199         self.uploaded[src] = pair
200
201     def check_features(self, obj):
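        # Recursively walk a tool/workflow document and raise
        # UnsupportedRequirement for CWL features that this runner (or the
        # selected work API) does not support.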
202         if isinstance(obj, dict):
203             if obj.get("class") == "InitialWorkDirRequirement":
204                 if self.work_api == "containers":
205                     raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
206             if obj.get("writable"):
207                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
208             if obj.get("class") == "CommandLineTool":
209                 if self.work_api == "containers":
210                     if obj.get("stdin"):
211                         raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not supported with --api=containers")
212                     if obj.get("stderr"):
213                         raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not supported with --api=containers")
214             for v in obj.itervalues():
215                 self.check_features(v)
216         elif isinstance(obj, list):
217             for i,v in enumerate(obj):
218                 with SourceLine(obj, i, UnsupportedRequirement):
219                     self.check_features(v)
220
221     def make_output_collection(self, name, tagsString, outputObj):
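        # Build a single output collection: copy each file/directory referenced
        # by the CWL output object from its source Keep collection (writing
        # file literals directly), add cwl.output.json, save and tag the
        # collection, then rewrite output locations to point into it.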
222         outputObj = copy.deepcopy(outputObj)
223
224         files = []
225         def capture(fileobj):
226             files.append(fileobj)
227
228         adjustDirObjs(outputObj, capture)
229         adjustFileObjs(outputObj, capture)
230
231         generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)
232
233         final = arvados.collection.Collection(api_client=self.api,
234                                               keep_client=self.keep_client,
235                                               num_retries=self.num_retries)
236
237         srccollections = {}
238         for k,v in generatemapper.items():
239             if k.startswith("_:"):
240                 if v.type == "Directory":
241                     continue
242                 if v.type == "CreateFile":
243                     with final.open(v.target, "wb") as f:
244                         f.write(v.resolved.encode("utf-8"))
245                     continue
246
247             if not k.startswith("keep:"):
248                 raise Exception("Output source is not in keep or a literal")
249             sp = k.split("/")
250             srccollection = sp[0][5:]
251             if srccollection not in srccollections:
252                 try:
253                     srccollections[srccollection] = arvados.collection.CollectionReader(
254                         srccollection,
255                         api_client=self.api,
256                         keep_client=self.keep_client,
257                         num_retries=self.num_retries)
258                 except arvados.errors.ArgumentError as e:
259                     logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
260                     raise
261             reader = srccollections[srccollection]
262             try:
263                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
264                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
265             except IOError as e:
266                 logger.warn("While preparing output collection: %s", e)
267
268         def rewrite(fileobj):
269             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
270             for k in ("basename", "listing", "contents"):
271                 if k in fileobj:
272                     del fileobj[k]
273
274         adjustDirObjs(outputObj, rewrite)
275         adjustFileObjs(outputObj, rewrite)
276
277         with final.open("cwl.output.json", "w") as f:
278             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
279
280         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
281
282         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
283                     final.api_response()["name"],
284                     final.manifest_locator())
285
286         final_uuid = final.manifest_locator()
287         tags = tagsString.split(',')
288         for tag in tags:
289             self.api.links().create(body={
290                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
291                 }).execute(num_retries=self.num_retries)
292
293         def finalcollection(fileobj):
294             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
295
296         adjustDirObjs(outputObj, finalcollection)
297         adjustFileObjs(outputObj, finalcollection)
298
299         return (outputObj, final)
300
301     def set_crunch_output(self):
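        # When this runner is itself executing inside a Crunch container or job
        # task, record the final output collection on that container or task.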
302         if self.work_api == "containers":
303             try:
304                 current = self.api.containers().current().execute(num_retries=self.num_retries)
305             except ApiError as e:
306                 # Status code 404 just means we're not running in a container.
307                 if e.resp.status != 404:
308                     logger.info("Getting current container: %s", e)
309                 return
310             try:
311                 self.api.containers().update(uuid=current['uuid'],
312                                              body={
313                                                  'output': self.final_output_collection.portable_data_hash(),
314                                              }).execute(num_retries=self.num_retries)
315             except Exception as e:
316                 logger.info("Setting container output: %s", e)
317         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
318             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
319                                    body={
320                                        'output': self.final_output_collection.portable_data_hash(),
321                                        'success': self.final_status == "success",
322                                        'progress':1.0
323                                    }).execute(num_retries=self.num_retries)
324
325     def arv_executor(self, tool, job_order, **kwargs):
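        # Main entry point invoked by cwltool.main: upload workflow
        # dependencies and the job order, handle --create/--update-workflow,
        # then either submit a runner job/container or iterate over runnable
        # jobs locally, waiting for all submitted work to finish.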
326         self.debug = kwargs.get("debug")
327
328         tool.visit(self.check_features)
329
330         self.project_uuid = kwargs.get("project_uuid")
331         self.pipeline = None
332         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
333                                                                  api_client=self.api,
334                                                                  keep_client=self.keep_client)
335         self.fs_access = make_fs_access(kwargs["basedir"])
336
337         if not kwargs.get("name"):
338             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
339
340         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
341         # Also uploads docker images.
342         upload_workflow_deps(self, tool)
343
344         # Reload tool object which may have been updated by
345         # upload_workflow_deps
346         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
347                                   makeTool=self.arv_make_tool,
348                                   loader=tool.doc_loader,
349                                   avsc_names=tool.doc_schema,
350                                   metadata=tool.metadata)
351
352         # Upload local file references in the job order.
353         job_order = upload_job_order(self, "%s input" % kwargs["name"],
354                                      tool, job_order)
355
356         existing_uuid = kwargs.get("update_workflow")
357         if existing_uuid or kwargs.get("create_workflow"):
358             # Create a pipeline template or workflow record and exit.
359             if self.work_api == "jobs":
360                 tmpl = RunnerTemplate(self, tool, job_order,
361                                       kwargs.get("enable_reuse"),
362                                       uuid=existing_uuid,
363                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
364                                       name=kwargs["name"])
365                 tmpl.save()
366                 # cwltool.main will write our return value to stdout.
367                 return (tmpl.uuid, "success")
368             elif self.work_api == "containers":
369                 return (upload_workflow(self, tool, job_order,
370                                         self.project_uuid,
371                                         uuid=existing_uuid,
372                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
373                                         name=kwargs["name"]),
374                         "success")
375
376         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
377
378         kwargs["make_fs_access"] = make_fs_access
379         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
380         kwargs["use_container"] = True
381         kwargs["tmpdir_prefix"] = "tmp"
382         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
383
384         if self.work_api == "containers":
385             kwargs["outdir"] = "/var/spool/cwl"
386             kwargs["docker_outdir"] = "/var/spool/cwl"
387             kwargs["tmpdir"] = "/tmp"
388             kwargs["docker_tmpdir"] = "/tmp"
389         elif self.work_api == "jobs":
390             kwargs["outdir"] = "$(task.outdir)"
391             kwargs["docker_outdir"] = "$(task.outdir)"
392             kwargs["tmpdir"] = "$(task.tmpdir)"
393
394         runnerjob = None
395         if kwargs.get("submit"):
396             # Submit a runner job to run the workflow for us.
397             if self.work_api == "containers":
398                 if tool.tool["class"] == "CommandLineTool":
399                     kwargs["runnerjob"] = tool.tool["id"]
400                     upload_dependencies(self,
401                                         kwargs["name"],
402                                         tool.doc_loader,
403                                         tool.tool,
404                                         tool.tool["id"],
405                                         False)
406                     runnerjob = tool.job(job_order,
407                                          self.output_callback,
408                                          **kwargs).next()
409                 else:
410                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
411                                                 self.output_name,
412                                                 self.output_tags,
413                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
414                                                 name=kwargs.get("name"),
415                                                 on_error=kwargs.get("on_error"),
416                                                 submit_runner_image=kwargs.get("submit_runner_image"))
417             elif self.work_api == "jobs":
418                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
419                                       self.output_name,
420                                       self.output_tags,
421                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
422                                       name=kwargs.get("name"),
423                                       on_error=kwargs.get("on_error"),
424                                       submit_runner_image=kwargs.get("submit_runner_image"))
425
426         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
427             # Create pipeline for local run
428             self.pipeline = self.api.pipeline_instances().create(
429                 body={
430                     "owner_uuid": self.project_uuid,
431                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
432                     "components": {},
433                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
434             logger.info("Pipeline instance %s", self.pipeline["uuid"])
435
436         if runnerjob and not kwargs.get("wait"):
437             runnerjob.run(wait=kwargs.get("wait"))
438             return (runnerjob.uuid, "success")
439
440         self.poll_api = arvados.api('v1')
441         self.polling_thread = threading.Thread(target=self.poll_states)
442         self.polling_thread.start()
443
444         if runnerjob:
445             jobiter = iter((runnerjob,))
446         else:
447             if "cwl_runner_job" in kwargs:
448                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
449             jobiter = tool.job(job_order,
450                                self.output_callback,
451                                **kwargs)
452
453         try:
454             self.cond.acquire()
455             # Will continue to hold the lock for the duration of this code
456             # except when in cond.wait(), at which point on_message can update
457             # job state and process output callbacks.
458
459             loopperf = Perf(metrics, "jobiter")
460             loopperf.__enter__()
461             for runnable in jobiter:
462                 loopperf.__exit__()
463
464                 if self.stop_polling.is_set():
465                     break
466
467                 if runnable:
468                     with Perf(metrics, "run"):
469                         runnable.run(**kwargs)
470                 else:
471                     if self.processes:
472                         self.cond.wait(1)
473                     else:
474                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
475                         break
476                 loopperf.__enter__()
477             loopperf.__exit__()
478
479             while self.processes:
480                 self.cond.wait(1)
481
482         except UnsupportedRequirement:
483             raise
484         except:
485             if sys.exc_info()[0] is KeyboardInterrupt:
486                 logger.error("Interrupted, marking pipeline as failed")
487             else:
488                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
489             if self.pipeline:
490                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
491                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
492             if runnerjob and runnerjob.uuid and self.work_api == "containers":
493                 self.api.container_requests().update(uuid=runnerjob.uuid,
494                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
495         finally:
496             self.cond.release()
497             self.stop_polling.set()
498             self.polling_thread.join()
499
500         if self.final_status == "UnsupportedRequirement":
501             raise UnsupportedRequirement("Check log for details.")
502
503         if self.final_output is None:
504             raise WorkflowException("Workflow did not return a result.")
505
506         if kwargs.get("submit") and isinstance(runnerjob, Runner):
507             logger.info("Final output collection %s", runnerjob.final_output)
508         else:
509             if self.output_name is None:
510                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
511             if self.output_tags is None:
512                 self.output_tags = ""
513             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
514             self.set_crunch_output()
515
516         if kwargs.get("compute_checksum"):
517             adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
518             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
519
520         return (self.final_output, self.final_status)
521
522
523 def versionstring():
524     """Print version string of key packages for provenance and debugging."""
525
526     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
527     arvpkg = pkg_resources.require("arvados-python-client")
528     cwlpkg = pkg_resources.require("cwltool")
529
530     return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
531                                     "arvados-python-client", arvpkg[0].version,
532                                     "cwltool", cwlpkg[0].version)
533
534
535 def arg_parser():  # type: () -> argparse.ArgumentParser
536     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
537
538     parser.add_argument("--basedir", type=str,
539                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
540     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
541                         help="Output directory, default current directory")
542
543     parser.add_argument("--eval-timeout",
544                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
545                         type=float,
546                         default=20)
547     parser.add_argument("--version", action="store_true", help="Print version and exit")
548
549     exgroup = parser.add_mutually_exclusive_group()
550     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
551     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
552     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
553
554     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
555
556     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
557
558     exgroup = parser.add_mutually_exclusive_group()
559     exgroup.add_argument("--enable-reuse", action="store_true",
560                         default=True, dest="enable_reuse",
561                         help="Enable job or container reuse (default)")
562     exgroup.add_argument("--disable-reuse", action="store_false",
563                         default=True, dest="enable_reuse",
564                         help="Disable job or container reuse")
565
566     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
567     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
568     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
569     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
570                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
571                         default=False)
572
573     exgroup = parser.add_mutually_exclusive_group()
574     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
575                         default=True, dest="submit")
576     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
577                         default=True, dest="submit")
578     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
579                          dest="create_workflow")
580     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
581     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
582
583     exgroup = parser.add_mutually_exclusive_group()
584     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
585                         default=True, dest="wait")
586     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
587                         default=True, dest="wait")
588
589     exgroup = parser.add_mutually_exclusive_group()
590     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
591                         default=True, dest="log_timestamps")
592     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
593                         default=True, dest="log_timestamps")
594
595     parser.add_argument("--api", type=str,
596                         default=None, dest="work_api",
597                         help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
598
599     parser.add_argument("--compute-checksum", action="store_true", default=False,
600                         help="Compute checksum of contents while collecting outputs",
601                         dest="compute_checksum")
602
603     parser.add_argument("--submit-runner-ram", type=int,
604                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
605                         default=1024)
606
607     parser.add_argument("--submit-runner-image", type=str,
608                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
609                         default=None)
610
611     parser.add_argument("--name", type=str,
612                         help="Name to use for workflow execution instance.",
613                         default=None)
614
615     parser.add_argument("--on-error", type=str,
616                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
617                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
618
619     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
620     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
621
622     return parser
623
624 def add_arv_hints():
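    # Load the Arvados CWL extension schema (arv-cwl-schema.yml) and register
    # its names alongside the standard CWL v1.0 schema so workflows can use
    # Arvados-specific hints without validation errors.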
625     cache = {}
626     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
627     cache["http://arvados.org/cwl"] = res.read()
628     res.close()
629     document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
630     _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
631     for n in extnames.names:
632         if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
633             cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
634         document_loader.idx["http://arvados.org/cwl#"+n] = {}
635
636 def main(args, stdout, stderr, api_client=None, keep_client=None):
637     parser = arg_parser()
638
639     job_order_object = None
640     arvargs = parser.parse_args(args)
641
642     if arvargs.version:
643         print versionstring()
644         return
645
646     if arvargs.update_workflow:
647         if arvargs.update_workflow.find('-7fd4e-') == 5:
648             want_api = 'containers'
649         elif arvargs.update_workflow.find('-p5p6p-') == 5:
650             want_api = 'jobs'
651         else:
652             want_api = None
653         if want_api and arvargs.work_api and want_api != arvargs.work_api:
654             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
655                 arvargs.update_workflow, want_api, arvargs.work_api))
656             return 1
657         arvargs.work_api = want_api
658
659     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
660         job_order_object = ({}, "")
661
662     add_arv_hints()
663
664     try:
665         if api_client is None:
666             api_client=arvados.api('v1', model=OrderedJsonModel())
667         if keep_client is None:
668             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
669         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
670                               num_retries=4, output_name=arvargs.output_name,
671                               output_tags=arvargs.output_tags)
672     except Exception as e:
673         logger.error(e)
674         return 1
675
676     if arvargs.debug:
677         logger.setLevel(logging.DEBUG)
678
679     if arvargs.quiet:
680         logger.setLevel(logging.WARN)
681         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
682
683     if arvargs.metrics:
684         metrics.setLevel(logging.DEBUG)
685         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
686
687     if arvargs.log_timestamps:
688         arvados.log_handler.setFormatter(logging.Formatter(
689             '%(asctime)s %(name)s %(levelname)s: %(message)s',
690             '%Y-%m-%d %H:%M:%S'))
691     else:
692         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
693
694     arvargs.conformance_test = None
695     arvargs.use_container = True
696     arvargs.relax_path_checks = True
697     arvargs.validate = None
698
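    # Hand off to cwltool's main(), plugging in the Arvados executor, tool
    # factory, Keep-backed file access, collection fetcher, and collection
    # resolver defined above.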
699     return cwltool.main.main(args=arvargs,
700                              stdout=stdout,
701                              stderr=stderr,
702                              executor=runner.arv_executor,
703                              makeTool=runner.arv_make_tool,
704                              versionfunc=versionstring,
705                              job_order_object=job_order_object,
706                              make_fs_access=partial(CollectionFsAccess,
707                                                     api_client=api_client,
708                                                     keep_client=keep_client),
709                              fetcher_constructor=partial(CollectionFetcher,
710                                                          api_client=api_client,
711                                                          keep_client=keep_client),
712                              resolver=partial(collectionResolver, api_client),
713                              logger_handler=arvados.log_handler)