1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement the cwl-runner interface for submitting and running work on Arvados,
7 # using either the Crunch jobs API or the Crunch containers API.
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import threading
14 import hashlib
15 import copy
16 import json
17 import re
18 from functools import partial
19 import pkg_resources  # part of setuptools
20
21 from cwltool.errors import WorkflowException
22 import cwltool.main
23 import cwltool.workflow
24 import cwltool.process
25 from schema_salad.sourceline import SourceLine
26 import schema_salad.validate as validate
27
28 import arvados
29 import arvados.config
30 from arvados.keep import KeepClient
31 from arvados.errors import ApiError
32
33 from .arvcontainer import ArvadosContainer, RunnerContainer
34 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
35 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
36 from .arvtool import ArvadosCommandTool
37 from .arvworkflow import ArvadosWorkflow, upload_workflow
38 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
39 from .perf import Perf
40 from .pathmapper import NoFollowPathMapper
41 from ._version import __version__
42
43 from cwltool.pack import pack
44 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
45 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
46 from cwltool.command_line_tool import compute_checksums
47 from arvados.api import OrderedJsonModel
48
49 logger = logging.getLogger('arvados.cwl-runner')
50 metrics = logging.getLogger('arvados.cwl-runner.metrics')
51 logger.setLevel(logging.INFO)
52
53 arvados.log_handler.setFormatter(logging.Formatter(
54         '%(asctime)s %(name)s %(levelname)s: %(message)s',
55         '%Y-%m-%d %H:%M:%S'))
56
57 DEFAULT_PRIORITY = 500
58
59 class ArvCwlRunner(object):
60     """Execute a CWL tool or workflow, submit work (using either the jobs or
61     containers API), wait for it to complete, and report output.
62
63     """
64
65     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
66         self.api = api_client
67         self.processes = {}
68         self.lock = threading.Lock()
69         self.cond = threading.Condition(self.lock)
70         self.final_output = None
71         self.final_status = None
72         self.uploaded = {}
73         self.num_retries = num_retries
74         self.uuid = None
75         self.stop_polling = threading.Event()
76         self.poll_api = None
77         self.pipeline = None
78         self.final_output_collection = None
79         self.output_name = output_name
80         self.output_tags = output_tags
81         self.project_uuid = None
82         self.intermediate_output_ttl = 0
83         self.intermediate_output_collections = []
84         self.trash_intermediate = False
85
86         if keep_client is not None:
87             self.keep_client = keep_client
88         else:
89             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
90
91         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
92
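        # Probe the API server's discovery document to decide which work API
        # to use: honor an explicit work_api request, otherwise take the first
        # supported API in expected_api ("jobs" before "containers").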
93         self.work_api = None
94         expected_api = ["jobs", "containers"]
95         for api in expected_api:
96             try:
97                 methods = self.api._rootDesc.get('resources')[api]['methods']
98                 if ('httpMethod' in methods['create'] and
99                     (work_api == api or work_api is None)):
100                     self.work_api = api
101                     break
102             except KeyError:
103                 pass
104
105         if not self.work_api:
106             if work_api is None:
107                 raise Exception("No supported APIs")
108             else:
109                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
110
111     def arv_make_tool(self, toolpath_object, **kwargs):
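        # Construct tool objects with a Keep-aware fetcher and resolver so
        # that keep: references resolve while documents are loaded.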
112         kwargs["work_api"] = self.work_api
113         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
114                                                 api_client=self.api,
115                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
116                                                 num_retries=self.num_retries)
117         kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
118         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
119             return ArvadosCommandTool(self, toolpath_object, **kwargs)
120         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
121             return ArvadosWorkflow(self, toolpath_object, **kwargs)
122         else:
123             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
124
125     def output_callback(self, out, processStatus):
126         if processStatus == "success":
127             logger.info("Overall process status is %s", processStatus)
128             if self.pipeline:
129                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
130                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
131         else:
132             logger.warn("Overall process status is %s", processStatus)
133             if self.pipeline:
134                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
135                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
136         self.final_status = processStatus
137         self.final_output = out
138
139     def on_message(self, event):
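        # Handle an "update" event for a tracked job or container: log state
        # transitions, and when a process reaches a final state call its
        # done() handler and wake up the main loop waiting on self.cond.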
140         if "object_uuid" in event:
141             if event["object_uuid"] in self.processes and event["event_type"] == "update":
142                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
143                     uuid = event["object_uuid"]
144                     with self.lock:
145                         j = self.processes[uuid]
146                         logger.info("%s %s is Running", self.label(j), uuid)
147                         j.running = True
148                         j.update_pipeline_component(event["properties"]["new_attributes"])
149                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
150                     uuid = event["object_uuid"]
151                     try:
152                         self.cond.acquire()
153                         j = self.processes[uuid]
154                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
155                         with Perf(metrics, "done %s" % j.name):
156                             j.done(event["properties"]["new_attributes"])
157                         self.cond.notify()
158                     finally:
159                         self.cond.release()
160
161     def label(self, obj):
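        # Strip the trailing "s" from work_api ("jobs"/"containers") to label
        # the object, e.g. "[container foo]" or "[job foo]".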
162         return "[%s %s]" % (self.work_api[0:-1], obj.name)
163
164     def poll_states(self):
165         """Poll status of jobs or containers listed in the processes dict.
166
167         Runs in a separate thread.
168         """
169
170         try:
171             while True:
172                 self.stop_polling.wait(15)
173                 if self.stop_polling.is_set():
174                     break
175                 with self.lock:
176                     keys = self.processes.keys()
177                 if not keys:
178                     continue
179
180                 if self.work_api == "containers":
181                     table = self.poll_api.container_requests()
182                 elif self.work_api == "jobs":
183                     table = self.poll_api.jobs()
184
185                 try:
186                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
187                 except Exception as e:
188                     logger.warn("Error checking states on API server: %s", e)
189                     continue
190
191                 for p in proc_states["items"]:
192                     self.on_message({
193                         "object_uuid": p["uuid"],
194                         "event_type": "update",
195                         "properties": {
196                             "new_attributes": p
197                         }
198                     })
199         except:
200             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
201             self.cond.acquire()
202             self.processes.clear()
203             self.cond.notify()
204             self.cond.release()
205         finally:
206             self.stop_polling.set()
207
208     def get_uploaded(self):
209         return self.uploaded.copy()
210
211     def add_uploaded(self, src, pair):
212         self.uploaded[src] = pair
213
214     def add_intermediate_output(self, uuid):
215         if uuid:
216             self.intermediate_output_collections.append(uuid)
217
218     def trash_intermediate_output(self):
219         logger.info("Cleaning up intermediate output collections")
220         for i in self.intermediate_output_collections:
221             try:
222                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
223             except:
224                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
225             if sys.exc_info()[0] is KeyboardInterrupt:
226                 break
227
228     def check_features(self, obj):
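        # Recursively walk the tool document and raise an error for features
        # that the selected work API does not support.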
229         if isinstance(obj, dict):
230             if obj.get("writable") and self.work_api != "containers":
231                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
232             if obj.get("class") == "DockerRequirement":
233                 if obj.get("dockerOutputDirectory"):
234                     if self.work_api != "containers":
235                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
236                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
237                     if not obj.get("dockerOutputDirectory").startswith('/'):
238                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
239                             "Option 'dockerOutputDirectory' must be an absolute path.")
240             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
241                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
242             for v in obj.itervalues():
243                 self.check_features(v)
244         elif isinstance(obj, list):
245             for i,v in enumerate(obj):
246                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
247                     self.check_features(v)
248
249     def make_output_collection(self, name, tagsString, outputObj):
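        # Assemble the final output collection: copy referenced files from
        # Keep (or write file literals), save cwl.output.json alongside them,
        # tag the new collection, and rewrite output locations to point at it.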
250         outputObj = copy.deepcopy(outputObj)
251
252         files = []
253         def capture(fileobj):
254             files.append(fileobj)
255
256         adjustDirObjs(outputObj, capture)
257         adjustFileObjs(outputObj, capture)
258
259         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
260
261         final = arvados.collection.Collection(api_client=self.api,
262                                               keep_client=self.keep_client,
263                                               num_retries=self.num_retries)
264
265         for k,v in generatemapper.items():
266             if k.startswith("_:"):
267                 if v.type == "Directory":
268                     continue
269                 if v.type == "CreateFile":
270                     with final.open(v.target, "wb") as f:
271                         f.write(v.resolved.encode("utf-8"))
272                     continue
273
274             if not k.startswith("keep:"):
275                 raise Exception("Output source is not in keep or a literal")
276             sp = k.split("/")
277             srccollection = sp[0][5:]
278             try:
279                 reader = self.collection_cache.get(srccollection)
280                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
281                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
282             except arvados.errors.ArgumentError as e:
283                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
284                 raise
285             except IOError as e:
286                 logger.warn("While preparing output collection: %s", e)
287
288         def rewrite(fileobj):
289             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
290             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
291                 if k in fileobj:
292                     del fileobj[k]
293
294         adjustDirObjs(outputObj, rewrite)
295         adjustFileObjs(outputObj, rewrite)
296
297         with final.open("cwl.output.json", "w") as f:
298             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
299
300         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
301
302         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
303                     final.api_response()["name"],
304                     final.manifest_locator())
305
306         final_uuid = final.manifest_locator()
307         tags = tagsString.split(',')
308         for tag in tags:
309             self.api.links().create(body={
310                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
311                 }).execute(num_retries=self.num_retries)
312
313         def finalcollection(fileobj):
314             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
315
316         adjustDirObjs(outputObj, finalcollection)
317         adjustFileObjs(outputObj, finalcollection)
318
319         return (outputObj, final)
320
321     def set_crunch_output(self):
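        # When running as a submitted runner (inside a container or a crunch
        # job task), record the final output collection on that record.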
322         if self.work_api == "containers":
323             try:
324                 current = self.api.containers().current().execute(num_retries=self.num_retries)
325             except ApiError as e:
326                 # Status code 404 just means we're not running in a container.
327                 if e.resp.status != 404:
328                     logger.info("Getting current container: %s", e)
329                 return
330             try:
331                 self.api.containers().update(uuid=current['uuid'],
332                                              body={
333                                                  'output': self.final_output_collection.portable_data_hash(),
334                                              }).execute(num_retries=self.num_retries)
335                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
336                                               body={
337                                                   'is_trashed': True
338                                               }).execute(num_retries=self.num_retries)
339             except Exception as e:
340                 logger.info("Setting container output: %s", e)
341         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
342             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
343                                    body={
344                                        'output': self.final_output_collection.portable_data_hash(),
345                                        'success': self.final_status == "success",
346                                        'progress':1.0
347                                    }).execute(num_retries=self.num_retries)
348
349     def arv_executor(self, tool, job_order, **kwargs):
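        # Main executor entry point invoked by cwltool.main: validate
        # requirements, upload dependencies, then either register the workflow
        # (--create/--update-workflow), submit a runner, or run work directly
        # and collect the final output.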
350         self.debug = kwargs.get("debug")
351
352         tool.visit(self.check_features)
353
354         self.project_uuid = kwargs.get("project_uuid")
355         self.pipeline = None
356         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
357                                                                  collection_cache=self.collection_cache)
358         self.fs_access = make_fs_access(kwargs["basedir"])
359         self.secret_store = kwargs.get("secret_store")
360
361         self.trash_intermediate = kwargs["trash_intermediate"]
362         if self.trash_intermediate and self.work_api != "containers":
363             raise Exception("--trash-intermediate is only supported with --api=containers.")
364
365         self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
366         if self.intermediate_output_ttl and self.work_api != "containers":
367             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
368         if self.intermediate_output_ttl < 0:
369             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
370
371         if not kwargs.get("name"):
372             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
373
374         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
375         # Also uploads docker images.
376         merged_map = upload_workflow_deps(self, tool)
377
378         # Reload tool object which may have been updated by
379         # upload_workflow_deps
380         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
381                                   makeTool=self.arv_make_tool,
382                                   loader=tool.doc_loader,
383                                   avsc_names=tool.doc_schema,
384                                   metadata=tool.metadata)
385
386         # Upload local file references in the job order.
387         job_order = upload_job_order(self, "%s input" % kwargs["name"],
388                                      tool, job_order)
389
390         existing_uuid = kwargs.get("update_workflow")
391         if existing_uuid or kwargs.get("create_workflow"):
392             # Create a pipeline template or workflow record and exit.
393             if self.work_api == "jobs":
394                 tmpl = RunnerTemplate(self, tool, job_order,
395                                       kwargs.get("enable_reuse"),
396                                       uuid=existing_uuid,
397                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
398                                       name=kwargs["name"],
399                                       merged_map=merged_map)
400                 tmpl.save()
401                 # cwltool.main will write our return value to stdout.
402                 return (tmpl.uuid, "success")
403             elif self.work_api == "containers":
404                 return (upload_workflow(self, tool, job_order,
405                                         self.project_uuid,
406                                         uuid=existing_uuid,
407                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
408                                         name=kwargs["name"],
409                                         merged_map=merged_map),
410                         "success")
411
412         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
413         self.eval_timeout = kwargs.get("eval_timeout")
414
415         kwargs["make_fs_access"] = make_fs_access
416         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
417         kwargs["use_container"] = True
418         kwargs["tmpdir_prefix"] = "tmp"
419         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
420
421         if self.work_api == "containers":
422             if self.ignore_docker_for_reuse:
423                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
424             kwargs["outdir"] = "/var/spool/cwl"
425             kwargs["docker_outdir"] = "/var/spool/cwl"
426             kwargs["tmpdir"] = "/tmp"
427             kwargs["docker_tmpdir"] = "/tmp"
428         elif self.work_api == "jobs":
429             if kwargs["priority"] != DEFAULT_PRIORITY:
430                 raise Exception("--priority not implemented for jobs API.")
431             kwargs["outdir"] = "$(task.outdir)"
432             kwargs["docker_outdir"] = "$(task.outdir)"
433             kwargs["tmpdir"] = "$(task.tmpdir)"
434
435         if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
436             raise Exception("--priority must be in the range 1..1000.")
437
438         runnerjob = None
439         if kwargs.get("submit"):
440             # Submit a runner job to run the workflow for us.
441             if self.work_api == "containers":
442                 if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
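                    # A single CommandLineTool run with --wait is executed
                    # directly as one container rather than wrapped in a
                    # workflow runner.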
443                     kwargs["runnerjob"] = tool.tool["id"]
444                     runnerjob = tool.job(job_order,
445                                          self.output_callback,
446                                          **kwargs).next()
447                 else:
448                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
449                                                 self.output_name,
450                                                 self.output_tags,
451                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
452                                                 name=kwargs.get("name"),
453                                                 on_error=kwargs.get("on_error"),
454                                                 submit_runner_image=kwargs.get("submit_runner_image"),
455                                                 intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
456                                                 merged_map=merged_map,
457                                                 priority=kwargs.get("priority"),
458                                                 secret_store=self.secret_store)
459             elif self.work_api == "jobs":
460                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
461                                       self.output_name,
462                                       self.output_tags,
463                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
464                                       name=kwargs.get("name"),
465                                       on_error=kwargs.get("on_error"),
466                                       submit_runner_image=kwargs.get("submit_runner_image"),
467                                       merged_map=merged_map)
468         elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
469             # Create pipeline for local run
470             self.pipeline = self.api.pipeline_instances().create(
471                 body={
472                     "owner_uuid": self.project_uuid,
473                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
474                     "components": {},
475                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
476             logger.info("Pipeline instance %s", self.pipeline["uuid"])
477
478         if runnerjob and not kwargs.get("wait"):
479             runnerjob.run(wait=kwargs.get("wait"))
480             return (runnerjob.uuid, "success")
481
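        # Start a background thread that polls the API server for state
        # changes on submitted work and feeds them to on_message().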
482         self.poll_api = arvados.api('v1')
483         self.polling_thread = threading.Thread(target=self.poll_states)
484         self.polling_thread.start()
485
486         if runnerjob:
487             jobiter = iter((runnerjob,))
488         else:
489             if "cwl_runner_job" in kwargs:
490                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
491             jobiter = tool.job(job_order,
492                                self.output_callback,
493                                **kwargs)
494
495         try:
496             self.cond.acquire()
497             # Will continue to hold the lock for the duration of this code
498             # except when in cond.wait(), at which point on_message can update
499             # job state and process output callbacks.
500
501             loopperf = Perf(metrics, "jobiter")
502             loopperf.__enter__()
503             for runnable in jobiter:
504                 loopperf.__exit__()
505
506                 if self.stop_polling.is_set():
507                     break
508
509                 if runnable:
510                     with Perf(metrics, "run"):
511                         runnable.run(**kwargs)
512                 else:
513                     if self.processes:
514                         self.cond.wait(1)
515                     else:
516                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
517                         break
518                 loopperf.__enter__()
519             loopperf.__exit__()
520
521             while self.processes:
522                 self.cond.wait(1)
523
524         except UnsupportedRequirement:
525             raise
526         except:
527             if sys.exc_info()[0] is KeyboardInterrupt:
528                 logger.error("Interrupted, marking pipeline as failed")
529             else:
530                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
531             if self.pipeline:
532                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
533                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
534             if runnerjob and runnerjob.uuid and self.work_api == "containers":
535                 self.api.container_requests().update(uuid=runnerjob.uuid,
536                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
537         finally:
538             self.cond.release()
539             self.stop_polling.set()
540             self.polling_thread.join()
541
542         if self.final_status == "UnsupportedRequirement":
543             raise UnsupportedRequirement("Check log for details.")
544
545         if self.final_output is None:
546             raise WorkflowException("Workflow did not return a result.")
547
548         if kwargs.get("submit") and isinstance(runnerjob, Runner):
549             logger.info("Final output collection %s", runnerjob.final_output)
550         else:
551             if self.output_name is None:
552                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
553             if self.output_tags is None:
554                 self.output_tags = ""
555             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
556             self.set_crunch_output()
557
558         if kwargs.get("compute_checksum"):
559             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
560             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
561
562         if self.trash_intermediate and self.final_status == "success":
563             self.trash_intermediate_output()
564
565         return (self.final_output, self.final_status)
566
567
568 def versionstring():
569     """Print version string of key packages for provenance and debugging."""
570
571     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
572     arvpkg = pkg_resources.require("arvados-python-client")
573     cwlpkg = pkg_resources.require("cwltool")
574
575     return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
576                                     "arvados-python-client", arvpkg[0].version,
577                                     "cwltool", cwlpkg[0].version)
578
579
580 def arg_parser():  # type: () -> argparse.ArgumentParser
581     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
582
583     parser.add_argument("--basedir", type=str,
584                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
585     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
586                         help="Output directory, default current directory")
587
588     parser.add_argument("--eval-timeout",
589                         help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
590                         type=float,
591                         default=20)
592
593     exgroup = parser.add_mutually_exclusive_group()
594     exgroup.add_argument("--print-dot", action="store_true",
595                          help="Print workflow visualization in graphviz format and exit")
596     exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
597     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
598
599     exgroup = parser.add_mutually_exclusive_group()
600     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
601     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
602     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
603
604     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
605
606     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
607
608     exgroup = parser.add_mutually_exclusive_group()
609     exgroup.add_argument("--enable-reuse", action="store_true",
610                         default=True, dest="enable_reuse",
611                         help="Enable job or container reuse (default)")
612     exgroup.add_argument("--disable-reuse", action="store_false",
613                         default=True, dest="enable_reuse",
614                         help="Disable job or container reuse")
615
616     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs will go to the user's home project.")
617     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
618     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
619     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
620                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
621                         default=False)
622
623     exgroup = parser.add_mutually_exclusive_group()
624     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
625                         default=True, dest="submit")
626     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
627                         default=True, dest="submit")
628     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
629                          dest="create_workflow")
630     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
631     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
632
633     exgroup = parser.add_mutually_exclusive_group()
634     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
635                         default=True, dest="wait")
636     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
637                         default=True, dest="wait")
638
639     exgroup = parser.add_mutually_exclusive_group()
640     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
641                         default=True, dest="log_timestamps")
642     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
643                         default=True, dest="log_timestamps")
644
645     parser.add_argument("--api", type=str,
646                         default=None, dest="work_api",
647                         choices=("jobs", "containers"),
648                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
649
650     parser.add_argument("--compute-checksum", action="store_true", default=False,
651                         help="Compute checksum of contents while collecting outputs",
652                         dest="compute_checksum")
653
654     parser.add_argument("--submit-runner-ram", type=int,
655                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
656                         default=1024)
657
658     parser.add_argument("--submit-runner-image", type=str,
659                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
660                         default=None)
661
662     parser.add_argument("--name", type=str,
663                         help="Name to use for workflow execution instance.",
664                         default=None)
665
666     parser.add_argument("--on-error", type=str,
667                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
668                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
669
670     parser.add_argument("--enable-dev", action="store_true",
671                         help="Enable loading and running development versions "
672                              "of CWL spec.", default=False)
673
674     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
675                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
676                         default=0)
677
678     parser.add_argument("--priority", type=int,
679                         help="Workflow priority (range 1..1000, higher has precedence over lower, containers API only)",
680                         default=DEFAULT_PRIORITY)
681
682     exgroup = parser.add_mutually_exclusive_group()
683     exgroup.add_argument("--trash-intermediate", action="store_true",
684                         default=False, dest="trash_intermediate",
685                          help="Immediately trash intermediate outputs on workflow success.")
686     exgroup.add_argument("--no-trash-intermediate", action="store_false",
687                         default=False, dest="trash_intermediate",
688                         help="Do not trash intermediate outputs (default).")
689
690     parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
691     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
692
693     return parser
694
695 def add_arv_hints():
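    # Register the Arvados CWL extension schema and extension requirement
    # names with cwltool so documents that use them validate cleanly.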
696     cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
697     cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
698     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
699     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
700     res.close()
701     cwltool.process.supportedProcessRequirements.extend([
702         "http://arvados.org/cwl#RunInSingleContainer",
703         "http://arvados.org/cwl#OutputDirType",
704         "http://arvados.org/cwl#RuntimeConstraints",
705         "http://arvados.org/cwl#PartitionRequirement",
706         "http://arvados.org/cwl#APIRequirement",
707         "http://commonwl.org/cwltool#LoadListingRequirement",
708         "http://arvados.org/cwl#IntermediateOutput",
709         "http://arvados.org/cwl#ReuseRequirement"
710     ])
711
712 def main(args, stdout, stderr, api_client=None, keep_client=None):
713     parser = arg_parser()
714
715     job_order_object = None
716     arvargs = parser.parse_args(args)
717
718     if arvargs.update_workflow:
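        # The UUID type infix identifies what is being updated: "-7fd4e-" is
        # an Arvados workflow (containers API), "-p5p6p-" a pipeline template
        # (jobs API).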
719         if arvargs.update_workflow.find('-7fd4e-') == 5:
720             want_api = 'containers'
721         elif arvargs.update_workflow.find('-p5p6p-') == 5:
722             want_api = 'jobs'
723         else:
724             want_api = None
725         if want_api and arvargs.work_api and want_api != arvargs.work_api:
726             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
727                 arvargs.update_workflow, want_api, arvargs.work_api))
728             return 1
729         arvargs.work_api = want_api
730
731     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
732         job_order_object = ({}, "")
733
734     add_arv_hints()
735
736     try:
737         if api_client is None:
738             api_client=arvados.api('v1', model=OrderedJsonModel())
739         if keep_client is None:
740             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
741         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
742                               num_retries=4, output_name=arvargs.output_name,
743                               output_tags=arvargs.output_tags)
744     except Exception as e:
745         logger.error(e)
746         return 1
747
748     if arvargs.debug:
749         logger.setLevel(logging.DEBUG)
750         logging.getLogger('arvados').setLevel(logging.DEBUG)
751
752     if arvargs.quiet:
753         logger.setLevel(logging.WARN)
754         logging.getLogger('arvados').setLevel(logging.WARN)
755         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
756
757     if arvargs.metrics:
758         metrics.setLevel(logging.DEBUG)
759         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
760
761     if arvargs.log_timestamps:
762         arvados.log_handler.setFormatter(logging.Formatter(
763             '%(asctime)s %(name)s %(levelname)s: %(message)s',
764             '%Y-%m-%d %H:%M:%S'))
765     else:
766         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
767
768     arvargs.conformance_test = None
769     arvargs.use_container = True
770     arvargs.relax_path_checks = True
771     arvargs.print_supported_versions = False
772
773     make_fs_access = partial(CollectionFsAccess,
774                            collection_cache=runner.collection_cache)
775
776     return cwltool.main.main(args=arvargs,
777                              stdout=stdout,
778                              stderr=stderr,
779                              executor=runner.arv_executor,
780                              makeTool=runner.arv_make_tool,
781                              versionfunc=versionstring,
782                              job_order_object=job_order_object,
783                              make_fs_access=make_fs_access,
784                              fetcher_constructor=partial(CollectionFetcher,
785                                                          api_client=api_client,
786                                                          fs_access=make_fs_access(""),
787                                                          num_retries=runner.num_retries),
788                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
789                              logger_handler=arvados.log_handler,
790                              custom_schema_callback=add_arv_hints)