[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
5
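# Typical command-line use (an illustrative sketch only; the file names are
# examples, and the "arvados-cwl-runner" console-script name is assumed from
# this package's setup):
#
#     arvados-cwl-runner --api=containers my-workflow.cwl my-inputs.yml
#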
6 import argparse
7 import logging
8 import os
9 import sys
10 import threading
11 import hashlib
12 import copy
13 import json
14 import re
15 from functools import partial
16 import pkg_resources  # part of setuptools
17
18 from cwltool.errors import WorkflowException
19 import cwltool.main
20 import cwltool.workflow
21 import cwltool.process
22 import schema_salad
23 from schema_salad.sourceline import SourceLine
24
25 import arvados
26 import arvados.config
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29
30 from .arvcontainer import ArvadosContainer, RunnerContainer
31 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
32 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
33 from .arvtool import ArvadosCommandTool
34 from .arvworkflow import ArvadosWorkflow, upload_workflow
35 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
36 from .perf import Perf
37 from .pathmapper import NoFollowPathMapper
38 from ._version import __version__
39
40 from cwltool.pack import pack
41 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
42 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
43 from cwltool.draft2tool import compute_checksums
44 from arvados.api import OrderedJsonModel
45
46 logger = logging.getLogger('arvados.cwl-runner')
47 metrics = logging.getLogger('arvados.cwl-runner.metrics')
48 logger.setLevel(logging.INFO)
49
50 arvados.log_handler.setFormatter(logging.Formatter(
51         '%(asctime)s %(name)s %(levelname)s: %(message)s',
52         '%Y-%m-%d %H:%M:%S'))
53
54 class ArvCwlRunner(object):
55     """Execute a CWL tool or workflow, submit work (using either jobs or
56     containers API), wait for them to complete, and report output.
57
58     """
59
60     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
61         self.api = api_client
62         self.processes = {}
63         self.lock = threading.Lock()
64         self.cond = threading.Condition(self.lock)
65         self.final_output = None
66         self.final_status = None
67         self.uploaded = {}
68         self.num_retries = num_retries
69         self.uuid = None
70         self.stop_polling = threading.Event()
71         self.poll_api = None
72         self.pipeline = None
73         self.final_output_collection = None
74         self.output_name = output_name
75         self.output_tags = output_tags
76         self.project_uuid = None
77         self.intermediate_output_ttl = 0
78         self.intermediate_output_collections = []
79         self.trash_intermediate = False
80
81         if keep_client is not None:
82             self.keep_client = keep_client
83         else:
84             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
85
86         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
87
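        # Probe the API server's discovery document for the "jobs" and
        # "containers" resources, and pick the first supported one that is
        # compatible with the requested work_api (if any was requested).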
88         self.work_api = None
89         expected_api = ["jobs", "containers"]
90         for api in expected_api:
91             try:
92                 methods = self.api._rootDesc.get('resources')[api]['methods']
93                 if ('httpMethod' in methods['create'] and
94                     (work_api == api or work_api is None)):
95                     self.work_api = api
96                     break
97             except KeyError:
98                 pass
99
100         if not self.work_api:
101             if work_api is None:
102                 raise Exception("No supported APIs")
103             else:
104                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
105
106     def arv_make_tool(self, toolpath_object, **kwargs):
107         kwargs["work_api"] = self.work_api
108         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
109                                                 api_client=self.api,
110                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
111                                                 num_retries=self.num_retries,
112                                                 overrides=kwargs.get("override_tools"))
113         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
114             return ArvadosCommandTool(self, toolpath_object, **kwargs)
115         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
116             return ArvadosWorkflow(self, toolpath_object, **kwargs)
117         else:
118             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
119
120     def output_callback(self, out, processStatus):
121         if processStatus == "success":
122             logger.info("Overall process status is %s", processStatus)
123             if self.pipeline:
124                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
125                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
126         else:
127             logger.warn("Overall process status is %s", processStatus)
128             if self.pipeline:
129                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
130                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
131         self.final_status = processStatus
132         self.final_output = out
133
134     def on_message(self, event):
135         if "object_uuid" in event:
136             if event["object_uuid"] in self.processes and event["event_type"] == "update":
137                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
138                     uuid = event["object_uuid"]
139                     with self.lock:
140                         j = self.processes[uuid]
141                         logger.info("%s %s is Running", self.label(j), uuid)
142                         j.running = True
143                         j.update_pipeline_component(event["properties"]["new_attributes"])
144                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
145                     uuid = event["object_uuid"]
146                     try:
147                         self.cond.acquire()
148                         j = self.processes[uuid]
149                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
150                         with Perf(metrics, "done %s" % j.name):
151                             j.done(event["properties"]["new_attributes"])
152                         self.cond.notify()
153                     finally:
154                         self.cond.release()
155
156     def label(self, obj):
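        # self.work_api is "jobs" or "containers"; trim the trailing "s" so
        # the label reads like "[container foo]" or "[job foo]".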
157         return "[%s %s]" % (self.work_api[0:-1], obj.name)
158
159     def poll_states(self):
160         """Poll status of jobs or containers listed in the processes dict.
161
162         Runs in a separate thread.
163         """
164
165         try:
166             while True:
167                 self.stop_polling.wait(15)
168                 if self.stop_polling.is_set():
169                     break
170                 with self.lock:
171                     keys = self.processes.keys()
172                 if not keys:
173                     continue
174
175                 if self.work_api == "containers":
176                     table = self.poll_api.container_requests()
177                 elif self.work_api == "jobs":
178                     table = self.poll_api.jobs()
179
180                 try:
181                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
182                 except Exception as e:
183                     logger.warn("Error checking states on API server: %s", e)
184                     continue
185
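                # Reuse the normal event-handling path: wrap each polled
                # record in a synthetic "update" event and hand it to
                # on_message().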
186                 for p in proc_states["items"]:
187                     self.on_message({
188                         "object_uuid": p["uuid"],
189                         "event_type": "update",
190                         "properties": {
191                             "new_attributes": p
192                         }
193                     })
194         except:
195             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
196             self.cond.acquire()
197             self.processes.clear()
198             self.cond.notify()
199             self.cond.release()
200         finally:
201             self.stop_polling.set()
202
203     def get_uploaded(self):
204         return self.uploaded.copy()
205
206     def add_uploaded(self, src, pair):
207         self.uploaded[src] = pair
208
209     def add_intermediate_output(self, uuid):
210         if uuid:
211             self.intermediate_output_collections.append(uuid)
212
213     def trash_intermediate_output(self):
214         logger.info("Cleaning up intermediate output collections")
215         for i in self.intermediate_output_collections:
216             try:
217                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
218             except:
219                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
220             if sys.exc_info()[0] is KeyboardInterrupt:
221                 break
222
223     def check_features(self, obj):
224         if isinstance(obj, dict):
225             if obj.get("writable"):
226                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
227             if obj.get("class") == "DockerRequirement":
228                 if obj.get("dockerOutputDirectory"):
229                     # TODO: can be supported by containers API, but not jobs API.
230                     raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
231                         "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
232             for v in obj.itervalues():
233                 self.check_features(v)
234         elif isinstance(obj, list):
235             for i,v in enumerate(obj):
236                 with SourceLine(obj, i, UnsupportedRequirement):
237                     self.check_features(v)
238
239     def make_output_collection(self, name, tagsString, outputObj):
240         outputObj = copy.deepcopy(outputObj)
241
242         files = []
243         def capture(fileobj):
244             files.append(fileobj)
245
246         adjustDirObjs(outputObj, capture)
247         adjustFileObjs(outputObj, capture)
248
249         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
250
251         final = arvados.collection.Collection(api_client=self.api,
252                                               keep_client=self.keep_client,
253                                               num_retries=self.num_retries)
254
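        # Copy each mapped output into the new collection.  Literal entries
        # (locations starting with "_:") are written out directly; everything
        # else must be a "keep:" reference whose contents can be copied from
        # the source collection.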
255         for k,v in generatemapper.items():
256             if k.startswith("_:"):
257                 if v.type == "Directory":
258                     continue
259                 if v.type == "CreateFile":
260                     with final.open(v.target, "wb") as f:
261                         f.write(v.resolved.encode("utf-8"))
262                     continue
263
264             if not k.startswith("keep:"):
265                 raise Exception("Output source is neither a keep reference nor a literal")
266             sp = k.split("/")
267             srccollection = sp[0][5:]
268             try:
269                 reader = self.collection_cache.get(srccollection)
270                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
271                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
272             except arvados.errors.ArgumentError as e:
273                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
274                 raise
275             except IOError as e:
276                 logger.warn("While preparing output collection: %s", e)
277
278         def rewrite(fileobj):
279             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
280             for k in ("basename", "listing", "contents"):
281                 if k in fileobj:
282                     del fileobj[k]
283
284         adjustDirObjs(outputObj, rewrite)
285         adjustFileObjs(outputObj, rewrite)
286
287         with final.open("cwl.output.json", "w") as f:
288             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
289
290         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
291
292         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
293                     final.api_response()["name"],
294                     final.manifest_locator())
295
296         final_uuid = final.manifest_locator()
297         tags = tagsString.split(',')
298         for tag in tags:
299             self.api.links().create(body={
300                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
301             }).execute(num_retries=self.num_retries)
302
303         def finalcollection(fileobj):
304             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
305
306         adjustDirObjs(outputObj, finalcollection)
307         adjustFileObjs(outputObj, finalcollection)
308
309         return (outputObj, final)
310
311     def set_crunch_output(self):
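        # If this runner is itself executing inside a Crunch container or job
        # (i.e. it was submitted rather than run locally), record the final
        # output collection on that container/job record so Arvados reports it
        # as the work's output.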
312         if self.work_api == "containers":
313             try:
314                 current = self.api.containers().current().execute(num_retries=self.num_retries)
315             except ApiError as e:
316                 # Status code 404 just means we're not running in a container.
317                 if e.resp.status != 404:
318                     logger.info("Getting current container: %s", e)
319                 return
320             try:
321                 self.api.containers().update(uuid=current['uuid'],
322                                              body={
323                                                  'output': self.final_output_collection.portable_data_hash(),
324                                              }).execute(num_retries=self.num_retries)
325                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
326                                               body={
327                                                   'is_trashed': True
328                                               }).execute(num_retries=self.num_retries)
329             except Exception as e:
330                 logger.info("Setting container output: %s", e)
331         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
332             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
333                                    body={
334                                        'output': self.final_output_collection.portable_data_hash(),
335                                        'success': self.final_status == "success",
336                                        'progress':1.0
337                                    }).execute(num_retries=self.num_retries)
338
339     def arv_executor(self, tool, job_order, **kwargs):
340         self.debug = kwargs.get("debug")
341
342         tool.visit(self.check_features)
343
344         self.project_uuid = kwargs.get("project_uuid")
345         self.pipeline = None
346         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
347                                                                  collection_cache=self.collection_cache)
348         self.fs_access = make_fs_access(kwargs["basedir"])
349
350
351         self.trash_intermediate = kwargs["trash_intermediate"]
352         if self.trash_intermediate and self.work_api != "containers":
353             raise Exception("--trash-intermediate is only supported with --api=containers.")
354
355         self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
356         if self.intermediate_output_ttl and self.work_api != "containers":
357             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
358         if self.intermediate_output_ttl < 0:
359             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
360
361         if not kwargs.get("name"):
362             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
363
364         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
365         # Also uploads docker images.
366         override_tools = {}
367         upload_workflow_deps(self, tool, override_tools)
368
369         # Reload tool object which may have been updated by
370         # upload_workflow_deps
371         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
372                                   makeTool=self.arv_make_tool,
373                                   loader=tool.doc_loader,
374                                   avsc_names=tool.doc_schema,
375                                   metadata=tool.metadata,
376                                   override_tools=override_tools)
377
378         # Upload local file references in the job order.
379         job_order = upload_job_order(self, "%s input" % kwargs["name"],
380                                      tool, job_order)
381
382         existing_uuid = kwargs.get("update_workflow")
383         if existing_uuid or kwargs.get("create_workflow"):
384             # Create a pipeline template or workflow record and exit.
385             if self.work_api == "jobs":
386                 tmpl = RunnerTemplate(self, tool, job_order,
387                                       kwargs.get("enable_reuse"),
388                                       uuid=existing_uuid,
389                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
390                                       name=kwargs["name"])
391                 tmpl.save()
392                 # cwltool.main will write our return value to stdout.
393                 return (tmpl.uuid, "success")
394             elif self.work_api == "containers":
395                 return (upload_workflow(self, tool, job_order,
396                                         self.project_uuid,
397                                         uuid=existing_uuid,
398                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
399                                         name=kwargs["name"]),
400                         "success")
401
402         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
403
404         kwargs["make_fs_access"] = make_fs_access
405         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
406         kwargs["use_container"] = True
407         kwargs["tmpdir_prefix"] = "tmp"
408         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
409
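        # Output and temporary paths differ by API: the containers API uses
        # fixed in-container paths, while the jobs API uses Crunch task
        # placeholders that are substituted at runtime.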
410         if self.work_api == "containers":
411             kwargs["outdir"] = "/var/spool/cwl"
412             kwargs["docker_outdir"] = "/var/spool/cwl"
413             kwargs["tmpdir"] = "/tmp"
414             kwargs["docker_tmpdir"] = "/tmp"
415         elif self.work_api == "jobs":
416             kwargs["outdir"] = "$(task.outdir)"
417             kwargs["docker_outdir"] = "$(task.outdir)"
418             kwargs["tmpdir"] = "$(task.tmpdir)"
419
420         runnerjob = None
421         if kwargs.get("submit"):
422             # Submit a runner job to run the workflow for us.
423             if self.work_api == "containers":
424                 if tool.tool["class"] == "CommandLineTool":
425                     kwargs["runnerjob"] = tool.tool["id"]
426                     upload_dependencies(self,
427                                         kwargs["name"],
428                                         tool.doc_loader,
429                                         tool.tool,
430                                         tool.tool["id"],
431                                         False)
432                     runnerjob = tool.job(job_order,
433                                          self.output_callback,
434                                          **kwargs).next()
435                 else:
436                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
437                                                 self.output_name,
438                                                 self.output_tags,
439                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
440                                                 name=kwargs.get("name"),
441                                                 on_error=kwargs.get("on_error"),
442                                                 submit_runner_image=kwargs.get("submit_runner_image"),
443                                                 intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
444             elif self.work_api == "jobs":
445                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
446                                       self.output_name,
447                                       self.output_tags,
448                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
449                                       name=kwargs.get("name"),
450                                       on_error=kwargs.get("on_error"),
451                                       submit_runner_image=kwargs.get("submit_runner_image"))
452
453         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
454             # Create pipeline for local run
455             self.pipeline = self.api.pipeline_instances().create(
456                 body={
457                     "owner_uuid": self.project_uuid,
458                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
459                     "components": {},
460                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
461             logger.info("Pipeline instance %s", self.pipeline["uuid"])
462
463         if runnerjob and not kwargs.get("wait"):
464             runnerjob.run(wait=kwargs.get("wait"))
465             return (runnerjob.uuid, "success")
466
467         self.poll_api = arvados.api('v1')
468         self.polling_thread = threading.Thread(target=self.poll_states)
469         self.polling_thread.start()
470
471         if runnerjob:
472             jobiter = iter((runnerjob,))
473         else:
474             if "cwl_runner_job" in kwargs:
475                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
476             jobiter = tool.job(job_order,
477                                self.output_callback,
478                                **kwargs)
479
480         try:
481             self.cond.acquire()
482             # Will continue to hold the lock for the duration of this code
483             # except when in cond.wait(), at which point on_message can update
484             # job state and process output callbacks.
485
486             loopperf = Perf(metrics, "jobiter")
487             loopperf.__enter__()
488             for runnable in jobiter:
489                 loopperf.__exit__()
490
491                 if self.stop_polling.is_set():
492                     break
493
494                 if runnable:
495                     with Perf(metrics, "run"):
496                         runnable.run(**kwargs)
497                 else:
498                     if self.processes:
499                         self.cond.wait(1)
500                     else:
501                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
502                         break
503                 loopperf.__enter__()
504             loopperf.__exit__()
505
506             while self.processes:
507                 self.cond.wait(1)
508
509         except UnsupportedRequirement:
510             raise
511         except:
512             if sys.exc_info()[0] is KeyboardInterrupt:
513                 logger.error("Interrupted, marking pipeline as failed")
514             else:
515                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
516             if self.pipeline:
517                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
518                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
519             if runnerjob and runnerjob.uuid and self.work_api == "containers":
520                 self.api.container_requests().update(uuid=runnerjob.uuid,
521                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
522         finally:
523             self.cond.release()
524             self.stop_polling.set()
525             self.polling_thread.join()
526
527         if self.final_status == "UnsupportedRequirement":
528             raise UnsupportedRequirement("Check log for details.")
529
530         if self.final_output is None:
531             raise WorkflowException("Workflow did not return a result.")
532
533         if kwargs.get("submit") and isinstance(runnerjob, Runner):
534             logger.info("Final output collection %s", runnerjob.final_output)
535         else:
536             if self.output_name is None:
537                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
538             if self.output_tags is None:
539                 self.output_tags = ""
540             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
541             self.set_crunch_output()
542
543         if kwargs.get("compute_checksum"):
544             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
545             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
546
547         if self.trash_intermediate and self.final_status == "success":
548             self.trash_intermediate_output()
549
550         return (self.final_output, self.final_status)
551
552
553 def versionstring():
554     """Print version string of key packages for provenance and debugging."""
555
556     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
557     arvpkg = pkg_resources.require("arvados-python-client")
558     cwlpkg = pkg_resources.require("cwltool")
559
560     return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
561                                     "arvados-python-client", arvpkg[0].version,
562                                     "cwltool", cwlpkg[0].version)
563
564
565 def arg_parser():  # type: () -> argparse.ArgumentParser
566     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
567
568     parser.add_argument("--basedir", type=str,
569                         help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
570     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
571                         help="Output directory, default current directory")
572
573     parser.add_argument("--eval-timeout",
574                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
575                         type=float,
576                         default=20)
577
578     exgroup = parser.add_mutually_exclusive_group()
579     exgroup.add_argument("--print-dot", action="store_true",
580                          help="Print workflow visualization in graphviz format and exit")
581     exgroup.add_argument("--version", action="store_true", help="Print version and exit")
582     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
583
584     exgroup = parser.add_mutually_exclusive_group()
585     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
586     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
587     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
588
589     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
590
591     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
592
593     exgroup = parser.add_mutually_exclusive_group()
594     exgroup.add_argument("--enable-reuse", action="store_true",
595                         default=True, dest="enable_reuse",
596                         help="Enable job or container reuse (default)")
597     exgroup.add_argument("--disable-reuse", action="store_false",
598                         default=True, dest="enable_reuse",
599                         help="Disable job or container reuse")
600
601     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
602     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
603     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
604     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
605                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
606                         default=False)
607
608     exgroup = parser.add_mutually_exclusive_group()
609     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
610                         default=True, dest="submit")
611     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
612                         default=True, dest="submit")
613     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
614                          dest="create_workflow")
615     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
616     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
617
618     exgroup = parser.add_mutually_exclusive_group()
619     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
620                         default=True, dest="wait")
621     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
622                         default=True, dest="wait")
623
624     exgroup = parser.add_mutually_exclusive_group()
625     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
626                         default=True, dest="log_timestamps")
627     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
628                         default=True, dest="log_timestamps")
629
630     parser.add_argument("--api", type=str,
631                         default=None, dest="work_api",
632                         choices=("jobs", "containers"),
633                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
634
635     parser.add_argument("--compute-checksum", action="store_true", default=False,
636                         help="Compute checksum of contents while collecting outputs",
637                         dest="compute_checksum")
638
639     parser.add_argument("--submit-runner-ram", type=int,
640                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
641                         default=1024)
642
643     parser.add_argument("--submit-runner-image", type=str,
644                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
645                         default=None)
646
647     parser.add_argument("--name", type=str,
648                         help="Name to use for workflow execution instance.",
649                         default=None)
650
651     parser.add_argument("--on-error", type=str,
652                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
653                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
654
655     parser.add_argument("--enable-dev", action="store_true",
656                         help="Enable loading and running development versions "
657                              "of CWL spec.", default=False)
658
659     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
660                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
661                         default=0)
662
663     exgroup = parser.add_mutually_exclusive_group()
664     exgroup.add_argument("--trash-intermediate", action="store_true",
665                         default=False, dest="trash_intermediate",
666                          help="Immediately trash intermediate outputs on workflow success.")
667     exgroup.add_argument("--no-trash-intermediate", action="store_false",
668                         default=False, dest="trash_intermediate",
669                         help="Do not trash intermediate outputs (default).")
670
671     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
672     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
673
674     return parser
675
676 def add_arv_hints():
677     cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
678     cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
679     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
680     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
681     res.close()
682     cwltool.process.supportedProcessRequirements.extend([
683         "http://arvados.org/cwl#RunInSingleContainer",
684         "http://arvados.org/cwl#OutputDirType",
685         "http://arvados.org/cwl#RuntimeConstraints",
686         "http://arvados.org/cwl#PartitionRequirement",
687         "http://arvados.org/cwl#APIRequirement",
688         "http://commonwl.org/cwltool#LoadListingRequirement",
689         "http://arvados.org/cwl#IntermediateOutput"
690     ])
691
692 def main(args, stdout, stderr, api_client=None, keep_client=None):
693     parser = arg_parser()
694
695     job_order_object = None
696     arvargs = parser.parse_args(args)
697
698     if arvargs.version:
699         print versionstring()
700         return
701
702     if arvargs.update_workflow:
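        # Arvados UUIDs carry an object-type code in their second field:
        # "7fd4e" indicates a workflow record (containers API), "p5p6p" a
        # pipeline template (jobs API).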
703         if arvargs.update_workflow.find('-7fd4e-') == 5:
704             want_api = 'containers'
705         elif arvargs.update_workflow.find('-p5p6p-') == 5:
706             want_api = 'jobs'
707         else:
708             want_api = None
709         if want_api and arvargs.work_api and want_api != arvargs.work_api:
710             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
711                 arvargs.update_workflow, want_api, arvargs.work_api))
712             return 1
713         arvargs.work_api = want_api
714
715     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
716         job_order_object = ({}, "")
717
718     add_arv_hints()
719
720     try:
721         if api_client is None:
722             api_client=arvados.api('v1', model=OrderedJsonModel())
723         if keep_client is None:
724             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
725         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
726                               num_retries=4, output_name=arvargs.output_name,
727                               output_tags=arvargs.output_tags)
728     except Exception as e:
729         logger.error(e)
730         return 1
731
732     if arvargs.debug:
733         logger.setLevel(logging.DEBUG)
734         logging.getLogger('arvados').setLevel(logging.DEBUG)
735
736     if arvargs.quiet:
737         logger.setLevel(logging.WARN)
738         logging.getLogger('arvados').setLevel(logging.WARN)
739         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
740
741     if arvargs.metrics:
742         metrics.setLevel(logging.DEBUG)
743         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
744
745     if arvargs.log_timestamps:
746         arvados.log_handler.setFormatter(logging.Formatter(
747             '%(asctime)s %(name)s %(levelname)s: %(message)s',
748             '%Y-%m-%d %H:%M:%S'))
749     else:
750         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
751
752     arvargs.conformance_test = None
753     arvargs.use_container = True
754     arvargs.relax_path_checks = True
755     arvargs.validate = None
756
757     make_fs_access = partial(CollectionFsAccess,
758                            collection_cache=runner.collection_cache)
759
760     return cwltool.main.main(args=arvargs,
761                              stdout=stdout,
762                              stderr=stderr,
763                              executor=runner.arv_executor,
764                              makeTool=runner.arv_make_tool,
765                              versionfunc=versionstring,
766                              job_order_object=job_order_object,
767                              make_fs_access=make_fs_access,
768                              fetcher_constructor=partial(CollectionFetcher,
769                                                          api_client=api_client,
770                                                          fs_access=make_fs_access(""),
771                                                          num_retries=runner.num_retries),
772                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
773                              logger_handler=arvados.log_handler,
774                              custom_schema_callback=add_arv_hints)