1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
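#
# A minimal usage sketch (hypothetical project UUID and input file names):
#
#   arvados-cwl-runner --api=containers \
#       --project-uuid zzzzz-j7d0g-0123456789abcde \
#       my-workflow.cwl my-inputs.yml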
5
6 import argparse
7 import logging
8 import os
9 import sys
10 import threading
11 import hashlib
12 import copy
13 import json
14 import re
15 from functools import partial
16 import pkg_resources  # part of setuptools
17
18 from cwltool.errors import WorkflowException
19 import cwltool.main
20 import cwltool.workflow
21 import cwltool.process
22 import schema_salad
23 from schema_salad.sourceline import SourceLine
24
25 import arvados
26 import arvados.config
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29
30 from .arvcontainer import ArvadosContainer, RunnerContainer
31 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
32 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
33 from .arvtool import ArvadosCommandTool
34 from .arvworkflow import ArvadosWorkflow, upload_workflow
35 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
36 from .perf import Perf
37 from .pathmapper import NoFollowPathMapper
38 from ._version import __version__
39
40 from cwltool.pack import pack
41 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
42 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
43 from cwltool.draft2tool import compute_checksums
44 from arvados.api import OrderedJsonModel
45
46 logger = logging.getLogger('arvados.cwl-runner')
47 metrics = logging.getLogger('arvados.cwl-runner.metrics')
48 logger.setLevel(logging.INFO)
49
50 arvados.log_handler.setFormatter(logging.Formatter(
51         '%(asctime)s %(name)s %(levelname)s: %(message)s',
52         '%Y-%m-%d %H:%M:%S'))
53
54 class ArvCwlRunner(object):
55     """Execute a CWL tool or workflow, submit work (using either jobs or
56     containers API), wait for them to complete, and report output.
57
58     """
59
60     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
61         self.api = api_client
62         self.processes = {}
63         self.lock = threading.Lock()
64         self.cond = threading.Condition(self.lock)
65         self.final_output = None
66         self.final_status = None
67         self.uploaded = {}
68         self.num_retries = num_retries
69         self.uuid = None
70         self.stop_polling = threading.Event()
71         self.poll_api = None
72         self.pipeline = None
73         self.final_output_collection = None
74         self.output_name = output_name
75         self.output_tags = output_tags
76         self.project_uuid = None
77
78         if keep_client is not None:
79             self.keep_client = keep_client
80         else:
81             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
82
83         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
84
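        # Pick the work submission API by probing the API server's discovery
        # document: honor an explicitly requested work_api, otherwise use the
        # first of "jobs" or "containers" that the server advertises.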
85         self.work_api = None
86         expected_api = ["jobs", "containers"]
87         for api in expected_api:
88             try:
89                 methods = self.api._rootDesc.get('resources')[api]['methods']
90                 if ('httpMethod' in methods['create'] and
91                     (work_api == api or work_api is None)):
92                     self.work_api = api
93                     break
94             except KeyError:
95                 pass
96
97         if not self.work_api:
98             if work_api is None:
99                 raise Exception("No supported APIs")
100             else:
101                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
102
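    # Tool factory handed to cwltool: wraps CommandLineTool and Workflow
    # documents in their Arvados-aware counterparts, and falls back to the
    # cwltool default for anything else.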
103     def arv_make_tool(self, toolpath_object, **kwargs):
104         kwargs["work_api"] = self.work_api
105         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
106                                                 api_client=self.api,
107                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
108                                                 num_retries=self.num_retries)
109         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
110             return ArvadosCommandTool(self, toolpath_object, **kwargs)
111         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
112             return ArvadosWorkflow(self, toolpath_object, **kwargs)
113         else:
114             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
115
116     def output_callback(self, out, processStatus):
117         if processStatus == "success":
118             logger.info("Overall process status is %s", processStatus)
119             if self.pipeline:
120                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
121                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
122         else:
123             logger.warn("Overall process status is %s", processStatus)
124             if self.pipeline:
125                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
126                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
127         self.final_status = processStatus
128         self.final_output = out
129
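    # Handle a state-change event for a tracked job or container (delivered by
    # the polling thread): mark it as running, or finalize it and notify the
    # main loop waiting on self.cond.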
130     def on_message(self, event):
131         if "object_uuid" in event:
132             if event["object_uuid"] in self.processes and event["event_type"] == "update":
133                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
134                     uuid = event["object_uuid"]
135                     with self.lock:
136                         j = self.processes[uuid]
137                         logger.info("%s %s is Running", self.label(j), uuid)
138                         j.running = True
139                         j.update_pipeline_component(event["properties"]["new_attributes"])
140                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
141                     uuid = event["object_uuid"]
142                     try:
143                         self.cond.acquire()
144                         j = self.processes[uuid]
145                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
146                         with Perf(metrics, "done %s" % j.name):
147                             j.done(event["properties"]["new_attributes"])
148                         self.cond.notify()
149                     finally:
150                         self.cond.release()
151
152     def label(self, obj):
153         return "[%s %s]" % (self.work_api[0:-1], obj.name)
154
155     def poll_states(self):
156         """Poll status of jobs or containers listed in the processes dict.
157
158         Runs in a separate thread.
159         """
160
161         try:
162             while True:
163                 self.stop_polling.wait(15)
164                 if self.stop_polling.is_set():
165                     break
166                 with self.lock:
167                     keys = self.processes.keys()
168                 if not keys:
169                     continue
170
171                 if self.work_api == "containers":
172                     table = self.poll_api.container_requests()
173                 elif self.work_api == "jobs":
174                     table = self.poll_api.jobs()
175
176                 try:
177                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
178                 except Exception as e:
179                     logger.warn("Error checking states on API server: %s", e)
180                     continue
181
182                 for p in proc_states["items"]:
183                     self.on_message({
184                         "object_uuid": p["uuid"],
185                         "event_type": "update",
186                         "properties": {
187                             "new_attributes": p
188                         }
189                     })
190         except:
191             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
192             self.cond.acquire()
193             self.processes.clear()
194             self.cond.notify()
195             self.cond.release()
196         finally:
197             self.stop_polling.set()
198
199     def get_uploaded(self):
200         return self.uploaded.copy()
201
202     def add_uploaded(self, src, pair):
203         self.uploaded[src] = pair
204
205     def check_features(self, obj):
206         if isinstance(obj, dict):
207             if obj.get("writable"):
208                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
209             if obj.get("class") == "DockerRequirement":
210                 if obj.get("dockerOutputDirectory"):
211                     # TODO: can be supported by containers API, but not jobs API.
212                     raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
213                         "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
214             for v in obj.itervalues():
215                 self.check_features(v)
216         elif isinstance(obj, list):
217             for i,v in enumerate(obj):
218                 with SourceLine(obj, i, UnsupportedRequirement):
219                     self.check_features(v)
220
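    # Assemble the workflow's final output into a new collection: copy every
    # referenced keep: path (and write file literals), save and tag the
    # collection, then rewrite the output object to point into it.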
221     def make_output_collection(self, name, tagsString, outputObj):
222         outputObj = copy.deepcopy(outputObj)
223
224         files = []
225         def capture(fileobj):
226             files.append(fileobj)
227
228         adjustDirObjs(outputObj, capture)
229         adjustFileObjs(outputObj, capture)
230
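        # Map each captured File/Directory location to a destination path
        # inside the new output collection.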
231         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
232
233         final = arvados.collection.Collection(api_client=self.api,
234                                               keep_client=self.keep_client,
235                                               num_retries=self.num_retries)
236
237         for k,v in generatemapper.items():
238             if k.startswith("_:"):
239                 if v.type == "Directory":
240                     continue
241                 if v.type == "CreateFile":
242                     with final.open(v.target, "wb") as f:
243                         f.write(v.resolved.encode("utf-8"))
244                     continue
245
246             if not k.startswith("keep:"):
247                 raise Exception("Output source is not in keep and is not a literal")
248             sp = k.split("/")
249             srccollection = sp[0][5:]
250             try:
251                 reader = self.collection_cache.get(srccollection)
252                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
253                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
254             except arvados.errors.ArgumentError as e:
255                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
256                 raise
257             except IOError as e:
258                 logger.warn("While preparing output collection: %s", e)
259
260         def rewrite(fileobj):
261             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
262             for k in ("basename", "listing", "contents"):
263                 if k in fileobj:
264                     del fileobj[k]
265
266         adjustDirObjs(outputObj, rewrite)
267         adjustFileObjs(outputObj, rewrite)
268
269         with final.open("cwl.output.json", "w") as f:
270             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
271
272         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
273
274         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
275                     final.api_response()["name"],
276                     final.manifest_locator())
277
278         final_uuid = final.manifest_locator()
279         tags = tagsString.split(',') if tagsString else []
280         for tag in tags:
281             self.api.links().create(body={
282                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
283             }).execute(num_retries=self.num_retries)
284
285         def finalcollection(fileobj):
286             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
287
288         adjustDirObjs(outputObj, finalcollection)
289         adjustFileObjs(outputObj, finalcollection)
290
291         return (outputObj, final)
292
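    # When this runner is itself executing inside Arvados, record the final
    # output on the enclosing container (containers API) or job task (jobs
    # API) so the workflow run's own record points at the result.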
293     def set_crunch_output(self):
294         if self.work_api == "containers":
295             try:
296                 current = self.api.containers().current().execute(num_retries=self.num_retries)
297             except ApiError as e:
298                 # Status code 404 just means we're not running in a container.
299                 if e.resp.status != 404:
300                     logger.info("Getting current container: %s", e)
301                 return
302             try:
303                 self.api.containers().update(uuid=current['uuid'],
304                                              body={
305                                                  'output': self.final_output_collection.portable_data_hash(),
306                                              }).execute(num_retries=self.num_retries)
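                # The container record now references the output by portable
                # data hash, so the separate collection record created by
                # make_output_collection is redundant and can be trashed.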
307                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
308                                               body={
309                                                   'is_trashed': True
310                                               }).execute(num_retries=self.num_retries)
311             except Exception as e:
312                 logger.info("Setting container output: %s", e)
313         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
314             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
315                                    body={
316                                        'output': self.final_output_collection.portable_data_hash(),
317                                        'success': self.final_status == "success",
318                                        'progress':1.0
319                                    }).execute(num_retries=self.num_retries)
320
321     def arv_executor(self, tool, job_order, **kwargs):
322         self.debug = kwargs.get("debug")
323
324         tool.visit(self.check_features)
325
326         self.project_uuid = kwargs.get("project_uuid")
327         self.pipeline = None
328         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
329                                                                  collection_cache=self.collection_cache)
330         self.fs_access = make_fs_access(kwargs["basedir"])
331
332         if not kwargs.get("name"):
333             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
334
335         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
336         # Also uploads docker images.
337         upload_workflow_deps(self, tool)
338
339         # Reload tool object which may have been updated by
340         # upload_workflow_deps
341         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
342                                   makeTool=self.arv_make_tool,
343                                   loader=tool.doc_loader,
344                                   avsc_names=tool.doc_schema,
345                                   metadata=tool.metadata)
346
347         # Upload local file references in the job order.
348         job_order = upload_job_order(self, "%s input" % kwargs["name"],
349                                      tool, job_order)
350
351         existing_uuid = kwargs.get("update_workflow")
352         if existing_uuid or kwargs.get("create_workflow"):
353             # Create a pipeline template or workflow record and exit.
354             if self.work_api == "jobs":
355                 tmpl = RunnerTemplate(self, tool, job_order,
356                                       kwargs.get("enable_reuse"),
357                                       uuid=existing_uuid,
358                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
359                                       name=kwargs["name"])
360                 tmpl.save()
361                 # cwltool.main will write our return value to stdout.
362                 return (tmpl.uuid, "success")
363             elif self.work_api == "containers":
364                 return (upload_workflow(self, tool, job_order,
365                                         self.project_uuid,
366                                         uuid=existing_uuid,
367                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
368                                         name=kwargs["name"]),
369                         "success")
370
371         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
372
373         kwargs["make_fs_access"] = make_fs_access
374         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
375         kwargs["use_container"] = True
376         kwargs["tmpdir_prefix"] = "tmp"
377         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
378
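        # Output and scratch directories as seen by the work: fixed container
        # paths for the containers API, crunch task placeholders for the jobs API.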
379         if self.work_api == "containers":
380             kwargs["outdir"] = "/var/spool/cwl"
381             kwargs["docker_outdir"] = "/var/spool/cwl"
382             kwargs["tmpdir"] = "/tmp"
383             kwargs["docker_tmpdir"] = "/tmp"
384         elif self.work_api == "jobs":
385             kwargs["outdir"] = "$(task.outdir)"
386             kwargs["docker_outdir"] = "$(task.outdir)"
387             kwargs["tmpdir"] = "$(task.tmpdir)"
388
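        # With --submit, the workflow is driven by a runner job or container
        # that executes arvados-cwl-runner on the cluster instead of locally.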
389         runnerjob = None
390         if kwargs.get("submit"):
391             # Submit a runner job to run the workflow for us.
392             if self.work_api == "containers":
393                 if tool.tool["class"] == "CommandLineTool":
394                     kwargs["runnerjob"] = tool.tool["id"]
395                     upload_dependencies(self,
396                                         kwargs["name"],
397                                         tool.doc_loader,
398                                         tool.tool,
399                                         tool.tool["id"],
400                                         False)
401                     runnerjob = tool.job(job_order,
402                                          self.output_callback,
403                                          **kwargs).next()
404                 else:
405                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
406                                                 self.output_name,
407                                                 self.output_tags,
408                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
409                                                 name=kwargs.get("name"),
410                                                 on_error=kwargs.get("on_error"),
411                                                 submit_runner_image=kwargs.get("submit_runner_image"))
412             elif self.work_api == "jobs":
413                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
414                                       self.output_name,
415                                       self.output_tags,
416                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
417                                       name=kwargs.get("name"),
418                                       on_error=kwargs.get("on_error"),
419                                       submit_runner_image=kwargs.get("submit_runner_image"))
420
421         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
422             # Create pipeline for local run
423             self.pipeline = self.api.pipeline_instances().create(
424                 body={
425                     "owner_uuid": self.project_uuid,
426                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
427                     "components": {},
428                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
429             logger.info("Pipeline instance %s", self.pipeline["uuid"])
430
431         if runnerjob and not kwargs.get("wait"):
432             runnerjob.run(wait=kwargs.get("wait"))
433             return (runnerjob.uuid, "success")
434
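        # Create a separate API client object for the polling thread; API
        # client objects are not safe to share across threads.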
435         self.poll_api = arvados.api('v1')
436         self.polling_thread = threading.Thread(target=self.poll_states)
437         self.polling_thread.start()
438
439         if runnerjob:
440             jobiter = iter((runnerjob,))
441         else:
442             if "cwl_runner_job" in kwargs:
443                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
444             jobiter = tool.job(job_order,
445                                self.output_callback,
446                                **kwargs)
447
448         try:
449             self.cond.acquire()
450             # Will continue to hold the lock for the duration of this code
451             # except when in cond.wait(), at which point on_message can update
452             # job state and process output callbacks.
453
454             loopperf = Perf(metrics, "jobiter")
455             loopperf.__enter__()
456             for runnable in jobiter:
457                 loopperf.__exit__()
458
459                 if self.stop_polling.is_set():
460                     break
461
462                 if runnable:
463                     with Perf(metrics, "run"):
464                         runnable.run(**kwargs)
465                 else:
466                     if self.processes:
467                         self.cond.wait(1)
468                     else:
469                         logger.error("Workflow is deadlocked: no runnable jobs and not waiting on any pending jobs.")
470                         break
471                 loopperf.__enter__()
472             loopperf.__exit__()
473
474             while self.processes:
475                 self.cond.wait(1)
476
477         except UnsupportedRequirement:
478             raise
479         except:
480             if sys.exc_info()[0] is KeyboardInterrupt:
481                 logger.error("Interrupted, marking pipeline as failed")
482             else:
483                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
484             if self.pipeline:
485                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
486                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
487             if runnerjob and runnerjob.uuid and self.work_api == "containers":
488                 self.api.container_requests().update(uuid=runnerjob.uuid,
489                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
490         finally:
491             self.cond.release()
492             self.stop_polling.set()
493             self.polling_thread.join()
494
495         if self.final_status == "UnsupportedRequirement":
496             raise UnsupportedRequirement("Check log for details.")
497
498         if self.final_output is None:
499             raise WorkflowException("Workflow did not return a result.")
500
501         if kwargs.get("submit") and isinstance(runnerjob, Runner):
502             logger.info("Final output collection %s", runnerjob.final_output)
503         else:
504             if self.output_name is None:
505                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
506             if self.output_tags is None:
507                 self.output_tags = ""
508             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
509             self.set_crunch_output()
510
511         if kwargs.get("compute_checksum"):
512             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
513             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
514
515         return (self.final_output, self.final_status)
516
517
518 def versionstring():
519     """Print version string of key packages for provenance and debugging."""
520
521     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
522     arvpkg = pkg_resources.require("arvados-python-client")
523     cwlpkg = pkg_resources.require("cwltool")
524
525     return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
526                                     "arvados-python-client", arvpkg[0].version,
527                                     "cwltool", cwlpkg[0].version)
528
529
530 def arg_parser():  # type: () -> argparse.ArgumentParser
531     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
532
533     parser.add_argument("--basedir", type=str,
534                         help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
535     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
536                         help="Output directory, default current directory")
537
538     parser.add_argument("--eval-timeout",
539                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
540                         type=float,
541                         default=20)
542
543     exgroup = parser.add_mutually_exclusive_group()
544     exgroup.add_argument("--print-dot", action="store_true",
545                          help="Print workflow visualization in graphviz format and exit")
546     exgroup.add_argument("--version", action="store_true", help="Print version and exit")
547     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
548
549     exgroup = parser.add_mutually_exclusive_group()
550     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
551     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
552     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
553
554     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
555
556     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
557
558     exgroup = parser.add_mutually_exclusive_group()
559     exgroup.add_argument("--enable-reuse", action="store_true",
560                         default=True, dest="enable_reuse",
561                         help="")
562     exgroup.add_argument("--disable-reuse", action="store_false",
563                         default=True, dest="enable_reuse",
564                         help="")
565
566     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
567     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
568     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
569     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
570                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
571                         default=False)
572
573     exgroup = parser.add_mutually_exclusive_group()
574     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
575                         default=True, dest="submit")
576     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
577                         default=True, dest="submit")
578     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
579                          dest="create_workflow")
580     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
581     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
582
583     exgroup = parser.add_mutually_exclusive_group()
584     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
585                         default=True, dest="wait")
586     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
587                         default=True, dest="wait")
588
589     exgroup = parser.add_mutually_exclusive_group()
590     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
591                         default=True, dest="log_timestamps")
592     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
593                         default=True, dest="log_timestamps")
594
595     parser.add_argument("--api", type=str,
596                         default=None, dest="work_api",
597                         help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
598
599     parser.add_argument("--compute-checksum", action="store_true", default=False,
600                         help="Compute checksum of contents while collecting outputs",
601                         dest="compute_checksum")
602
603     parser.add_argument("--submit-runner-ram", type=int,
604                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
605                         default=1024)
606
607     parser.add_argument("--submit-runner-image", type=str,
608                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
609                         default=None)
610
611     parser.add_argument("--name", type=str,
612                         help="Name to use for workflow execution instance.",
613                         default=None)
614
615     parser.add_argument("--on-error", type=str,
616                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
617                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
618
619     parser.add_argument("--enable-dev", action="store_true",
620                         help="Enable loading and running development versions "
621                              "of CWL spec.", default=False)
622
623     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
624     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
625
626     return parser
627
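# Register Arvados-specific CWL extensions with cwltool: relax the Docker
# image name whitelist, load the arv-cwl-schema extension schema, and declare
# the http://arvados.org/cwl# requirements as supported.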
628 def add_arv_hints():
629     cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
630     cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
631     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
632     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
633     res.close()
634     cwltool.process.supportedProcessRequirements.extend([
635         "http://arvados.org/cwl#RunInSingleContainer",
636         "http://arvados.org/cwl#OutputDirType",
637         "http://arvados.org/cwl#RuntimeConstraints",
638         "http://arvados.org/cwl#PartitionRequirement",
639         "http://arvados.org/cwl#APIRequirement",
640         "http://commonwl.org/cwltool#LoadListingRequirement"
641     ])
642
643 def main(args, stdout, stderr, api_client=None, keep_client=None):
644     parser = arg_parser()
645
646     job_order_object = None
647     arvargs = parser.parse_args(args)
648
649     if arvargs.version:
650         print versionstring()
651         return
652
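    # The five-character infix of an Arvados UUID identifies the object type:
    # '-7fd4e-' is a workflow (containers API) and '-p5p6p-' is a pipeline
    # template (jobs API), so the UUID implies which API the update needs.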
653     if arvargs.update_workflow:
654         if arvargs.update_workflow.find('-7fd4e-') == 5:
655             want_api = 'containers'
656         elif arvargs.update_workflow.find('-p5p6p-') == 5:
657             want_api = 'jobs'
658         else:
659             want_api = None
660         if want_api and arvargs.work_api and want_api != arvargs.work_api:
661             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
662                 arvargs.update_workflow, want_api, arvargs.work_api))
663             return 1
664         arvargs.work_api = want_api
665
666     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
667         job_order_object = ({}, "")
668
669     add_arv_hints()
670
671     try:
672         if api_client is None:
673             api_client=arvados.api('v1', model=OrderedJsonModel())
674         if keep_client is None:
675             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
676         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
677                               num_retries=4, output_name=arvargs.output_name,
678                               output_tags=arvargs.output_tags)
679     except Exception as e:
680         logger.error(e)
681         return 1
682
683     if arvargs.debug:
684         logger.setLevel(logging.DEBUG)
685         logging.getLogger('arvados').setLevel(logging.DEBUG)
686
687     if arvargs.quiet:
688         logger.setLevel(logging.WARN)
689         logging.getLogger('arvados').setLevel(logging.WARN)
690         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
691
692     if arvargs.metrics:
693         metrics.setLevel(logging.DEBUG)
694         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
695
696     if arvargs.log_timestamps:
697         arvados.log_handler.setFormatter(logging.Formatter(
698             '%(asctime)s %(name)s %(levelname)s: %(message)s',
699             '%Y-%m-%d %H:%M:%S'))
700     else:
701         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
702
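    # cwltool.main expects these attributes on the parsed arguments; the
    # Arvados runner pins them to fixed values here.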
703     arvargs.conformance_test = None
704     arvargs.use_container = True
705     arvargs.relax_path_checks = True
706     arvargs.validate = None
707
708     make_fs_access = partial(CollectionFsAccess,
709                            collection_cache=runner.collection_cache)
710
711     return cwltool.main.main(args=arvargs,
712                              stdout=stdout,
713                              stderr=stderr,
714                              executor=runner.arv_executor,
715                              makeTool=runner.arv_make_tool,
716                              versionfunc=versionstring,
717                              job_order_object=job_order_object,
718                              make_fs_access=make_fs_access,
719                              fetcher_constructor=partial(CollectionFetcher,
720                                                          api_client=api_client,
721                                                          fs_access=make_fs_access(""),
722                                                          num_retries=runner.num_retries),
723                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
724                              logger_handler=arvados.log_handler,
725                              custom_schema_callback=add_arv_hints)