#!/usr/bin/env python
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
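#
# A typical invocation looks roughly like the following (illustrative only;
# the project UUID and file names are placeholders, see arg_parser() below for
# the full option set):
#
#     arvados-cwl-runner --api=containers --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx \
#         workflow.cwl inputs.yml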

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
import re
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
from schema_salad import validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """
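
    # Illustrative construction, mirroring how main() at the bottom of this
    # module wires the class up (building the clients this way is just one
    # option):
    #
    #     api = arvados.api('v1', model=OrderedJsonModel())
    #     keep = KeepClient(api_client=api, num_retries=4)
    #     runner = ArvCwlRunner(api, work_api="containers", keep_client=keep)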

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries)
        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

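    # on_message() consumes state-change events such as the synthetic ones
    # built by poll_states() below; the expected shape is roughly
    # (illustrative):
    #
    #     {"object_uuid": "<container request or job uuid>",
    #      "event_type": "update",
    #      "properties": {"new_attributes": {"state": "Complete", ...}}}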
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt:
                break

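    # check_features() below rejects requirements the selected API cannot
    # honor.  For example (illustrative CWL fragment; the image name is a
    # placeholder), this DockerRequirement is only accepted with
    # --api=containers because it sets dockerOutputDirectory:
    #
    #     requirements:
    #       - class: DockerRequirement
    #         dockerPull: debian:8
    #         dockerOutputDirectory: /var/spool/cwl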
    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

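    # make_output_collection() copies workflow outputs into a new Keep
    # collection and rewrites each file's "location" to point into it, e.g.
    # (the portable data hash below is a placeholder):
    #
    #     {"class": "File", "location": "keep:99999999999999999999999999999999+99/foo.txt"}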
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"],
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"],
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
                                                merged_map=merged_map)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"),
                                      merged_map=merged_map)
        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)


def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

568     parser.add_argument("--basedir", type=str,
569                         help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
570     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
571                         help="Output directory, default current directory")
572
573     parser.add_argument("--eval-timeout",
574                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
575                         type=float,
576                         default=20)
577
578     exgroup = parser.add_mutually_exclusive_group()
579     exgroup.add_argument("--print-dot", action="store_true",
580                          help="Print workflow visualization in graphviz format and exit")
581     exgroup.add_argument("--version", action="store_true", help="Print version and exit")
582     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
583
584     exgroup = parser.add_mutually_exclusive_group()
585     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
586     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
587     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
588
589     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
590
591     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
592
593     exgroup = parser.add_mutually_exclusive_group()
594     exgroup.add_argument("--enable-reuse", action="store_true",
595                         default=True, dest="enable_reuse",
596                         help="Enable job or container reuse (default)")
597     exgroup.add_argument("--disable-reuse", action="store_false",
598                         default=True, dest="enable_reuse",
599                         help="Disable job or container reuse")
600
601     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
602     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
603     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
604     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
605                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
606                         default=False)
607
608     exgroup = parser.add_mutually_exclusive_group()
609     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
610                         default=True, dest="submit")
611     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
612                         default=True, dest="submit")
613     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
614                          dest="create_workflow")
615     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
616     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
617
618     exgroup = parser.add_mutually_exclusive_group()
619     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
620                         default=True, dest="wait")
621     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
622                         default=True, dest="wait")
623
624     exgroup = parser.add_mutually_exclusive_group()
625     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
626                         default=True, dest="log_timestamps")
627     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
628                         default=True, dest="log_timestamps")
629
630     parser.add_argument("--api", type=str,
631                         default=None, dest="work_api",
632                         choices=("jobs", "containers"),
633                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
634
635     parser.add_argument("--compute-checksum", action="store_true", default=False,
636                         help="Compute checksum of contents while collecting outputs",
637                         dest="compute_checksum")
638
639     parser.add_argument("--submit-runner-ram", type=int,
640                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
641                         default=1024)
642
643     parser.add_argument("--submit-runner-image", type=str,
644                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
645                         default=None)
646
647     parser.add_argument("--name", type=str,
648                         help="Name to use for workflow execution instance.",
649                         default=None)
650
651     parser.add_argument("--on-error", type=str,
652                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
653                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
654
655     parser.add_argument("--enable-dev", action="store_true",
656                         help="Enable loading and running development versions "
657                              "of CWL spec.", default=False)
658
659     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
660                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
661                         default=0)
662
663     exgroup = parser.add_mutually_exclusive_group()
664     exgroup.add_argument("--trash-intermediate", action="store_true",
665                         default=False, dest="trash_intermediate",
666                          help="Immediately trash intermediate outputs on workflow success.")
667     exgroup.add_argument("--no-trash-intermediate", action="store_false",
668                         default=False, dest="trash_intermediate",
669                         help="Do not trash intermediate outputs (default).")
670
671     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
672     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
673
674     return parser
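
# Illustrative use of the parser built above (the argument values are examples):
#
#     args = arg_parser().parse_args(["--api=containers", "--submit", "wf.cwl", "job.yml"])
#     assert args.work_api == "containers" and args.workflow == "wf.cwl"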

def add_arv_hints():
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])
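
# A workflow opts into these extensions through the arv: namespace, e.g.
# (illustrative CWL fragment):
#
#     $namespaces:
#       arv: "http://arvados.org/cwl#"
#     hints:
#       - class: arv:RunInSingleContainer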

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)
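
# The installed command-line entry point (defined in the package's setup, not
# in this module) is expected to hand control to main() roughly like this
# (illustrative):
#
#     if __name__ == "__main__":
#         sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))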