Merge branch '12100-cwltool-update' closes #12100
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import threading
14 import hashlib
15 import copy
16 import json
17 import re
18 from functools import partial
19 import pkg_resources  # part of setuptools
20
21 from cwltool.errors import WorkflowException
22 import cwltool.main
23 import cwltool.workflow
24 import cwltool.process
25 import schema_salad
26 from schema_salad.sourceline import SourceLine
27
28 import arvados
29 import arvados.config
30 from arvados.keep import KeepClient
31 from arvados.errors import ApiError
32
33 from .arvcontainer import ArvadosContainer, RunnerContainer
34 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
35 from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
36 from .arvtool import ArvadosCommandTool
37 from .arvworkflow import ArvadosWorkflow, upload_workflow
38 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
39 from .perf import Perf
40 from .pathmapper import NoFollowPathMapper
41 from ._version import __version__
42
43 from cwltool.pack import pack
44 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
45 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
46 from cwltool.draft2tool import compute_checksums
47 from arvados.api import OrderedJsonModel
48
# Main logger for the CWL runner.
logger = logging.getLogger('arvados.cwl-runner')
# Separate logger handed to Perf() for timing output, so metrics can be
# configured independently of the main logger.
metrics = logging.getLogger('arvados.cwl-runner.metrics')
# Default verbosity for the runner is INFO.
logger.setLevel(logging.INFO)

# Format records emitted through the Arvados SDK log handler as
# "<timestamp> <logger name> <level>: <message>", with a second-resolution
# timestamp.
arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))
56
57 class ArvCwlRunner(object):
58     """Execute a CWL tool or workflow, submit work (using either jobs or
59     containers API), wait for them to complete, and report output.
60
61     """
62
63     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
64         self.api = api_client
65         self.processes = {}
66         self.lock = threading.Lock()
67         self.cond = threading.Condition(self.lock)
68         self.final_output = None
69         self.final_status = None
70         self.uploaded = {}
71         self.num_retries = num_retries
72         self.uuid = None
73         self.stop_polling = threading.Event()
74         self.poll_api = None
75         self.pipeline = None
76         self.final_output_collection = None
77         self.output_name = output_name
78         self.output_tags = output_tags
79         self.project_uuid = None
80         self.intermediate_output_ttl = 0
81         self.intermediate_output_collections = []
82         self.trash_intermediate = False
83
84         if keep_client is not None:
85             self.keep_client = keep_client
86         else:
87             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
88
89         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
90
91         self.work_api = None
92         expected_api = ["jobs", "containers"]
93         for api in expected_api:
94             try:
95                 methods = self.api._rootDesc.get('resources')[api]['methods']
96                 if ('httpMethod' in methods['create'] and
97                     (work_api == api or work_api is None)):
98                     self.work_api = api
99                     break
100             except KeyError:
101                 pass
102
103         if not self.work_api:
104             if work_api is None:
105                 raise Exception("No supported APIs")
106             else:
107                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
108
109     def arv_make_tool(self, toolpath_object, **kwargs):
110         kwargs["work_api"] = self.work_api
111         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
112                                                 api_client=self.api,
113                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
114                                                 num_retries=self.num_retries,
115                                                 overrides=kwargs.get("override_tools"))
116         kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
117         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
118             return ArvadosCommandTool(self, toolpath_object, **kwargs)
119         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
120             return ArvadosWorkflow(self, toolpath_object, **kwargs)
121         else:
122             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
123
124     def output_callback(self, out, processStatus):
125         if processStatus == "success":
126             logger.info("Overall process status is %s", processStatus)
127             if self.pipeline:
128                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
129                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
130         else:
131             logger.warn("Overall process status is %s", processStatus)
132             if self.pipeline:
133                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
134                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
135         self.final_status = processStatus
136         self.final_output = out
137
138     def on_message(self, event):
139         if "object_uuid" in event:
140             if event["object_uuid"] in self.processes and event["event_type"] == "update":
141                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
142                     uuid = event["object_uuid"]
143                     with self.lock:
144                         j = self.processes[uuid]
145                         logger.info("%s %s is Running", self.label(j), uuid)
146                         j.running = True
147                         j.update_pipeline_component(event["properties"]["new_attributes"])
148                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
149                     uuid = event["object_uuid"]
150                     try:
151                         self.cond.acquire()
152                         j = self.processes[uuid]
153                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
154                         with Perf(metrics, "done %s" % j.name):
155                             j.done(event["properties"]["new_attributes"])
156                         self.cond.notify()
157                     finally:
158                         self.cond.release()
159
160     def label(self, obj):
161         return "[%s %s]" % (self.work_api[0:-1], obj.name)
162
163     def poll_states(self):
164         """Poll status of jobs or containers listed in the processes dict.
165
166         Runs in a separate thread.
167         """
168
169         try:
170             while True:
171                 self.stop_polling.wait(15)
172                 if self.stop_polling.is_set():
173                     break
174                 with self.lock:
175                     keys = self.processes.keys()
176                 if not keys:
177                     continue
178
179                 if self.work_api == "containers":
180                     table = self.poll_api.container_requests()
181                 elif self.work_api == "jobs":
182                     table = self.poll_api.jobs()
183
184                 try:
185                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
186                 except Exception as e:
187                     logger.warn("Error checking states on API server: %s", e)
188                     continue
189
190                 for p in proc_states["items"]:
191                     self.on_message({
192                         "object_uuid": p["uuid"],
193                         "event_type": "update",
194                         "properties": {
195                             "new_attributes": p
196                         }
197                     })
198         except:
199             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
200             self.cond.acquire()
201             self.processes.clear()
202             self.cond.notify()
203             self.cond.release()
204         finally:
205             self.stop_polling.set()
206
207     def get_uploaded(self):
208         return self.uploaded.copy()
209
210     def add_uploaded(self, src, pair):
211         self.uploaded[src] = pair
212
213     def add_intermediate_output(self, uuid):
214         if uuid:
215             self.intermediate_output_collections.append(uuid)
216
217     def trash_intermediate_output(self):
218         logger.info("Cleaning up intermediate output collections")
219         for i in self.intermediate_output_collections:
220             try:
221                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
222             except:
223                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
224             if sys.exc_info()[0] is KeyboardInterrupt:
225                 break
226
227     def check_features(self, obj):
228         if isinstance(obj, dict):
229             if obj.get("writable"):
230                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
231             if obj.get("class") == "DockerRequirement":
232                 if obj.get("dockerOutputDirectory"):
233                     # TODO: can be supported by containers API, but not jobs API.
234                     raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
235                         "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
236             for v in obj.itervalues():
237                 self.check_features(v)
238         elif isinstance(obj, list):
239             for i,v in enumerate(obj):
240                 with SourceLine(obj, i, UnsupportedRequirement):
241                     self.check_features(v)
242
243     def make_output_collection(self, name, tagsString, outputObj):
244         outputObj = copy.deepcopy(outputObj)
245
246         files = []
247         def capture(fileobj):
248             files.append(fileobj)
249
250         adjustDirObjs(outputObj, capture)
251         adjustFileObjs(outputObj, capture)
252
253         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
254
255         final = arvados.collection.Collection(api_client=self.api,
256                                               keep_client=self.keep_client,
257                                               num_retries=self.num_retries)
258
259         for k,v in generatemapper.items():
260             if k.startswith("_:"):
261                 if v.type == "Directory":
262                     continue
263                 if v.type == "CreateFile":
264                     with final.open(v.target, "wb") as f:
265                         f.write(v.resolved.encode("utf-8"))
266                     continue
267
268             if not k.startswith("keep:"):
269                 raise Exception("Output source is not in keep or a literal")
270             sp = k.split("/")
271             srccollection = sp[0][5:]
272             try:
273                 reader = self.collection_cache.get(srccollection)
274                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
275                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
276             except arvados.errors.ArgumentError as e:
277                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
278                 raise
279             except IOError as e:
280                 logger.warn("While preparing output collection: %s", e)
281
282         def rewrite(fileobj):
283             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
284             for k in ("basename", "listing", "contents"):
285                 if k in fileobj:
286                     del fileobj[k]
287
288         adjustDirObjs(outputObj, rewrite)
289         adjustFileObjs(outputObj, rewrite)
290
291         with final.open("cwl.output.json", "w") as f:
292             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
293
294         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
295
296         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
297                     final.api_response()["name"],
298                     final.manifest_locator())
299
300         final_uuid = final.manifest_locator()
301         tags = tagsString.split(',')
302         for tag in tags:
303              self.api.links().create(body={
304                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
305                 }).execute(num_retries=self.num_retries)
306
307         def finalcollection(fileobj):
308             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
309
310         adjustDirObjs(outputObj, finalcollection)
311         adjustFileObjs(outputObj, finalcollection)
312
313         return (outputObj, final)
314
315     def set_crunch_output(self):
316         if self.work_api == "containers":
317             try:
318                 current = self.api.containers().current().execute(num_retries=self.num_retries)
319             except ApiError as e:
320                 # Status code 404 just means we're not running in a container.
321                 if e.resp.status != 404:
322                     logger.info("Getting current container: %s", e)
323                 return
324             try:
325                 self.api.containers().update(uuid=current['uuid'],
326                                              body={
327                                                  'output': self.final_output_collection.portable_data_hash(),
328                                              }).execute(num_retries=self.num_retries)
329                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
330                                               body={
331                                                   'is_trashed': True
332                                               }).execute(num_retries=self.num_retries)
333             except Exception as e:
334                 logger.info("Setting container output: %s", e)
335         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
336             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
337                                    body={
338                                        'output': self.final_output_collection.portable_data_hash(),
339                                        'success': self.final_status == "success",
340                                        'progress':1.0
341                                    }).execute(num_retries=self.num_retries)
342
    def arv_executor(self, tool, job_order, **kwargs):
        """Run (or register, or submit) a CWL tool/workflow on Arvados.

        tool      -- loaded CWL process object.
        job_order -- input object for the run.
        kwargs    -- runner options.  "basedir", "trash_intermediate" and
                     "intermediate_output_ttl" are read by subscript and so
                     must be present; the rest are optional.

        Depending on the options this does one of:
          * "update_workflow"/"create_workflow": save a pipeline template
            (jobs API) or workflow record (containers API) and return
            (uuid, "success") without running anything.
          * "submit" without "wait": start a runner job/container and
            return (runner uuid, "success").
          * otherwise: iterate the runnable jobs, wait for them to finish,
            build the final output collection (unless a submitted runner
            produced it) and return (final_output, final_status).

        Raises Exception for option combinations not supported by the
        selected work API, UnsupportedRequirement for unsupported CWL
        features, and WorkflowException when no output was produced.
        """
        self.debug = kwargs.get("debug")

        # Fail early on CWL features this runner cannot handle.
        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        # Use the caller's filesystem access factory if given, otherwise
        # one backed by this runner's collection cache.
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])


        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        # Default the run name to the tool's label, falling back to the
        # basename of its id.  Note self.name is only assigned on this path.
        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        override_tools = {}
        upload_workflow_deps(self, tool, override_tools)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata,
                                  override_tools=override_tools)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"])
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        # Options passed on to tool.job() / runnable.run() below.  The
        # .get() round trips make sure the keys exist (as None if unset).
        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        # Output and temp directories as seen by the running step: fixed
        # paths for containers, "$(task...)" placeholders for jobs.
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
                    # A single command line tool that we wait for is run
                    # directly (first item yielded by tool.job) instead of
                    # through a separate runner container.
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))
        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        # Fire-and-forget submission: start the runner and return its uuid.
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        # From here on we wait for completion.  State changes are picked up
        # by a background thread running poll_states(), which uses its own
        # API client object.
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            # Only the runner itself needs to be run and awaited.
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            # The "jobiter" timer is entered/exited by hand so that it
            # covers only the time spent obtaining the next item from the
            # iterator, not the loop body.
            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                # The polling thread sets stop_polling when it exits,
                # including after a fatal error; stop iterating then.
                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    # Nothing runnable right now: wait for a pending
                    # process to change state, if there is one.
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            # All work has been handed out; wait for what is still pending.
            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            # Bare except so that KeyboardInterrupt is handled here too.
            # The error is logged and not re-raised; execution continues
            # with the result checks after the finally clause.
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                # Set the runner's container request priority to zero
                # (presumably this cancels it -- confirm against the
                # container_requests API).
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            # A submitted runner reports its own final output; just log it.
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        # Intermediate collections are only removed after a successful run.
        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)
548
549
def versionstring():
    """Return a version string of key packages for provenance and debugging.

    The string has the form
    "<argv[0]> <__version__> <arvados-cwl-runner version>,
    arvados-python-client <version>, cwltool <version>".
    """

    # Resolve all three distributions first, then read their versions.
    dist_names = ("arvados-cwl-runner", "arvados-python-client", "cwltool")
    resolved = [pkg_resources.require(dist_name) for dist_name in dist_names]
    runner_version, client_version, cwltool_version = [dists[0].version for dists in resolved]

    fields = (sys.argv[0], __version__, runner_version,
              "arvados-python-client", client_version,
              "cwltool", cwltool_version)
    return "%s %s %s, %s %s, %s %s" % fields
560
561
562 def arg_parser():  # type: () -> argparse.ArgumentParser
563     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
564
565     parser.add_argument("--basedir", type=str,
566                         help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
567     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
568                         help="Output directory, default current directory")
569
570     parser.add_argument("--eval-timeout",
571                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
572                         type=float,
573                         default=20)
574
575     exgroup = parser.add_mutually_exclusive_group()
576     exgroup.add_argument("--print-dot", action="store_true",
577                          help="Print workflow visualization in graphviz format and exit")
578     exgroup.add_argument("--version", action="store_true", help="Print version and exit")
579     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
580
581     exgroup = parser.add_mutually_exclusive_group()
582     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
583     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
584     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
585
586     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
587
588     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
589
590     exgroup = parser.add_mutually_exclusive_group()
591     exgroup.add_argument("--enable-reuse", action="store_true",
592                         default=True, dest="enable_reuse",
593                         help="Enable job or container reuse (default)")
594     exgroup.add_argument("--disable-reuse", action="store_false",
595                         default=True, dest="enable_reuse",
596                         help="Disable job or container reuse")
597
598     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
599     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
600     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
601     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
602                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
603                         default=False)
604
605     exgroup = parser.add_mutually_exclusive_group()
606     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
607                         default=True, dest="submit")
608     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
609                         default=True, dest="submit")
610     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
611                          dest="create_workflow")
612     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
613     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
614
615     exgroup = parser.add_mutually_exclusive_group()
616     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
617                         default=True, dest="wait")
618     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
619                         default=True, dest="wait")
620
621     exgroup = parser.add_mutually_exclusive_group()
622     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
623                         default=True, dest="log_timestamps")
624     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
625                         default=True, dest="log_timestamps")
626
627     parser.add_argument("--api", type=str,
628                         default=None, dest="work_api",
629                         choices=("jobs", "containers"),
630                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
631
632     parser.add_argument("--compute-checksum", action="store_true", default=False,
633                         help="Compute checksum of contents while collecting outputs",
634                         dest="compute_checksum")
635
636     parser.add_argument("--submit-runner-ram", type=int,
637                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
638                         default=1024)
639
640     parser.add_argument("--submit-runner-image", type=str,
641                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
642                         default=None)
643
644     parser.add_argument("--name", type=str,
645                         help="Name to use for workflow execution instance.",
646                         default=None)
647
648     parser.add_argument("--on-error", type=str,
649                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
650                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
651
652     parser.add_argument("--enable-dev", action="store_true",
653                         help="Enable loading and running development versions "
654                              "of CWL spec.", default=False)
655
656     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
657                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
658                         default=0)
659
660     exgroup = parser.add_mutually_exclusive_group()
661     exgroup.add_argument("--trash-intermediate", action="store_true",
662                         default=False, dest="trash_intermediate",
663                          help="Immediately trash intermediate outputs on workflow success.")
664     exgroup.add_argument("--no-trash-intermediate", action="store_false",
665                         default=False, dest="trash_intermediate",
666                         help="Do not trash intermediate outputs (default).")
667
668     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
669     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
670
671     return parser
672
def add_arv_hints():
    """Register the Arvados CWL extensions with cwltool.

    Performs three module-level side effects:

    * replaces cwltool's ``ACCEPTLIST_EN_RELAXED_RE`` and ``ACCEPTLIST_RE``
      patterns with a regex that matches any string;
    * loads the packaged ``arv-cwl-schema.yml`` resource and registers it
      with ``use_custom_schema`` for CWL "v1.0" under the
      ``http://arvados.org/cwl`` namespace;
    * adds the Arvados-specific requirement URIs to
      ``cwltool.process.supportedProcessRequirements``.

    The function is safe to call repeatedly (it is both called directly and
    handed to cwltool as a callback): requirement URIs that are already
    registered are not appended a second time.
    """
    # NOTE(review): presumably this relaxes cwltool's accepted-character
    # checks so any name passes -- confirm against cwltool.draft2tool.
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE

    # Read the schema first and close the stream in a finally clause so the
    # handle is released even if reading or registering the schema fails.
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    try:
        schema_text = res.read()
    finally:
        res.close()
    use_custom_schema("v1.0", "http://arvados.org/cwl", schema_text)

    # Only append URIs that are missing, so repeated calls do not grow the
    # shared list with duplicate entries.
    supported = cwltool.process.supportedProcessRequirements
    for requirement in (
            "http://arvados.org/cwl#RunInSingleContainer",
            "http://arvados.org/cwl#OutputDirType",
            "http://arvados.org/cwl#RuntimeConstraints",
            "http://arvados.org/cwl#PartitionRequirement",
            "http://arvados.org/cwl#APIRequirement",
            "http://commonwl.org/cwltool#LoadListingRequirement",
            "http://arvados.org/cwl#IntermediateOutput",
            "http://arvados.org/cwl#ReuseRequirement",
    ):
        if requirement not in supported:
            supported.append(requirement)
689
def main(args, stdout, stderr, api_client=None, keep_client=None):
    """Entry point for the Arvados cwl-runner command line.

    Parses ``args`` with ``arg_parser()``, builds an ``ArvCwlRunner``,
    adjusts logging according to the parsed options and then hands control
    to ``cwltool.main.main``.

    Parameters:
      args        -- list of command line arguments to parse.
      stdout      -- passed through to ``cwltool.main.main``.
      stderr      -- passed through to ``cwltool.main.main``.
      api_client  -- Arvados API client; created with ``arvados.api`` when
                     None.
      keep_client -- Keep client; created with ``arvados.keep.KeepClient``
                     when None.

    Returns None after printing the version for ``--version``, 1 when the
    ``--update-workflow`` UUID conflicts with ``--api`` or when the runner
    cannot be constructed, and otherwise whatever ``cwltool.main.main``
    returns.
    """
    arvargs = arg_parser().parse_args(args)

    if arvargs.version:
        # Parenthesized form prints the same text under the Python 2 print
        # statement used by this file.
        print(versionstring())
        return

    workflow_uuid = arvargs.update_workflow
    if workflow_uuid:
        # The infix found at offset 5 of the UUID selects the work API.
        want_api = None
        for infix, api_name in (('-7fd4e-', 'containers'), ('-p5p6p-', 'jobs')):
            if workflow_uuid.find(infix) == 5:
                want_api = api_name
                break
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                workflow_uuid, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    # Creating or updating a workflow without an input object gets an empty
    # job order, so none has to be supplied on the command line.
    registering = arvargs.create_workflow or arvargs.update_workflow
    job_order_object = ({}, "") if (registering and not arvargs.job_order) else None

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client,
                              work_api=arvargs.work_api,
                              keep_client=keep_client,
                              num_retries=4,
                              output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as err:
        # Any failure while setting up clients or the runner is reported
        # and turned into a non-zero return code.
        logger.error(err)
        return 1

    if arvargs.debug:
        for log in (logger, logging.getLogger('arvados')):
            log.setLevel(logging.DEBUG)

    # --quiet is applied after --debug, so it wins when both are given.
    if arvargs.quiet:
        for log in (logger,
                    logging.getLogger('arvados'),
                    logging.getLogger('arvados.arv-run')):
            log.setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        log_format = logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S')
    else:
        log_format = logging.Formatter('%(name)s %(levelname)s: %(message)s')
    arvados.log_handler.setFormatter(log_format)

    # Options that this parser does not define but that are set here before
    # the namespace is passed to cwltool.main.main.
    for option, value in (("conformance_test", None),
                          ("use_container", True),
                          ("relax_path_checks", True),
                          ("validate", None),
                          ("print_supported_versions", False)):
        setattr(arvargs, option, value)

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)
    fetcher_constructor = partial(CollectionFetcher,
                                  api_client=api_client,
                                  fs_access=make_fs_access(""),
                                  num_retries=runner.num_retries)
    resolver = partial(collectionResolver, api_client,
                       num_retries=runner.num_retries)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=fetcher_constructor,
                             resolver=resolver,
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)