#!/usr/bin/env python
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
import re
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
from schema_salad import validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow: submit work (using either the jobs
    or containers API), wait for it to complete, and report the output.

    """

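    # A minimal sketch of programmatic use, assuming an already-configured
    # Arvados client (illustrative only; the usual entry point is main()
    # below, which lets cwltool.main drive arv_executor):
    #
    #   api = arvados.api('v1', model=OrderedJsonModel())
    #   runner = ArvCwlRunner(api, work_api="containers")
    #   # cwltool.main(..., executor=runner.arv_executor, ...) then runs the workflow.
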
    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

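        # Decide which work API to use: probe the API server's discovery
        # document for the "jobs" and "containers" resources and pick the one
        # the caller asked for (or, if none was requested, the first one the
        # server actually exposes).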
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries)
        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

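    # Handle a state-change event for a tracked job or container request
    # (delivered by the poll_states thread): mark newly running processes,
    # and when one reaches a final state, invoke its done() handler and wake
    # up the waiting loop in arv_executor().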
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt:
                break

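    # Recursively inspect a tool or job object and reject CWL features that
    # the selected work API cannot support (for example, writable
    # InitialWorkDir entries or dockerOutputDirectory under --api=jobs).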
    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

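    # Copy the files and directories referenced by the workflow output object
    # into a new Keep collection, write cwl.output.json alongside them, tag
    # the collection, and rewrite the output object to point at the new
    # collection.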
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

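    # When this runner is itself executing inside an Arvados container (or a
    # Crunch job task), record the final output collection on that container
    # or task so the parent process reports the workflow output.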
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

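    # Main executor entry point, passed to cwltool.main() as `executor` in
    # main() below: uploads workflow dependencies, optionally registers a
    # workflow/template record or submits a runner job, then iterates over
    # runnable steps until the workflow finishes and its output is collected.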
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"],
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"],
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.eval_timeout = kwargs.get("eval_timeout")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise validate.ValidationException("--ignore-docker-for-reuse not supported with containers API.")
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
                                                merged_map=merged_map)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"),
                                      merged_map=merged_map)
        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)


def versionstring():
    """Return a version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or to the current directory if inputs are piped or given on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs.  If not provided, work goes to your Home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                        default=0)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                        default=False, dest="trash_intermediate",
                        help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                        default=False, dest="trash_intermediate",
                        help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

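# The parser above defines the arvados-cwl-runner command line. A few
# illustrative invocations (the project UUID is a placeholder for a real
# Arvados group UUID):
#
#   arvados-cwl-runner --api=containers workflow.cwl job.yml
#   arvados-cwl-runner --submit --no-wait --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx workflow.cwl job.yml
#   arvados-cwl-runner --create-workflow workflow.cwl
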
def add_arv_hints():
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])

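# Command line entry point: parse arguments, construct an ArvCwlRunner, then
# hand off to cwltool.main() with the Arvados-specific executor, tool
# factory, and file-access hooks defined above.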
def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

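    # Arvados UUIDs encode the object type in their second field: '7fd4e'
    # denotes a workflow record (containers API) and 'p5p6p' a pipeline
    # template (jobs API), so an --update-workflow target implies which work
    # API to use.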
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client=arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

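    # These cwltool options are not exposed on the arvados-cwl-runner command
    # line, so pin them to fixed values before handing the parsed arguments
    # to cwltool.main().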
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)