#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
import re
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
import schema_salad
from schema_salad.sourceline import SourceLine

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        # Pick the work submission API: use the one requested via work_api, or
        # the first of "jobs"/"containers" that the API server advertises.
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

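    # Factory handed to cwltool: builds Arvados-aware tool objects for
    # CommandLineTool and Workflow nodes, and falls back to cwltool's
    # default factory for anything else.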
    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

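    # Handle a state-change event for one of our submitted jobs or containers
    # (events are delivered by the polling thread via poll_states).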
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            # If the user interrupted, stop cleaning up the remaining collections.
            if sys.exc_info()[0] is KeyboardInterrupt:
                break

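    # Recursively scan a tool document and raise UnsupportedRequirement for
    # CWL features this runner cannot execute (writable InitialWorkDir
    # entries, dockerOutputDirectory).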
    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    # TODO: can be supported by containers API, but not jobs API.
                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)

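    # Assemble the final output: copy every File/Directory referenced by the
    # output object into a new Keep collection, write cwl.output.json into it,
    # and rewrite the output locations to point at the new collection.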
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

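    # When running as the workflow runner inside a crunch container or job,
    # record the final output collection on our own container or job task
    # record; under the containers API, the separately saved output collection
    # is then marked trashed because the container's output record supersedes it.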
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress':1.0
                                        }).execute(num_retries=self.num_retries)

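    # Main entry point invoked by cwltool.main: upload dependencies, create a
    # workflow/pipeline template record or submit a runner job if requested,
    # otherwise iterate over runnable steps itself and collect the final output.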
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported when using the containers API.")

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"])
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    upload_dependencies(self,
                                        kwargs["name"],
                                        tool.doc_loader,
                                        tool.tool,
                                        tool.tool["id"],
                                        False)
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.intermediate_output_ttl and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)


def versionstring():
    """Return a version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


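# arg_parser() below defines the command line interface.  A minimal usage
# sketch, assuming the installed console script is named "arvados-cwl-runner"
# (the workflow/input filenames and project UUID are hypothetical placeholders):
#
#   arvados-cwl-runner --api=containers \
#       --project-uuid=zzzzz-j7d0g-xxxxxxxxxxxxxxx \
#       --intermediate-output-ttl=3600 \
#       my-workflow.cwl my-inputs.yml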
def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (the default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs will go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)
    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation, or on successful completion of workflow (whichever comes first).",
                        default=0)

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

def add_arv_hints():
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement"
    ])

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
        # Infer the target API from the UUID being updated: workflow records
        # contain '-7fd4e-' (containers API), pipeline templates contain
        # '-p5p6p-' (jobs API).
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client=arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    make_fs_access = partial(CollectionFsAccess,
                           collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)