[arvados.git] / sdk/cwl/arvados_cwl/__init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import threading
14 import hashlib
15 import copy
16 import json
17 import re
18 from functools import partial
19 import pkg_resources  # part of setuptools
20
21 from cwltool.errors import WorkflowException
22 import cwltool.main
23 import cwltool.workflow
24 import cwltool.process
25 from schema_salad.sourceline import SourceLine
26 import schema_salad.validate as validate
27
28 import arvados
29 import arvados.config
30 from arvados.keep import KeepClient
31 from arvados.errors import ApiError
32
33 from .arvcontainer import ArvadosContainer, RunnerContainer
34 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
35 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
36 from .arvtool import ArvadosCommandTool
37 from .arvworkflow import ArvadosWorkflow, upload_workflow
38 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
39 from .perf import Perf
40 from .pathmapper import NoFollowPathMapper
41 from ._version import __version__
42
43 from cwltool.pack import pack
44 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
45 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
46 from cwltool.command_line_tool import compute_checksums
47 from arvados.api import OrderedJsonModel
48
49 logger = logging.getLogger('arvados.cwl-runner')
50 metrics = logging.getLogger('arvados.cwl-runner.metrics')
51 logger.setLevel(logging.INFO)
52
53 arvados.log_handler.setFormatter(logging.Formatter(
54         '%(asctime)s %(name)s %(levelname)s: %(message)s',
55         '%Y-%m-%d %H:%M:%S'))
56
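# Default priority for submitted work; the --priority option below accepts
# values in the range 1..1000, where higher values take precedence.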
57 DEFAULT_PRIORITY = 500
58
59 class ArvCwlRunner(object):
60     """Execute a CWL tool or workflow, submit work (using either the jobs or
61     containers API), wait for it to complete, and report output.
62
63     """
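    # A sketch of typical use, mirroring main() at the bottom of this file:
    #
    #   runner = ArvCwlRunner(api_client, work_api="containers")
    #   cwltool.main.main(..., executor=runner.arv_executor,
    #                     makeTool=runner.arv_make_tool)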
64
65     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
66         self.api = api_client
67         self.processes = {}
68         self.lock = threading.Lock()
69         self.cond = threading.Condition(self.lock)
70         self.final_output = None
71         self.final_status = None
72         self.uploaded = {}
73         self.num_retries = num_retries
74         self.uuid = None
75         self.stop_polling = threading.Event()
76         self.poll_api = None
77         self.pipeline = None
78         self.final_output_collection = None
79         self.output_name = output_name
80         self.output_tags = output_tags
81         self.project_uuid = None
82         self.intermediate_output_ttl = 0
83         self.intermediate_output_collections = []
84         self.trash_intermediate = False
85
86         if keep_client is not None:
87             self.keep_client = keep_client
88         else:
89             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
90
91         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
92
93         self.work_api = None
94         expected_api = ["jobs", "containers"]
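        # Probe the API server's discovery document: pick the requested
        # work_api if given, otherwise the first entry in expected_api whose
        # "create" method is advertised.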
95         for api in expected_api:
96             try:
97                 methods = self.api._rootDesc.get('resources')[api]['methods']
98                 if ('httpMethod' in methods['create'] and
99                     (work_api == api or work_api is None)):
100                     self.work_api = api
101                     break
102             except KeyError:
103                 pass
104
105         if not self.work_api:
106             if work_api is None:
107                 raise Exception("No supported APIs")
108             else:
109                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
110
111     def arv_make_tool(self, toolpath_object, **kwargs):
112         kwargs["work_api"] = self.work_api
113         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
114                                                 api_client=self.api,
115                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
116                                                 num_retries=self.num_retries)
117         kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
118         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
119             return ArvadosCommandTool(self, toolpath_object, **kwargs)
120         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
121             return ArvadosWorkflow(self, toolpath_object, **kwargs)
122         else:
123             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
124
125     def output_callback(self, out, processStatus):
126         if processStatus == "success":
127             logger.info("Overall process status is %s", processStatus)
128             if self.pipeline:
129                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
130                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
131         else:
132             logger.warn("Overall process status is %s", processStatus)
133             if self.pipeline:
134                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
135                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
136         self.final_status = processStatus
137         self.final_output = out
138
139     def on_message(self, event):
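        # Events are dicts of the shape synthesized in poll_states():
        # {"object_uuid": ..., "event_type": "update",
        #  "properties": {"new_attributes": <job or container request record>}}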
140         if "object_uuid" in event:
141             if event["object_uuid"] in self.processes and event["event_type"] == "update":
142                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
143                     uuid = event["object_uuid"]
144                     with self.lock:
145                         j = self.processes[uuid]
146                         logger.info("%s %s is Running", self.label(j), uuid)
147                         j.running = True
148                         j.update_pipeline_component(event["properties"]["new_attributes"])
149                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
150                     uuid = event["object_uuid"]
151                     try:
152                         self.cond.acquire()
153                         j = self.processes[uuid]
154                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
155                         with Perf(metrics, "done %s" % j.name):
156                             j.done(event["properties"]["new_attributes"])
157                         self.cond.notify()
158                     finally:
159                         self.cond.release()
160
161     def label(self, obj):
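        # self.work_api is "jobs" or "containers"; dropping the trailing "s"
        # yields a singular label such as "[container stepname]".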
162         return "[%s %s]" % (self.work_api[0:-1], obj.name)
163
164     def poll_states(self):
165         """Poll status of jobs or containers listed in the processes dict.
166
167         Runs in a separate thread.
168         """
169
170         try:
171             while True:
172                 self.stop_polling.wait(15)
173                 if self.stop_polling.is_set():
174                     break
175                 with self.lock:
176                     keys = self.processes.keys()
177                 if not keys:
178                     continue
179
180                 if self.work_api == "containers":
181                     table = self.poll_api.container_requests()
182                 elif self.work_api == "jobs":
183                     table = self.poll_api.jobs()
184
185                 try:
186                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
187                 except Exception as e:
188                     logger.warn("Error checking states on API server: %s", e)
189                     continue
190
191                 for p in proc_states["items"]:
192                     self.on_message({
193                         "object_uuid": p["uuid"],
194                         "event_type": "update",
195                         "properties": {
196                             "new_attributes": p
197                         }
198                     })
199         except:
200             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
201             self.cond.acquire()
202             self.processes.clear()
203             self.cond.notify()
204             self.cond.release()
205         finally:
206             self.stop_polling.set()
207
208     def get_uploaded(self):
209         return self.uploaded.copy()
210
211     def add_uploaded(self, src, pair):
212         self.uploaded[src] = pair
213
214     def add_intermediate_output(self, uuid):
215         if uuid:
216             self.intermediate_output_collections.append(uuid)
217
218     def trash_intermediate_output(self):
219         logger.info("Cleaning up intermediate output collections")
220         for i in self.intermediate_output_collections:
221             try:
222                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
223             except:
224                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
225             if sys.exc_info()[0] is KeyboardInterrupt:
226                 break
227
228     def check_features(self, obj):
229         if isinstance(obj, dict):
230             if obj.get("writable") and self.work_api != "containers":
231                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
232             if obj.get("class") == "DockerRequirement":
233                 if obj.get("dockerOutputDirectory"):
234                     if self.work_api != "containers":
235                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
236                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
237                     if not obj.get("dockerOutputDirectory").startswith('/'):
238                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
239                             "Option 'dockerOutputDirectory' must be an absolute path.")
240             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
241                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
242             for v in obj.itervalues():
243                 self.check_features(v)
244         elif isinstance(obj, list):
245             for i,v in enumerate(obj):
246                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
247                     self.check_features(v)
248
249     def make_output_collection(self, name, tagsString, outputObj):
250         outputObj = copy.deepcopy(outputObj)
251
252         files = []
253         def capture(fileobj):
254             files.append(fileobj)
255
256         adjustDirObjs(outputObj, capture)
257         adjustFileObjs(outputObj, capture)
258
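        # Map each captured File/Directory location (a keep: reference or a
        # literal) onto its destination path inside the new output collection.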
259         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
260
261         final = arvados.collection.Collection(api_client=self.api,
262                                               keep_client=self.keep_client,
263                                               num_retries=self.num_retries)
264
265         for k,v in generatemapper.items():
266             if k.startswith("_:"):
267                 if v.type == "Directory":
268                     continue
269                 if v.type == "CreateFile":
270                     with final.open(v.target, "wb") as f:
271                         f.write(v.resolved.encode("utf-8"))
272                     continue
273
274             if not k.startswith("keep:"):
275                 raise Exception("Output source is not in keep or a literal")
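            # Keep references look like "keep:<collection locator>/<path>";
            # strip the 5-character "keep:" prefix to get the source collection.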
276             sp = k.split("/")
277             srccollection = sp[0][5:]
278             try:
279                 reader = self.collection_cache.get(srccollection)
280                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
281                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
282             except arvados.errors.ArgumentError as e:
283                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
284                 raise
285             except IOError as e:
286                 logger.warn("While preparing output collection: %s", e)
287
288         def rewrite(fileobj):
289             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
290             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
291                 if k in fileobj:
292                     del fileobj[k]
293
294         adjustDirObjs(outputObj, rewrite)
295         adjustFileObjs(outputObj, rewrite)
296
297         with final.open("cwl.output.json", "w") as f:
298             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
299
300         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
301
302         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
303                     final.api_response()["name"],
304                     final.manifest_locator())
305
306         final_uuid = final.manifest_locator()
307         tags = tagsString.split(',')
308         for tag in tags:
309             self.api.links().create(body={
310                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
311             }).execute(num_retries=self.num_retries)
312
313         def finalcollection(fileobj):
314             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
315
316         adjustDirObjs(outputObj, finalcollection)
317         adjustFileObjs(outputObj, finalcollection)
318
319         return (outputObj, final)
320
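    # When this runner is itself running inside a crunch container or job
    # task, record the final output collection on that enclosing record too.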
321     def set_crunch_output(self):
322         if self.work_api == "containers":
323             try:
324                 current = self.api.containers().current().execute(num_retries=self.num_retries)
325             except ApiError as e:
326                 # Status code 404 just means we're not running in a container.
327                 if e.resp.status != 404:
328                     logger.info("Getting current container: %s", e)
329                 return
330             try:
331                 self.api.containers().update(uuid=current['uuid'],
332                                              body={
333                                                  'output': self.final_output_collection.portable_data_hash(),
334                                              }).execute(num_retries=self.num_retries)
335                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
336                                               body={
337                                                   'is_trashed': True
338                                               }).execute(num_retries=self.num_retries)
339             except Exception as e:
340                 logger.info("Setting container output: %s", e)
341         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
342             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
343                                    body={
344                                        'output': self.final_output_collection.portable_data_hash(),
345                                        'success': self.final_status == "success",
346                                        'progress': 1.0
347                                    }).execute(num_retries=self.num_retries)
348
349     def arv_executor(self, tool, job_order, **kwargs):
350         self.debug = kwargs.get("debug")
351
352         tool.visit(self.check_features)
353
354         self.project_uuid = kwargs.get("project_uuid")
355         self.pipeline = None
356         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
357                                                                  collection_cache=self.collection_cache)
358         self.fs_access = make_fs_access(kwargs["basedir"])
359         self.secret_store = kwargs.get("secret_store")
360
361         self.trash_intermediate = kwargs["trash_intermediate"]
362         if self.trash_intermediate and self.work_api != "containers":
363             raise Exception("--trash-intermediate is only supported with --api=containers.")
364
365         self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
366         if self.intermediate_output_ttl and self.work_api != "containers":
367             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
368         if self.intermediate_output_ttl < 0:
369             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
370
371         if not kwargs.get("name"):
372             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
373
374         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
375         # Also uploads docker images.
376         merged_map = upload_workflow_deps(self, tool)
377
378         # Reload tool object which may have been updated by
379         # upload_workflow_deps
380         # Don't validate this time because it will just print redundant errors.
381         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
382                                   makeTool=self.arv_make_tool,
383                                   loader=tool.doc_loader,
384                                   avsc_names=tool.doc_schema,
385                                   metadata=tool.metadata,
386                                   do_validate=False)
387
388         # Upload local file references in the job order.
389         job_order = upload_job_order(self, "%s input" % kwargs["name"],
390                                      tool, job_order)
391
392         existing_uuid = kwargs.get("update_workflow")
393         if existing_uuid or kwargs.get("create_workflow"):
394             # Create a pipeline template or workflow record and exit.
395             if self.work_api == "jobs":
396                 tmpl = RunnerTemplate(self, tool, job_order,
397                                       kwargs.get("enable_reuse"),
398                                       uuid=existing_uuid,
399                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
400                                       name=kwargs["name"],
401                                       merged_map=merged_map)
402                 tmpl.save()
403                 # cwltool.main will write our return value to stdout.
404                 return (tmpl.uuid, "success")
405             elif self.work_api == "containers":
406                 return (upload_workflow(self, tool, job_order,
407                                         self.project_uuid,
408                                         uuid=existing_uuid,
409                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
410                                         name=kwargs["name"],
411                                         merged_map=merged_map),
412                         "success")
413
414         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
415         self.eval_timeout = kwargs.get("eval_timeout")
416
417         kwargs["make_fs_access"] = make_fs_access
418         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
419         kwargs["use_container"] = True
420         kwargs["tmpdir_prefix"] = "tmp"
421         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
422
423         if self.work_api == "containers":
424             if self.ignore_docker_for_reuse:
425                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
426             kwargs["outdir"] = "/var/spool/cwl"
427             kwargs["docker_outdir"] = "/var/spool/cwl"
428             kwargs["tmpdir"] = "/tmp"
429             kwargs["docker_tmpdir"] = "/tmp"
430         elif self.work_api == "jobs":
431             if kwargs["priority"] != DEFAULT_PRIORITY:
432                 raise Exception("--priority not implemented for jobs API.")
433             kwargs["outdir"] = "$(task.outdir)"
434             kwargs["docker_outdir"] = "$(task.outdir)"
435             kwargs["tmpdir"] = "$(task.tmpdir)"
436
437         if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
438             raise Exception("--priority must be in the range 1..1000.")
439
440         runnerjob = None
441         if kwargs.get("submit"):
442             # Submit a runner job to run the workflow for us.
443             if self.work_api == "containers":
444                 if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
445                     kwargs["runnerjob"] = tool.tool["id"]
446                     runnerjob = tool.job(job_order,
447                                          self.output_callback,
448                                          **kwargs).next()
449                 else:
450                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
451                                                 self.output_name,
452                                                 self.output_tags,
453                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
454                                                 name=kwargs.get("name"),
455                                                 on_error=kwargs.get("on_error"),
456                                                 submit_runner_image=kwargs.get("submit_runner_image"),
457                                                 intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
458                                                 merged_map=merged_map,
459                                                 priority=kwargs.get("priority"),
460                                                 secret_store=self.secret_store)
461             elif self.work_api == "jobs":
462                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
463                                       self.output_name,
464                                       self.output_tags,
465                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
466                                       name=kwargs.get("name"),
467                                       on_error=kwargs.get("on_error"),
468                                       submit_runner_image=kwargs.get("submit_runner_image"),
469                                       merged_map=merged_map)
470         elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
471             # Create pipeline for local run
472             self.pipeline = self.api.pipeline_instances().create(
473                 body={
474                     "owner_uuid": self.project_uuid,
475                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
476                     "components": {},
477                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
478             logger.info("Pipeline instance %s", self.pipeline["uuid"])
479
480         if runnerjob and not kwargs.get("wait"):
481             runnerjob.run(wait=kwargs.get("wait"))
482             return (runnerjob.uuid, "success")
483
484         self.poll_api = arvados.api('v1')
485         self.polling_thread = threading.Thread(target=self.poll_states)
486         self.polling_thread.start()
487
488         if runnerjob:
489             jobiter = iter((runnerjob,))
490         else:
491             if "cwl_runner_job" in kwargs:
492                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
493             jobiter = tool.job(job_order,
494                                self.output_callback,
495                                **kwargs)
496
497         try:
498             self.cond.acquire()
499             # Will continue to hold the lock for the duration of this code
500             # except when in cond.wait(), at which point on_message can update
501             # job state and process output callbacks.
502
503             loopperf = Perf(metrics, "jobiter")
504             loopperf.__enter__()
505             for runnable in jobiter:
506                 loopperf.__exit__()
507
508                 if self.stop_polling.is_set():
509                     break
510
511                 if runnable:
512                     with Perf(metrics, "run"):
513                         runnable.run(**kwargs)
514                 else:
515                     if self.processes:
516                         self.cond.wait(1)
517                     else:
518                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
519                         break
520                 loopperf.__enter__()
521             loopperf.__exit__()
522
523             while self.processes:
524                 self.cond.wait(1)
525
526         except UnsupportedRequirement:
527             raise
528         except:
529             if sys.exc_info()[0] is KeyboardInterrupt:
530                 logger.error("Interrupted, marking pipeline as failed")
531             else:
532                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
533             if self.pipeline:
534                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
535                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
536             if runnerjob and runnerjob.uuid and self.work_api == "containers":
537                 self.api.container_requests().update(uuid=runnerjob.uuid,
538                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
539         finally:
540             self.cond.release()
541             self.stop_polling.set()
542             self.polling_thread.join()
543
544         if self.final_status == "UnsupportedRequirement":
545             raise UnsupportedRequirement("Check log for details.")
546
547         if self.final_output is None:
548             raise WorkflowException("Workflow did not return a result.")
549
550         if kwargs.get("submit") and isinstance(runnerjob, Runner):
551             logger.info("Final output collection %s", runnerjob.final_output)
552         else:
553             if self.output_name is None:
554                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
555             if self.output_tags is None:
556                 self.output_tags = ""
557             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
558             self.set_crunch_output()
559
560         if kwargs.get("compute_checksum"):
561             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
562             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
563
564         if self.trash_intermediate and self.final_status == "success":
565             self.trash_intermediate_output()
566
567         return (self.final_output, self.final_status)
568
569
570 def versionstring():
571     """Return a version string of key packages for provenance and debugging."""
572
573     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
574     arvpkg = pkg_resources.require("arvados-python-client")
575     cwlpkg = pkg_resources.require("cwltool")
576
577     return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
578                                     "arvados-python-client", arvpkg[0].version,
579                                     "cwltool", cwlpkg[0].version)
580
581
582 def arg_parser():  # type: () -> argparse.ArgumentParser
583     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
584
585     parser.add_argument("--basedir", type=str,
586                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
587     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
588                         help="Output directory, default current directory")
589
590     parser.add_argument("--eval-timeout",
591                         help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
592                         type=float,
593                         default=20)
594
595     exgroup = parser.add_mutually_exclusive_group()
596     exgroup.add_argument("--print-dot", action="store_true",
597                          help="Print workflow visualization in graphviz format and exit")
598     exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
599     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
600
601     exgroup = parser.add_mutually_exclusive_group()
602     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
603     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
604     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
605
606     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
607
608     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
609
610     exgroup = parser.add_mutually_exclusive_group()
611     exgroup.add_argument("--enable-reuse", action="store_true",
612                         default=True, dest="enable_reuse",
613                         help="Enable job or container reuse (default)")
614     exgroup.add_argument("--disable-reuse", action="store_false",
615                         default=True, dest="enable_reuse",
616                         help="Disable job or container reuse")
617
618     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, they will go to the user's home project.")
619     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
620     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
621     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
622                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
623                         default=False)
624
625     exgroup = parser.add_mutually_exclusive_group()
626     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
627                         default=True, dest="submit")
628     exgroup.add_argument("--local", action="store_false", help="Run the workflow from the local host (individual steps are still submitted to Arvados).",
629                         default=True, dest="submit")
630     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
631                          dest="create_workflow")
632     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
633     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
634
635     exgroup = parser.add_mutually_exclusive_group()
636     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
637                         default=True, dest="wait")
638     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
639                         default=True, dest="wait")
640
641     exgroup = parser.add_mutually_exclusive_group()
642     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
643                         default=True, dest="log_timestamps")
644     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
645                         default=True, dest="log_timestamps")
646
647     parser.add_argument("--api", type=str,
648                         default=None, dest="work_api",
649                         choices=("jobs", "containers"),
650                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
651
652     parser.add_argument("--compute-checksum", action="store_true", default=False,
653                         help="Compute checksum of contents while collecting outputs",
654                         dest="compute_checksum")
655
656     parser.add_argument("--submit-runner-ram", type=int,
657                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
658                         default=1024)
659
660     parser.add_argument("--submit-runner-image", type=str,
661                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
662                         default=None)
663
664     parser.add_argument("--name", type=str,
665                         help="Name to use for workflow execution instance.",
666                         default=None)
667
668     parser.add_argument("--on-error", type=str,
669                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
670                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
671
672     parser.add_argument("--enable-dev", action="store_true",
673                         help="Enable loading and running development versions "
674                              "of CWL spec.", default=False)
675
676     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
677                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
678                         default=0)
679
680     parser.add_argument("--priority", type=int,
681                         help="Workflow priority (range 1..1000, higher has precedence over lower, containers API only)",
682                         default=DEFAULT_PRIORITY)
683
684     parser.add_argument("--disable-validate", dest="do_validate",
685                         action="store_false", default=True,
686                         help=argparse.SUPPRESS)
687
688     parser.add_argument("--disable-js-validation",
689                         action="store_true", default=False,
690                         help=argparse.SUPPRESS)
691
692     exgroup = parser.add_mutually_exclusive_group()
693     exgroup.add_argument("--trash-intermediate", action="store_true",
694                         default=False, dest="trash_intermediate",
695                          help="Immediately trash intermediate outputs on workflow success.")
696     exgroup.add_argument("--no-trash-intermediate", action="store_false",
697                         default=False, dest="trash_intermediate",
698                         help="Do not trash intermediate outputs (default).")
699
700     parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
701     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
702
703     return parser
704
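# A sketch of a typical command-line invocation (the console-script name
# "arvados-cwl-runner" is assumed from this package's setup):
#
#   arvados-cwl-runner --api=containers --project-uuid <uuid> workflow.cwl job.yml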
705 def add_arv_hints():
706     cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
707     cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
708     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
709     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
710     res.close()
711     cwltool.process.supportedProcessRequirements.extend([
712         "http://arvados.org/cwl#RunInSingleContainer",
713         "http://arvados.org/cwl#OutputDirType",
714         "http://arvados.org/cwl#RuntimeConstraints",
715         "http://arvados.org/cwl#PartitionRequirement",
716         "http://arvados.org/cwl#APIRequirement",
717         "http://commonwl.org/cwltool#LoadListingRequirement",
718         "http://arvados.org/cwl#IntermediateOutput",
719         "http://arvados.org/cwl#ReuseRequirement"
720     ])
721
722 def main(args, stdout, stderr, api_client=None, keep_client=None):
723     parser = arg_parser()
724
725     job_order_object = None
726     arvargs = parser.parse_args(args)
727
728     if arvargs.update_workflow:
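        # Infer the target API from the UUID's type infix at offset 5:
        # "7fd4e" is a workflow (containers API), "p5p6p" is a pipeline
        # template (jobs API).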
729         if arvargs.update_workflow.find('-7fd4e-') == 5:
730             want_api = 'containers'
731         elif arvargs.update_workflow.find('-p5p6p-') == 5:
732             want_api = 'jobs'
733         else:
734             want_api = None
735         if want_api and arvargs.work_api and want_api != arvargs.work_api:
736             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
737                 arvargs.update_workflow, want_api, arvargs.work_api))
738             return 1
739         arvargs.work_api = want_api
740
741     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
742         job_order_object = ({}, "")
743
744     add_arv_hints()
745
746     try:
747         if api_client is None:
748             api_client=arvados.api('v1', model=OrderedJsonModel())
749         if keep_client is None:
750             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
751         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
752                               num_retries=4, output_name=arvargs.output_name,
753                               output_tags=arvargs.output_tags)
754     except Exception as e:
755         logger.error(e)
756         return 1
757
758     if arvargs.debug:
759         logger.setLevel(logging.DEBUG)
760         logging.getLogger('arvados').setLevel(logging.DEBUG)
761
762     if arvargs.quiet:
763         logger.setLevel(logging.WARN)
764         logging.getLogger('arvados').setLevel(logging.WARN)
765         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
766
767     if arvargs.metrics:
768         metrics.setLevel(logging.DEBUG)
769         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
770
771     if arvargs.log_timestamps:
772         arvados.log_handler.setFormatter(logging.Formatter(
773             '%(asctime)s %(name)s %(levelname)s: %(message)s',
774             '%Y-%m-%d %H:%M:%S'))
775     else:
776         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
777
778     arvargs.conformance_test = None
779     arvargs.use_container = True
780     arvargs.relax_path_checks = True
781     arvargs.print_supported_versions = False
782
783     make_fs_access = partial(CollectionFsAccess,
784                            collection_cache=runner.collection_cache)
785
786     return cwltool.main.main(args=arvargs,
787                              stdout=stdout,
788                              stderr=stderr,
789                              executor=runner.arv_executor,
790                              makeTool=runner.arv_make_tool,
791                              versionfunc=versionstring,
792                              job_order_object=job_order_object,
793                              make_fs_access=make_fs_access,
794                              fetcher_constructor=partial(CollectionFetcher,
795                                                          api_client=api_client,
796                                                          fs_access=make_fs_access(""),
797                                                          num_retries=runner.num_retries),
798                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
799                              logger_handler=arvados.log_handler,
800                              custom_schema_callback=add_arv_hints)