#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
import re
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
import schema_salad
from schema_salad.sourceline import SourceLine

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """
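    # Rough usage sketch (assuming a configured Arvados client; main() below
    # wires these same pieces into cwltool for real runs):
    #
    #   api = arvados.api('v1', model=OrderedJsonModel())
    #   runner = ArvCwlRunner(api, work_api="containers")
    #   cwltool.main.main(..., executor=runner.arv_executor,
    #                     makeTool=runner.arv_make_tool)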

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

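        # Probe the API server's discovery document to pick a supported work
        # API ("jobs" or "containers"), honoring an explicit work_api request.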
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries,
                                                overrides=kwargs.get("override_tools"))
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
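        """Handle a state-change event for a job or container in self.processes."""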
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt:
                break

    def check_features(self, obj):
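        """Recursively check the tool document for CWL features this runner does not support."""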
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    # TODO: can be supported by containers API, but not jobs API.
                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)

    def make_output_collection(self, name, tagsString, outputObj):
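        """Build the final output collection.

        Copy the referenced output files into a new collection, rewrite the
        output object to point at it, write cwl.output.json, save the
        collection under the project, and apply any requested tags.
        """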
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

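        # Keys starting with "_:" are literals: directories are created
        # implicitly and CreateFile literals are written directly; everything
        # else must be a "keep:" reference copied in from its source collection.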
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
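        """Record the final output collection on the container or job task this runner is executing inside, if any."""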
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
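        """Execute the tool or workflow.

        Uploads dependencies, then either registers a workflow/template,
        submits a runner job or container, or runs the steps from this
        process, and finally collects and returns the output.
        """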
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        override_tools = {}
        upload_workflow_deps(self, tool, override_tools)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata,
                                  override_tools=override_tools)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"])
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    upload_dependencies(self,
                                        kwargs["name"],
                                        tool.doc_loader,
                                        tool.tool,
                                        tool.tool["id"],
                                        False)
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))
        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

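        # With --no-wait, submit the runner and return its UUID without
        # waiting for the workflow to finish.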
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)


def versionstring():
    """Print version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                       "arvados-python-client", arvpkg[0].version,
                                       "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                        default=0)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser


def add_arv_hints():
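    """Load the Arvados CWL extension schema and register its requirements with cwltool."""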
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput"
    ])


def main(args, stdout, stderr, api_client=None, keep_client=None):
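    """Command line entry point: parse arguments, construct an ArvCwlRunner, and hand off to cwltool.main.main."""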
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
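        # Infer the work API from the UUID type: the "-7fd4e-" infix denotes a
        # workflow record (containers API), "-p5p6p-" a pipeline template (jobs API).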
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)