1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implements the cwl-runner interface for submitting and running work on Arvados,
7 # using either the Crunch jobs API or the Crunch containers API.
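#
# Example invocation (illustrative; the arvados-cwl-runner command is the
# console script installed by this package):
#
#   arvados-cwl-runner --api=containers my-workflow.cwl my-inputs.yml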
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import threading
14 import hashlib
15 import copy
16 import json
17 import re
18 from functools import partial
19 import pkg_resources  # part of setuptools
20
21 from cwltool.errors import WorkflowException
22 import cwltool.main
23 import cwltool.workflow
24 import cwltool.process
25 import schema_salad
26 from schema_salad.sourceline import SourceLine
27
28 import arvados
29 import arvados.config
30 from arvados.keep import KeepClient
31 from arvados.errors import ApiError
32
33 from .arvcontainer import ArvadosContainer, RunnerContainer
34 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
35 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
36 from .arvtool import ArvadosCommandTool
37 from .arvworkflow import ArvadosWorkflow, upload_workflow
38 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
39 from .perf import Perf
40 from .pathmapper import NoFollowPathMapper
41 from ._version import __version__
42
43 from cwltool.pack import pack
44 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
45 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
46 from cwltool.draft2tool import compute_checksums
47 from arvados.api import OrderedJsonModel
48
49 logger = logging.getLogger('arvados.cwl-runner')
50 metrics = logging.getLogger('arvados.cwl-runner.metrics')
51 logger.setLevel(logging.INFO)
52
53 arvados.log_handler.setFormatter(logging.Formatter(
54         '%(asctime)s %(name)s %(levelname)s: %(message)s',
55         '%Y-%m-%d %H:%M:%S'))
56
57 class ArvCwlRunner(object):
58     """Execute a CWL tool or workflow, submit work (using either the jobs or
59     containers API), wait for it to complete, and report the output.
60
61     """
62
63     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
64         self.api = api_client
65         self.processes = {}
66         self.lock = threading.Lock()
67         self.cond = threading.Condition(self.lock)
68         self.final_output = None
69         self.final_status = None
70         self.uploaded = {}
71         self.num_retries = num_retries
72         self.uuid = None
73         self.stop_polling = threading.Event()
74         self.poll_api = None
75         self.pipeline = None
76         self.final_output_collection = None
77         self.output_name = output_name
78         self.output_tags = output_tags
79         self.project_uuid = None
80         self.intermediate_output_ttl = 0
81         self.intermediate_output_collections = []
82         self.trash_intermediate = False
83
84         if keep_client is not None:
85             self.keep_client = keep_client
86         else:
87             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
88
89         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
90
91         self.work_api = None
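        # Probe the API server's discovery document for a supported work
        # submission API ("jobs" or "containers"), honoring an explicitly
        # requested work_api when one was given.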
92         expected_api = ["jobs", "containers"]
93         for api in expected_api:
94             try:
95                 methods = self.api._rootDesc.get('resources')[api]['methods']
96                 if ('httpMethod' in methods['create'] and
97                     (work_api == api or work_api is None)):
98                     self.work_api = api
99                     break
100             except KeyError:
101                 pass
102
103         if not self.work_api:
104             if work_api is None:
105                 raise Exception("No supported APIs")
106             else:
107                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
108
109     def arv_make_tool(self, toolpath_object, **kwargs):
110         kwargs["work_api"] = self.work_api
111         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
112                                                 api_client=self.api,
113                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
114                                                 num_retries=self.num_retries,
115                                                 overrides=kwargs.get("override_tools"))
116         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
117             return ArvadosCommandTool(self, toolpath_object, **kwargs)
118         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
119             return ArvadosWorkflow(self, toolpath_object, **kwargs)
120         else:
121             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
122
123     def output_callback(self, out, processStatus):
124         if processStatus == "success":
125             logger.info("Overall process status is %s", processStatus)
126             if self.pipeline:
127                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
128                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
129         else:
130             logger.warn("Overall process status is %s", processStatus)
131             if self.pipeline:
132                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
133                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
134         self.final_status = processStatus
135         self.final_output = out
136
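    # Handle a state-change event for a submitted job or container.  Events
    # are produced by the poll_states() thread below and drive per-process
    # completion handling via done().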
137     def on_message(self, event):
138         if "object_uuid" in event:
139             if event["object_uuid"] in self.processes and event["event_type"] == "update":
140                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
141                     uuid = event["object_uuid"]
142                     with self.lock:
143                         j = self.processes[uuid]
144                         logger.info("%s %s is Running", self.label(j), uuid)
145                         j.running = True
146                         j.update_pipeline_component(event["properties"]["new_attributes"])
147                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
148                     uuid = event["object_uuid"]
149                     try:
150                         self.cond.acquire()
151                         j = self.processes[uuid]
152                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
153                         with Perf(metrics, "done %s" % j.name):
154                             j.done(event["properties"]["new_attributes"])
155                         self.cond.notify()
156                     finally:
157                         self.cond.release()
158
159     def label(self, obj):
160         return "[%s %s]" % (self.work_api[0:-1], obj.name)
161
162     def poll_states(self):
163         """Poll status of jobs or containers listed in the processes dict.
164
165         Runs in a separate thread.
166         """
167
168         try:
169             while True:
170                 self.stop_polling.wait(15)
171                 if self.stop_polling.is_set():
172                     break
173                 with self.lock:
174                     keys = self.processes.keys()
175                 if not keys:
176                     continue
177
178                 if self.work_api == "containers":
179                     table = self.poll_api.container_requests()
180                 elif self.work_api == "jobs":
181                     table = self.poll_api.jobs()
182
183                 try:
184                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
185                 except Exception as e:
186                     logger.warn("Error checking states on API server: %s", e)
187                     continue
188
189                 for p in proc_states["items"]:
190                     self.on_message({
191                         "object_uuid": p["uuid"],
192                         "event_type": "update",
193                         "properties": {
194                             "new_attributes": p
195                         }
196                     })
197         except:
198             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
199             self.cond.acquire()
200             self.processes.clear()
201             self.cond.notify()
202             self.cond.release()
203         finally:
204             self.stop_polling.set()
205
206     def get_uploaded(self):
207         return self.uploaded.copy()
208
209     def add_uploaded(self, src, pair):
210         self.uploaded[src] = pair
211
212     def add_intermediate_output(self, uuid):
213         if uuid:
214             self.intermediate_output_collections.append(uuid)
215
216     def trash_intermediate_output(self):
217         logger.info("Cleaning up intermediate output collections")
218         for i in self.intermediate_output_collections:
219             try:
220                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
221             except:
222                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
223             if sys.exc_info()[0] is KeyboardInterrupt:
224                 break
225
226     def check_features(self, obj):
227         if isinstance(obj, dict):
228             if obj.get("writable"):
229                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
230             if obj.get("class") == "DockerRequirement":
231                 if obj.get("dockerOutputDirectory"):
232                     # TODO: can be supported by containers API, but not jobs API.
233                     raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
234                         "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
235             for v in obj.itervalues():
236                 self.check_features(v)
237         elif isinstance(obj, list):
238             for i,v in enumerate(obj):
239                 with SourceLine(obj, i, UnsupportedRequirement):
240                     self.check_features(v)
241
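    # Build a single output collection from the workflow's final output object:
    # copy referenced files out of their source collections, save and tag the
    # new collection, then rewrite file locations to point at it.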
242     def make_output_collection(self, name, tagsString, outputObj):
243         outputObj = copy.deepcopy(outputObj)
244
245         files = []
246         def capture(fileobj):
247             files.append(fileobj)
248
249         adjustDirObjs(outputObj, capture)
250         adjustFileObjs(outputObj, capture)
251
252         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
253
254         final = arvados.collection.Collection(api_client=self.api,
255                                               keep_client=self.keep_client,
256                                               num_retries=self.num_retries)
257
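        # Copy each output into the new collection.  Keys beginning with "_:"
        # are literals produced by the workflow (written directly); everything
        # else must already be a "keep:" reference.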
258         for k,v in generatemapper.items():
259             if k.startswith("_:"):
260                 if v.type == "Directory":
261                     continue
262                 if v.type == "CreateFile":
263                     with final.open(v.target, "wb") as f:
264                         f.write(v.resolved.encode("utf-8"))
265                     continue
266
267             if not k.startswith("keep:"):
268                 raise Exception("Output source is not in keep or a literal")
269             sp = k.split("/")
270             srccollection = sp[0][5:]
271             try:
272                 reader = self.collection_cache.get(srccollection)
273                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
274                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
275             except arvados.errors.ArgumentError as e:
276                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
277                 raise
278             except IOError as e:
279                 logger.warn("While preparing output collection: %s", e)
280
281         def rewrite(fileobj):
282             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
283             for k in ("basename", "listing", "contents"):
284                 if k in fileobj:
285                     del fileobj[k]
286
287         adjustDirObjs(outputObj, rewrite)
288         adjustFileObjs(outputObj, rewrite)
289
290         with final.open("cwl.output.json", "w") as f:
291             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
292
293         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
294
295         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
296                     final.api_response()["name"],
297                     final.manifest_locator())
298
299         final_uuid = final.manifest_locator()
300         tags = tagsString.split(',')
301         for tag in tags:
302             self.api.links().create(body={
303                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
304             }).execute(num_retries=self.num_retries)
305
306         def finalcollection(fileobj):
307             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
308
309         adjustDirObjs(outputObj, finalcollection)
310         adjustFileObjs(outputObj, finalcollection)
311
312         return (outputObj, final)
313
314     def set_crunch_output(self):
315         if self.work_api == "containers":
316             try:
317                 current = self.api.containers().current().execute(num_retries=self.num_retries)
318             except ApiError as e:
319                 # Status code 404 just means we're not running in a container.
320                 if e.resp.status != 404:
321                     logger.info("Getting current container: %s", e)
322                 return
323             try:
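                # Record the final output on this runner's own container record;
                # the temporary collection saved above is then trashed because
                # the container output record supersedes it.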
324                 self.api.containers().update(uuid=current['uuid'],
325                                              body={
326                                                  'output': self.final_output_collection.portable_data_hash(),
327                                              }).execute(num_retries=self.num_retries)
328                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
329                                               body={
330                                                   'is_trashed': True
331                                               }).execute(num_retries=self.num_retries)
332             except Exception as e:
333                 logger.info("Setting container output: %s", e)
334         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
335             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
336                                    body={
337                                        'output': self.final_output_collection.portable_data_hash(),
338                                        'success': self.final_status == "success",
339                                        'progress':1.0
340                                    }).execute(num_retries=self.num_retries)
341
342     def arv_executor(self, tool, job_order, **kwargs):
343         self.debug = kwargs.get("debug")
344
345         tool.visit(self.check_features)
346
347         self.project_uuid = kwargs.get("project_uuid")
348         self.pipeline = None
349         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
350                                                                  collection_cache=self.collection_cache)
351         self.fs_access = make_fs_access(kwargs["basedir"])
352
353
354         self.trash_intermediate = kwargs["trash_intermediate"]
355         if self.trash_intermediate and self.work_api != "containers":
356             raise Exception("--trash-intermediate is only supported with --api=containers.")
357
358         self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
359         if self.intermediate_output_ttl and self.work_api != "containers":
360             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
361         if self.intermediate_output_ttl < 0:
362             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
363
364         if not kwargs.get("name"):
365             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
366
367         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
368         # Also uploads docker images.
369         override_tools = {}
370         upload_workflow_deps(self, tool, override_tools)
371
372         # Reload tool object which may have been updated by
373         # upload_workflow_deps
374         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
375                                   makeTool=self.arv_make_tool,
376                                   loader=tool.doc_loader,
377                                   avsc_names=tool.doc_schema,
378                                   metadata=tool.metadata,
379                                   override_tools=override_tools)
380
381         # Upload local file references in the job order.
382         job_order = upload_job_order(self, "%s input" % kwargs["name"],
383                                      tool, job_order)
384
385         existing_uuid = kwargs.get("update_workflow")
386         if existing_uuid or kwargs.get("create_workflow"):
387             # Create a pipeline template or workflow record and exit.
388             if self.work_api == "jobs":
389                 tmpl = RunnerTemplate(self, tool, job_order,
390                                       kwargs.get("enable_reuse"),
391                                       uuid=existing_uuid,
392                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
393                                       name=kwargs["name"])
394                 tmpl.save()
395                 # cwltool.main will write our return value to stdout.
396                 return (tmpl.uuid, "success")
397             elif self.work_api == "containers":
398                 return (upload_workflow(self, tool, job_order,
399                                         self.project_uuid,
400                                         uuid=existing_uuid,
401                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
402                                         name=kwargs["name"]),
403                         "success")
404
405         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
406
407         kwargs["make_fs_access"] = make_fs_access
408         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
409         kwargs["use_container"] = True
410         kwargs["tmpdir_prefix"] = "tmp"
411         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
412
413         if self.work_api == "containers":
414             kwargs["outdir"] = "/var/spool/cwl"
415             kwargs["docker_outdir"] = "/var/spool/cwl"
416             kwargs["tmpdir"] = "/tmp"
417             kwargs["docker_tmpdir"] = "/tmp"
418         elif self.work_api == "jobs":
419             kwargs["outdir"] = "$(task.outdir)"
420             kwargs["docker_outdir"] = "$(task.outdir)"
421             kwargs["tmpdir"] = "$(task.tmpdir)"
422
423         runnerjob = None
424         if kwargs.get("submit"):
425             # Submit a runner job to run the workflow for us.
426             if self.work_api == "containers":
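                # A bare CommandLineTool run with --wait is submitted directly
                # as a single container request; anything else is wrapped in a
                # RunnerContainer that manages the workflow on the cluster.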
427                 if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
428                     kwargs["runnerjob"] = tool.tool["id"]
429                     upload_dependencies(self,
430                                         kwargs["name"],
431                                         tool.doc_loader,
432                                         tool.tool,
433                                         tool.tool["id"],
434                                         False)
435                     runnerjob = tool.job(job_order,
436                                          self.output_callback,
437                                          **kwargs).next()
438                 else:
439                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
440                                                 self.output_name,
441                                                 self.output_tags,
442                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
443                                                 name=kwargs.get("name"),
444                                                 on_error=kwargs.get("on_error"),
445                                                 submit_runner_image=kwargs.get("submit_runner_image"),
446                                                 intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
447             elif self.work_api == "jobs":
448                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
449                                       self.output_name,
450                                       self.output_tags,
451                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
452                                       name=kwargs.get("name"),
453                                       on_error=kwargs.get("on_error"),
454                                       submit_runner_image=kwargs.get("submit_runner_image"))
455         elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
456             # Create pipeline for local run
457             self.pipeline = self.api.pipeline_instances().create(
458                 body={
459                     "owner_uuid": self.project_uuid,
460                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
461                     "components": {},
462                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
463             logger.info("Pipeline instance %s", self.pipeline["uuid"])
464
465         if runnerjob and not kwargs.get("wait"):
466             runnerjob.run(wait=kwargs.get("wait"))
467             return (runnerjob.uuid, "success")
468
469         self.poll_api = arvados.api('v1')
470         self.polling_thread = threading.Thread(target=self.poll_states)
471         self.polling_thread.start()
472
473         if runnerjob:
474             jobiter = iter((runnerjob,))
475         else:
476             if "cwl_runner_job" in kwargs:
477                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
478             jobiter = tool.job(job_order,
479                                self.output_callback,
480                                **kwargs)
481
482         try:
483             self.cond.acquire()
484             # Will continue to hold the lock for the duration of this code
485             # except when in cond.wait(), at which point on_message can update
486             # job state and process output callbacks.
487
488             loopperf = Perf(metrics, "jobiter")
489             loopperf.__enter__()
490             for runnable in jobiter:
491                 loopperf.__exit__()
492
493                 if self.stop_polling.is_set():
494                     break
495
496                 if runnable:
497                     with Perf(metrics, "run"):
498                         runnable.run(**kwargs)
499                 else:
500                     if self.processes:
501                         self.cond.wait(1)
502                     else:
503                         logger.error("Workflow is deadlocked: no jobs are runnable and none are pending.")
504                         break
505                 loopperf.__enter__()
506             loopperf.__exit__()
507
508             while self.processes:
509                 self.cond.wait(1)
510
511         except UnsupportedRequirement:
512             raise
513         except:
514             if sys.exc_info()[0] is KeyboardInterrupt:
515                 logger.error("Interrupted, marking pipeline as failed")
516             else:
517                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
518             if self.pipeline:
519                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
520                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
521             if runnerjob and runnerjob.uuid and self.work_api == "containers":
522                 self.api.container_requests().update(uuid=runnerjob.uuid,
523                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
524         finally:
525             self.cond.release()
526             self.stop_polling.set()
527             self.polling_thread.join()
528
529         if self.final_status == "UnsupportedRequirement":
530             raise UnsupportedRequirement("Check log for details.")
531
532         if self.final_output is None:
533             raise WorkflowException("Workflow did not return a result.")
534
535         if kwargs.get("submit") and isinstance(runnerjob, Runner):
536             logger.info("Final output collection %s", runnerjob.final_output)
537         else:
538             if self.output_name is None:
539                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
540             if self.output_tags is None:
541                 self.output_tags = ""
542             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
543             self.set_crunch_output()
544
545         if kwargs.get("compute_checksum"):
546             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
547             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
548
549         if self.trash_intermediate and self.final_status == "success":
550             self.trash_intermediate_output()
551
552         return (self.final_output, self.final_status)
553
554
555 def versionstring():
556     """Return version strings of key packages for provenance and debugging."""
557
558     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
559     arvpkg = pkg_resources.require("arvados-python-client")
560     cwlpkg = pkg_resources.require("cwltool")
561
562     return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
563                                     "arvados-python-client", arvpkg[0].version,
564                                     "cwltool", cwlpkg[0].version)
565
566
567 def arg_parser():  # type: () -> argparse.ArgumentParser
568     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
569
570     parser.add_argument("--basedir", type=str,
571                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
572     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
573                         help="Output directory, default current directory")
574
575     parser.add_argument("--eval-timeout",
576                         help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
577                         type=float,
578                         default=20)
579
580     exgroup = parser.add_mutually_exclusive_group()
581     exgroup.add_argument("--print-dot", action="store_true",
582                          help="Print workflow visualization in graphviz format and exit")
583     exgroup.add_argument("--version", action="store_true", help="Print version and exit")
584     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
585
586     exgroup = parser.add_mutually_exclusive_group()
587     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
588     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
589     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
590
591     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
592
593     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
594
595     exgroup = parser.add_mutually_exclusive_group()
596     exgroup.add_argument("--enable-reuse", action="store_true",
597                         default=True, dest="enable_reuse",
598                         help="Enable job or container reuse (default)")
599     exgroup.add_argument("--disable-reuse", action="store_false",
600                         default=True, dest="enable_reuse",
601                         help="Disable job or container reuse")
602
603     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs.  If not provided, work goes to the user's home project.")
604     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
605     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
606     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
607                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
608                         default=False)
609
610     exgroup = parser.add_mutually_exclusive_group()
611     exgroup.add_argument("--submit", action="store_true", help="Submit the workflow runner to Arvados and manage execution there (default).",
612                         default=True, dest="submit")
613     exgroup.add_argument("--local", action="store_false", help="Manage the workflow from the local host; jobs or containers are still submitted to Arvados.",
614                         default=True, dest="submit")
615     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
616                          dest="create_workflow")
617     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
618     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
619
620     exgroup = parser.add_mutually_exclusive_group()
621     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
622                         default=True, dest="wait")
623     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
624                         default=True, dest="wait")
625
626     exgroup = parser.add_mutually_exclusive_group()
627     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
628                         default=True, dest="log_timestamps")
629     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
630                         default=True, dest="log_timestamps")
631
632     parser.add_argument("--api", type=str,
633                         default=None, dest="work_api",
634                         choices=("jobs", "containers"),
635                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
636
637     parser.add_argument("--compute-checksum", action="store_true", default=False,
638                         help="Compute checksum of contents while collecting outputs",
639                         dest="compute_checksum")
640
641     parser.add_argument("--submit-runner-ram", type=int,
642                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
643                         default=1024)
644
645     parser.add_argument("--submit-runner-image", type=str,
646                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
647                         default=None)
648
649     parser.add_argument("--name", type=str,
650                         help="Name to use for workflow execution instance.",
651                         default=None)
652
653     parser.add_argument("--on-error", type=str,
654                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
655                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
656
657     parser.add_argument("--enable-dev", action="store_true",
658                         help="Enable loading and running development versions "
659                              "of the CWL spec.", default=False)
660
661     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
662                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
663                         default=0)
664
665     exgroup = parser.add_mutually_exclusive_group()
666     exgroup.add_argument("--trash-intermediate", action="store_true",
667                         default=False, dest="trash_intermediate",
668                          help="Immediately trash intermediate outputs on workflow success.")
669     exgroup.add_argument("--no-trash-intermediate", action="store_false",
670                         default=False, dest="trash_intermediate",
671                         help="Do not trash intermediate outputs (default).")
672
673     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
674     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
675
676     return parser
677
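# Relax cwltool's tool identifier accept-list and register the Arvados CWL
# extension schema and requirement names so that workflows using these
# extensions validate and load.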
678 def add_arv_hints():
679     cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
680     cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
681     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
682     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
683     res.close()
684     cwltool.process.supportedProcessRequirements.extend([
685         "http://arvados.org/cwl#RunInSingleContainer",
686         "http://arvados.org/cwl#OutputDirType",
687         "http://arvados.org/cwl#RuntimeConstraints",
688         "http://arvados.org/cwl#PartitionRequirement",
689         "http://arvados.org/cwl#APIRequirement",
690         "http://commonwl.org/cwltool#LoadListingRequirement",
691         "http://arvados.org/cwl#IntermediateOutput",
692         "http://arvados.org/cwl#ReuseRequirement"
693     ])
694
695 def main(args, stdout, stderr, api_client=None, keep_client=None):
696     parser = arg_parser()
697
698     job_order_object = None
699     arvargs = parser.parse_args(args)
700
701     if arvargs.version:
702         print versionstring()
703         return
704
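    # Infer which API the existing record belongs to from the UUID type infix
    # ("-7fd4e-" is a workflow record, containers API; "-p5p6p-" is a pipeline
    # template, jobs API) and reject a conflicting explicit --api.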
705     if arvargs.update_workflow:
706         if arvargs.update_workflow.find('-7fd4e-') == 5:
707             want_api = 'containers'
708         elif arvargs.update_workflow.find('-p5p6p-') == 5:
709             want_api = 'jobs'
710         else:
711             want_api = None
712         if want_api and arvargs.work_api and want_api != arvargs.work_api:
713             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
714                 arvargs.update_workflow, want_api, arvargs.work_api))
715             return 1
716         arvargs.work_api = want_api
717
718     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
719         job_order_object = ({}, "")
720
721     add_arv_hints()
722
723     try:
724         if api_client is None:
725             api_client=arvados.api('v1', model=OrderedJsonModel())
726         if keep_client is None:
727             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
728         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
729                               num_retries=4, output_name=arvargs.output_name,
730                               output_tags=arvargs.output_tags)
731     except Exception as e:
732         logger.error(e)
733         return 1
734
735     if arvargs.debug:
736         logger.setLevel(logging.DEBUG)
737         logging.getLogger('arvados').setLevel(logging.DEBUG)
738
739     if arvargs.quiet:
740         logger.setLevel(logging.WARN)
741         logging.getLogger('arvados').setLevel(logging.WARN)
742         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
743
744     if arvargs.metrics:
745         metrics.setLevel(logging.DEBUG)
746         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
747
748     if arvargs.log_timestamps:
749         arvados.log_handler.setFormatter(logging.Formatter(
750             '%(asctime)s %(name)s %(levelname)s: %(message)s',
751             '%Y-%m-%d %H:%M:%S'))
752     else:
753         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
754
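    # Attributes expected by cwltool.main that the Arvados executor pins
    # itself rather than taking from the command line.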
755     arvargs.conformance_test = None
756     arvargs.use_container = True
757     arvargs.relax_path_checks = True
758     arvargs.validate = None
759
760     make_fs_access = partial(CollectionFsAccess,
761                            collection_cache=runner.collection_cache)
762
763     return cwltool.main.main(args=arvargs,
764                              stdout=stdout,
765                              stderr=stderr,
766                              executor=runner.arv_executor,
767                              makeTool=runner.arv_make_tool,
768                              versionfunc=versionstring,
769                              job_order_object=job_order_object,
770                              make_fs_access=make_fs_access,
771                              fetcher_constructor=partial(CollectionFetcher,
772                                                          api_client=api_client,
773                                                          fs_access=make_fs_access(""),
774                                                          num_retries=runner.num_retries),
775                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
776                              logger_handler=arvados.log_handler,
777                              custom_schema_callback=add_arv_hints)