1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement the cwl-runner interface for submitting and running work on Arvados,
7 # using either the Crunch jobs API or the Crunch containers API.
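#
# A typical command-line invocation might look like the following sketch; the
# workflow and input file names (and the project UUID) are placeholders, not
# part of this module:
#
#   arvados-cwl-runner --api=containers --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx \
#       my-workflow.cwl my-inputs.yml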
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import threading
14 import hashlib
15 import copy
16 import json
17 import re
18 from functools import partial
19 import pkg_resources  # part of setuptools
20
21 from cwltool.errors import WorkflowException
22 import cwltool.main
23 import cwltool.workflow
24 import cwltool.process
25 import schema_salad
26 from schema_salad.sourceline import SourceLine
27
28 import arvados
29 import arvados.config
30 from arvados.keep import KeepClient
31 from arvados.errors import ApiError
32
33 from .arvcontainer import ArvadosContainer, RunnerContainer
34 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
35 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
36 from .arvtool import ArvadosCommandTool
37 from .arvworkflow import ArvadosWorkflow, upload_workflow
38 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
39 from .perf import Perf
40 from .pathmapper import NoFollowPathMapper
41 from ._version import __version__
42
43 from cwltool.pack import pack
44 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
45 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
46 from cwltool.draft2tool import compute_checksums
47 from arvados.api import OrderedJsonModel
48
49 logger = logging.getLogger('arvados.cwl-runner')
50 metrics = logging.getLogger('arvados.cwl-runner.metrics')
51 logger.setLevel(logging.INFO)
52
53 arvados.log_handler.setFormatter(logging.Formatter(
54         '%(asctime)s %(name)s %(levelname)s: %(message)s',
55         '%Y-%m-%d %H:%M:%S'))
56
57 class ArvCwlRunner(object):
58     """Execute a CWL tool or workflow, submit work (using either jobs or
59     containers API), wait for them to complete, and report output.
60
61     """
62
63     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
64         self.api = api_client
65         self.processes = {}
66         self.lock = threading.Lock()
67         self.cond = threading.Condition(self.lock)
68         self.final_output = None
69         self.final_status = None
70         self.uploaded = {}
71         self.num_retries = num_retries
72         self.uuid = None
73         self.stop_polling = threading.Event()
74         self.poll_api = None
75         self.pipeline = None
76         self.final_output_collection = None
77         self.output_name = output_name
78         self.output_tags = output_tags
79         self.project_uuid = None
80         self.intermediate_output_ttl = 0
81         self.intermediate_output_collections = []
82         self.trash_intermediate = False
83
84         if keep_client is not None:
85             self.keep_client = keep_client
86         else:
87             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
88
89         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
90
91         self.work_api = None
92         expected_api = ["jobs", "containers"]
93         for api in expected_api:
94             try:
95                 methods = self.api._rootDesc.get('resources')[api]['methods']
96                 if ('httpMethod' in methods['create'] and
97                     (work_api == api or work_api is None)):
98                     self.work_api = api
99                     break
100             except KeyError:
101                 pass
102
103         if not self.work_api:
104             if work_api is None:
105                 raise Exception("No supported APIs")
106             else:
107                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
108
109     def arv_make_tool(self, toolpath_object, **kwargs):
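        # Factory used by cwltool: wrap CommandLineTool and Workflow documents
        # in their Arvados-aware counterparts, and fall back to cwltool's
        # default tool constructor for anything else.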
110         kwargs["work_api"] = self.work_api
111         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
112                                                 api_client=self.api,
113                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
114                                                 num_retries=self.num_retries,
115                                                 overrides=kwargs.get("override_tools"))
116         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
117             return ArvadosCommandTool(self, toolpath_object, **kwargs)
118         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
119             return ArvadosWorkflow(self, toolpath_object, **kwargs)
120         else:
121             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
122
123     def output_callback(self, out, processStatus):
124         if processStatus == "success":
125             logger.info("Overall process status is %s", processStatus)
126             if self.pipeline:
127                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
128                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
129         else:
130             logger.warn("Overall process status is %s", processStatus)
131             if self.pipeline:
132                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
133                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
134         self.final_status = processStatus
135         self.final_output = out
136
137     def on_message(self, event):
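        # Handle a state-change event for a tracked job or container request:
        # note when a process starts Running, and finalize it (via its done()
        # callback) once it reaches a terminal state.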
138         if "object_uuid" in event:
139             if event["object_uuid"] in self.processes and event["event_type"] == "update":
140                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
141                     uuid = event["object_uuid"]
142                     with self.lock:
143                         j = self.processes[uuid]
144                         logger.info("%s %s is Running", self.label(j), uuid)
145                         j.running = True
146                         j.update_pipeline_component(event["properties"]["new_attributes"])
147                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
148                     uuid = event["object_uuid"]
149                     try:
150                         self.cond.acquire()
151                         j = self.processes[uuid]
152                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
153                         with Perf(metrics, "done %s" % j.name):
154                             j.done(event["properties"]["new_attributes"])
155                         self.cond.notify()
156                     finally:
157                         self.cond.release()
158
159     def label(self, obj):
160         return "[%s %s]" % (self.work_api[0:-1], obj.name)
161
162     def poll_states(self):
163         """Poll status of jobs or containers listed in the processes dict.
164
165         Runs in a separate thread.
166         """
167
168         try:
169             while True:
170                 self.stop_polling.wait(15)
171                 if self.stop_polling.is_set():
172                     break
173                 with self.lock:
174                     keys = self.processes.keys()
175                 if not keys:
176                     continue
177
178                 if self.work_api == "containers":
179                     table = self.poll_api.container_requests()
180                 elif self.work_api == "jobs":
181                     table = self.poll_api.jobs()
182
183                 try:
184                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
185                 except Exception as e:
186                     logger.warn("Error checking states on API server: %s", e)
187                     continue
188
189                 for p in proc_states["items"]:
190                     self.on_message({
191                         "object_uuid": p["uuid"],
192                         "event_type": "update",
193                         "properties": {
194                             "new_attributes": p
195                         }
196                     })
197         except:
198             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
199             self.cond.acquire()
200             self.processes.clear()
201             self.cond.notify()
202             self.cond.release()
203         finally:
204             self.stop_polling.set()
205
206     def get_uploaded(self):
207         return self.uploaded.copy()
208
209     def add_uploaded(self, src, pair):
210         self.uploaded[src] = pair
211
212     def add_intermediate_output(self, uuid):
213         if uuid:
214             self.intermediate_output_collections.append(uuid)
215
216     def trash_intermediate_output(self):
217         logger.info("Cleaning up intermediate output collections")
218         for i in self.intermediate_output_collections:
219             try:
220                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
221             except:
222                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
223             if sys.exc_info()[0] is KeyboardInterrupt:
224                 break
225
226     def check_features(self, obj):
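        # Recursively scan a tool document for CWL features this runner does
        # not support (writable InitialWorkDir entries, dockerOutputDirectory),
        # raising UnsupportedRequirement with source-line context when found.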
227         if isinstance(obj, dict):
228             if obj.get("writable"):
229                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
230             if obj.get("class") == "DockerRequirement":
231                 if obj.get("dockerOutputDirectory"):
232                     # TODO: can be supported by containers API, but not jobs API.
233                     raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
234                         "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
235             for v in obj.itervalues():
236                 self.check_features(v)
237         elif isinstance(obj, list):
238             for i,v in enumerate(obj):
239                 with SourceLine(obj, i, UnsupportedRequirement):
240                     self.check_features(v)
241
242     def make_output_collection(self, name, tagsString, outputObj):
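        # Build the final output collection: copy referenced files out of their
        # source collections in Keep (writing file literals directly), save a
        # cwl.output.json manifest into the collection, tag it, and rewrite the
        # output object to point at the new collection.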
243         outputObj = copy.deepcopy(outputObj)
244
245         files = []
246         def capture(fileobj):
247             files.append(fileobj)
248
249         adjustDirObjs(outputObj, capture)
250         adjustFileObjs(outputObj, capture)
251
252         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
253
254         final = arvados.collection.Collection(api_client=self.api,
255                                               keep_client=self.keep_client,
256                                               num_retries=self.num_retries)
257
258         for k,v in generatemapper.items():
259             if k.startswith("_:"):
260                 if v.type == "Directory":
261                     continue
262                 if v.type == "CreateFile":
263                     with final.open(v.target, "wb") as f:
264                         f.write(v.resolved.encode("utf-8"))
265                     continue
266
267             if not k.startswith("keep:"):
268                 raise Exception("Output source is not in keep or a literal")
269             sp = k.split("/")
270             srccollection = sp[0][5:]
271             try:
272                 reader = self.collection_cache.get(srccollection)
273                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
274                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
275             except arvados.errors.ArgumentError as e:
276                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
277                 raise
278             except IOError as e:
279                 logger.warn("While preparing output collection: %s", e)
280
281         def rewrite(fileobj):
282             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
283             for k in ("basename", "listing", "contents"):
284                 if k in fileobj:
285                     del fileobj[k]
286
287         adjustDirObjs(outputObj, rewrite)
288         adjustFileObjs(outputObj, rewrite)
289
290         with final.open("cwl.output.json", "w") as f:
291             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
292
293         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
294
295         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
296                     final.api_response()["name"],
297                     final.manifest_locator())
298
299         final_uuid = final.manifest_locator()
300         tags = tagsString.split(',')
301         for tag in tags:
302             self.api.links().create(body={
303                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
304                 }).execute(num_retries=self.num_retries)
305
306         def finalcollection(fileobj):
307             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
308
309         adjustDirObjs(outputObj, finalcollection)
310         adjustFileObjs(outputObj, finalcollection)
311
312         return (outputObj, final)
313
314     def set_crunch_output(self):
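        # When running inside an Arvados container (or as a crunch job task),
        # record the final output collection on the enclosing container or job
        # task so it is reported as that process's output.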
315         if self.work_api == "containers":
316             try:
317                 current = self.api.containers().current().execute(num_retries=self.num_retries)
318             except ApiError as e:
319                 # Status code 404 just means we're not running in a container.
320                 if e.resp.status != 404:
321                     logger.info("Getting current container: %s", e)
322                 return
323             try:
324                 self.api.containers().update(uuid=current['uuid'],
325                                              body={
326                                                  'output': self.final_output_collection.portable_data_hash(),
327                                              }).execute(num_retries=self.num_retries)
328                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
329                                               body={
330                                                   'is_trashed': True
331                                               }).execute(num_retries=self.num_retries)
332             except Exception as e:
333                 logger.info("Setting container output: %s", e)
334         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
335             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
336                                    body={
337                                        'output': self.final_output_collection.portable_data_hash(),
338                                        'success': self.final_status == "success",
339                                        'progress':1.0
340                                    }).execute(num_retries=self.num_retries)
341
342     def arv_executor(self, tool, job_order, **kwargs):
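        # Main execution path: check for unsupported features, upload workflow
        # dependencies and the job order, optionally register a workflow or
        # pipeline template and exit, then either submit a runner job/container
        # or run the steps locally while polling their states, and finally
        # assemble and report the output collection.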
343         self.debug = kwargs.get("debug")
344
345         tool.visit(self.check_features)
346
347         self.project_uuid = kwargs.get("project_uuid")
348         self.pipeline = None
349         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
350                                                                  collection_cache=self.collection_cache)
351         self.fs_access = make_fs_access(kwargs["basedir"])
352
353
354         self.trash_intermediate = kwargs["trash_intermediate"]
355         if self.trash_intermediate and self.work_api != "containers":
356             raise Exception("--trash-intermediate is only supported with --api=containers.")
357
358         self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
359         if self.intermediate_output_ttl and self.work_api != "containers":
360             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
361         if self.intermediate_output_ttl < 0:
362             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
363
364         if not kwargs.get("name"):
365             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
366
367         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
368         # Also uploads docker images.
369         override_tools = {}
370         upload_workflow_deps(self, tool, override_tools)
371
372         # Reload tool object which may have been updated by
373         # upload_workflow_deps
374         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
375                                   makeTool=self.arv_make_tool,
376                                   loader=tool.doc_loader,
377                                   avsc_names=tool.doc_schema,
378                                   metadata=tool.metadata,
379                                   override_tools=override_tools)
380
381         # Upload local file references in the job order.
382         job_order = upload_job_order(self, "%s input" % kwargs["name"],
383                                      tool, job_order)
384
385         existing_uuid = kwargs.get("update_workflow")
386         if existing_uuid or kwargs.get("create_workflow"):
387             # Create a pipeline template or workflow record and exit.
388             if self.work_api == "jobs":
389                 tmpl = RunnerTemplate(self, tool, job_order,
390                                       kwargs.get("enable_reuse"),
391                                       uuid=existing_uuid,
392                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
393                                       name=kwargs["name"])
394                 tmpl.save()
395                 # cwltool.main will write our return value to stdout.
396                 return (tmpl.uuid, "success")
397             elif self.work_api == "containers":
398                 return (upload_workflow(self, tool, job_order,
399                                         self.project_uuid,
400                                         uuid=existing_uuid,
401                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
402                                         name=kwargs["name"]),
403                         "success")
404
405         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
406
407         kwargs["make_fs_access"] = make_fs_access
408         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
409         kwargs["use_container"] = True
410         kwargs["tmpdir_prefix"] = "tmp"
411         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
412
413         if self.work_api == "containers":
414             kwargs["outdir"] = "/var/spool/cwl"
415             kwargs["docker_outdir"] = "/var/spool/cwl"
416             kwargs["tmpdir"] = "/tmp"
417             kwargs["docker_tmpdir"] = "/tmp"
418         elif self.work_api == "jobs":
419             kwargs["outdir"] = "$(task.outdir)"
420             kwargs["docker_outdir"] = "$(task.outdir)"
421             kwargs["tmpdir"] = "$(task.tmpdir)"
422
423         runnerjob = None
424         if kwargs.get("submit"):
425             # Submit a runner job to run the workflow for us.
426             if self.work_api == "containers":
427                 if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
428                     kwargs["runnerjob"] = tool.tool["id"]
429                     runnerjob = tool.job(job_order,
430                                          self.output_callback,
431                                          **kwargs).next()
432                 else:
433                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
434                                                 self.output_name,
435                                                 self.output_tags,
436                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
437                                                 name=kwargs.get("name"),
438                                                 on_error=kwargs.get("on_error"),
439                                                 submit_runner_image=kwargs.get("submit_runner_image"),
440                                                 intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
441             elif self.work_api == "jobs":
442                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
443                                       self.output_name,
444                                       self.output_tags,
445                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
446                                       name=kwargs.get("name"),
447                                       on_error=kwargs.get("on_error"),
448                                       submit_runner_image=kwargs.get("submit_runner_image"))
449         elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
450             # Create pipeline for local run
451             self.pipeline = self.api.pipeline_instances().create(
452                 body={
453                     "owner_uuid": self.project_uuid,
454                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
455                     "components": {},
456                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
457             logger.info("Pipeline instance %s", self.pipeline["uuid"])
458
459         if runnerjob and not kwargs.get("wait"):
460             runnerjob.run(wait=kwargs.get("wait"))
461             return (runnerjob.uuid, "success")
462
463         self.poll_api = arvados.api('v1')
464         self.polling_thread = threading.Thread(target=self.poll_states)
465         self.polling_thread.start()
466
467         if runnerjob:
468             jobiter = iter((runnerjob,))
469         else:
470             if "cwl_runner_job" in kwargs:
471                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
472             jobiter = tool.job(job_order,
473                                self.output_callback,
474                                **kwargs)
475
476         try:
477             self.cond.acquire()
478             # Will continue to hold the lock for the duration of this code
479             # except when in cond.wait(), at which point on_message can update
480             # job state and process output callbacks.
481
482             loopperf = Perf(metrics, "jobiter")
483             loopperf.__enter__()
484             for runnable in jobiter:
485                 loopperf.__exit__()
486
487                 if self.stop_polling.is_set():
488                     break
489
490                 if runnable:
491                     with Perf(metrics, "run"):
492                         runnable.run(**kwargs)
493                 else:
494                     if self.processes:
495                         self.cond.wait(1)
496                     else:
497                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
498                         break
499                 loopperf.__enter__()
500             loopperf.__exit__()
501
502             while self.processes:
503                 self.cond.wait(1)
504
505         except UnsupportedRequirement:
506             raise
507         except:
508             if sys.exc_info()[0] is KeyboardInterrupt:
509                 logger.error("Interrupted, marking pipeline as failed")
510             else:
511                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
512             if self.pipeline:
513                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
514                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
515             if runnerjob and runnerjob.uuid and self.work_api == "containers":
516                 self.api.container_requests().update(uuid=runnerjob.uuid,
517                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
518         finally:
519             self.cond.release()
520             self.stop_polling.set()
521             self.polling_thread.join()
522
523         if self.final_status == "UnsupportedRequirement":
524             raise UnsupportedRequirement("Check log for details.")
525
526         if self.final_output is None:
527             raise WorkflowException("Workflow did not return a result.")
528
529         if kwargs.get("submit") and isinstance(runnerjob, Runner):
530             logger.info("Final output collection %s", runnerjob.final_output)
531         else:
532             if self.output_name is None:
533                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
534             if self.output_tags is None:
535                 self.output_tags = ""
536             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
537             self.set_crunch_output()
538
539         if kwargs.get("compute_checksum"):
540             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
541             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
542
543         if self.trash_intermediate and self.final_status == "success":
544             self.trash_intermediate_output()
545
546         return (self.final_output, self.final_status)
547
548
549 def versionstring():
550     """Print version string of key packages for provenance and debugging."""
551
552     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
553     arvpkg = pkg_resources.require("arvados-python-client")
554     cwlpkg = pkg_resources.require("cwltool")
555
556     return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
557                                     "arvados-python-client", arvpkg[0].version,
558                                     "cwltool", cwlpkg[0].version)
559
560
561 def arg_parser():  # type: () -> argparse.ArgumentParser
562     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
563
564     parser.add_argument("--basedir", type=str,
565                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
566     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
567                         help="Output directory, default current directory")
568
569     parser.add_argument("--eval-timeout",
570                         help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
571                         type=float,
572                         default=20)
573
574     exgroup = parser.add_mutually_exclusive_group()
575     exgroup.add_argument("--print-dot", action="store_true",
576                          help="Print workflow visualization in graphviz format and exit")
577     exgroup.add_argument("--version", action="store_true", help="Print version and exit")
578     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
579
580     exgroup = parser.add_mutually_exclusive_group()
581     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
582     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
583     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
584
585     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
586
587     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
588
589     exgroup = parser.add_mutually_exclusive_group()
590     exgroup.add_argument("--enable-reuse", action="store_true",
591                         default=True, dest="enable_reuse",
592                         help="Enable job or container reuse (default)")
593     exgroup.add_argument("--disable-reuse", action="store_false",
594                         default=True, dest="enable_reuse",
595                         help="Disable job or container reuse")
596
597     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
598     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
599     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
600     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
601                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
602                         default=False)
603
604     exgroup = parser.add_mutually_exclusive_group()
605     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
606                         default=True, dest="submit")
607     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
608                         default=True, dest="submit")
609     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
610                          dest="create_workflow")
611     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
612     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
613
614     exgroup = parser.add_mutually_exclusive_group()
615     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
616                         default=True, dest="wait")
617     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
618                         default=True, dest="wait")
619
620     exgroup = parser.add_mutually_exclusive_group()
621     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
622                         default=True, dest="log_timestamps")
623     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
624                         default=True, dest="log_timestamps")
625
626     parser.add_argument("--api", type=str,
627                         default=None, dest="work_api",
628                         choices=("jobs", "containers"),
629                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
630
631     parser.add_argument("--compute-checksum", action="store_true", default=False,
632                         help="Compute checksum of contents while collecting outputs",
633                         dest="compute_checksum")
634
635     parser.add_argument("--submit-runner-ram", type=int,
636                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
637                         default=1024)
638
639     parser.add_argument("--submit-runner-image", type=str,
640                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
641                         default=None)
642
643     parser.add_argument("--name", type=str,
644                         help="Name to use for workflow execution instance.",
645                         default=None)
646
647     parser.add_argument("--on-error", type=str,
648                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
649                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
650
651     parser.add_argument("--enable-dev", action="store_true",
652                         help="Enable loading and running development versions "
653                              "of CWL spec.", default=False)
654
655     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
656                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
657                         default=0)
658
659     exgroup = parser.add_mutually_exclusive_group()
660     exgroup.add_argument("--trash-intermediate", action="store_true",
661                         default=False, dest="trash_intermediate",
662                          help="Immediately trash intermediate outputs on workflow success.")
663     exgroup.add_argument("--no-trash-intermediate", action="store_false",
664                         default=False, dest="trash_intermediate",
665                         help="Do not trash intermediate outputs (default).")
666
667     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
668     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
669
670     return parser
671
672 def add_arv_hints():
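    # Relax cwltool's Docker image name checks, register the Arvados CWL
    # extension schema (arv-cwl-schema.yml), and declare the extension
    # requirements this runner understands.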
673     cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
674     cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
675     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
676     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
677     res.close()
678     cwltool.process.supportedProcessRequirements.extend([
679         "http://arvados.org/cwl#RunInSingleContainer",
680         "http://arvados.org/cwl#OutputDirType",
681         "http://arvados.org/cwl#RuntimeConstraints",
682         "http://arvados.org/cwl#PartitionRequirement",
683         "http://arvados.org/cwl#APIRequirement",
684         "http://commonwl.org/cwltool#LoadListingRequirement",
685         "http://arvados.org/cwl#IntermediateOutput",
686         "http://arvados.org/cwl#ReuseRequirement"
687     ])
688
689 def main(args, stdout, stderr, api_client=None, keep_client=None):
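    # Command-line entry point: parse arguments, configure logging, construct
    # an ArvCwlRunner, and hand execution to cwltool.main.main with the
    # Arvados-specific executor, tool factory, and Keep-backed file access.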
690     parser = arg_parser()
691
692     job_order_object = None
693     arvargs = parser.parse_args(args)
694
695     if arvargs.version:
696         print versionstring()
697         return
698
699     if arvargs.update_workflow:
700         if arvargs.update_workflow.find('-7fd4e-') == 5:
701             want_api = 'containers'
702         elif arvargs.update_workflow.find('-p5p6p-') == 5:
703             want_api = 'jobs'
704         else:
705             want_api = None
706         if want_api and arvargs.work_api and want_api != arvargs.work_api:
707             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
708                 arvargs.update_workflow, want_api, arvargs.work_api))
709             return 1
710         arvargs.work_api = want_api
711
712     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
713         job_order_object = ({}, "")
714
715     add_arv_hints()
716
717     try:
718         if api_client is None:
719             api_client=arvados.api('v1', model=OrderedJsonModel())
720         if keep_client is None:
721             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
722         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
723                               num_retries=4, output_name=arvargs.output_name,
724                               output_tags=arvargs.output_tags)
725     except Exception as e:
726         logger.error(e)
727         return 1
728
729     if arvargs.debug:
730         logger.setLevel(logging.DEBUG)
731         logging.getLogger('arvados').setLevel(logging.DEBUG)
732
733     if arvargs.quiet:
734         logger.setLevel(logging.WARN)
735         logging.getLogger('arvados').setLevel(logging.WARN)
736         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
737
738     if arvargs.metrics:
739         metrics.setLevel(logging.DEBUG)
740         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
741
742     if arvargs.log_timestamps:
743         arvados.log_handler.setFormatter(logging.Formatter(
744             '%(asctime)s %(name)s %(levelname)s: %(message)s',
745             '%Y-%m-%d %H:%M:%S'))
746     else:
747         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
748
749     arvargs.conformance_test = None
750     arvargs.use_container = True
751     arvargs.relax_path_checks = True
752     arvargs.validate = None
753     arvargs.print_supported_versions = False
754
755     make_fs_access = partial(CollectionFsAccess,
756                            collection_cache=runner.collection_cache)
757
758     return cwltool.main.main(args=arvargs,
759                              stdout=stdout,
760                              stderr=stderr,
761                              executor=runner.arv_executor,
762                              makeTool=runner.arv_make_tool,
763                              versionfunc=versionstring,
764                              job_order_object=job_order_object,
765                              make_fs_access=make_fs_access,
766                              fetcher_constructor=partial(CollectionFetcher,
767                                                          api_client=api_client,
768                                                          fs_access=make_fs_access(""),
769                                                          num_retries=runner.num_retries),
770                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
771                              logger_handler=arvados.log_handler,
772                              custom_schema_callback=add_arv_hints)