11462: Store CollectionReader objects in a central cache to avoid redundant
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
5
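# Typical invocation (file names below are hypothetical, for illustration only):
#   arvados-cwl-runner --api=containers workflow.cwl inputs.yml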
6 import argparse
7 import logging
8 import os
9 import sys
10 import threading
11 import hashlib
12 import copy
13 import json
14 import re
15 from functools import partial
16 import pkg_resources  # part of setuptools
17
18 from cwltool.errors import WorkflowException
19 import cwltool.main
20 import cwltool.workflow
21 import cwltool.process
22 import schema_salad
23 from schema_salad.sourceline import SourceLine
24
25 import arvados
26 import arvados.config
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29
30 from .arvcontainer import ArvadosContainer, RunnerContainer
31 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
32 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
33 from .arvtool import ArvadosCommandTool
34 from .arvworkflow import ArvadosWorkflow, upload_workflow
35 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
36 from .perf import Perf
37 from .pathmapper import NoFollowPathMapper
38 from ._version import __version__
39
40 from cwltool.pack import pack
41 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
42 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
43 from cwltool.draft2tool import compute_checksums
44 from arvados.api import OrderedJsonModel
45
46 logger = logging.getLogger('arvados.cwl-runner')
47 metrics = logging.getLogger('arvados.cwl-runner.metrics')
48 logger.setLevel(logging.INFO)
49
50 arvados.log_handler.setFormatter(logging.Formatter(
51         '%(asctime)s %(name)s %(levelname)s: %(message)s',
52         '%Y-%m-%d %H:%M:%S'))
53
54 class ArvCwlRunner(object):
55     """Execute a CWL tool or workflow: submit work (using either the jobs
56     or containers API), wait for it to complete, and report output.
57
58     """
59
60     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
61         self.api = api_client
62         self.processes = {}
63         self.lock = threading.Lock()
64         self.cond = threading.Condition(self.lock)
65         self.final_output = None
66         self.final_status = None
67         self.uploaded = {}
68         self.num_retries = num_retries
69         self.uuid = None
70         self.stop_polling = threading.Event()
71         self.poll_api = None
72         self.pipeline = None
73         self.final_output_collection = None
74         self.output_name = output_name
75         self.output_tags = output_tags
76         self.project_uuid = None
77
78         if keep_client is not None:
79             self.keep_client = keep_client
80         else:
81             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
82
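        # Central cache of CollectionReader objects, shared across this run so
        # that repeated references to the same collection reuse one reader
        # instead of re-fetching its manifest every time.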
83         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
84
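        # Choose the work submission API: probe the API server's discovery
        # document for the "jobs" and "containers" resources and take the
        # first one that is available and matches the requested work_api.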
85         self.work_api = None
86         expected_api = ["jobs", "containers"]
87         for api in expected_api:
88             try:
89                 methods = self.api._rootDesc.get('resources')[api]['methods']
90                 if ('httpMethod' in methods['create'] and
91                     (work_api == api or work_api is None)):
92                     self.work_api = api
93                     break
94             except KeyError:
95                 pass
96
97         if not self.work_api:
98             if work_api is None:
99                 raise Exception("No supported APIs")
100             else:
101                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
102
103     def arv_make_tool(self, toolpath_object, **kwargs):
104         kwargs["work_api"] = self.work_api
105         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
106                                                 api_client=self.api,
107                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
108                                                 num_retries=self.num_retries)
109         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
110             return ArvadosCommandTool(self, toolpath_object, **kwargs)
111         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
112             return ArvadosWorkflow(self, toolpath_object, **kwargs)
113         else:
114             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
115
116     def output_callback(self, out, processStatus):
117         if processStatus == "success":
118             logger.info("Overall process status is %s", processStatus)
119             if self.pipeline:
120                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
121                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
122         else:
123             logger.warn("Overall process status is %s", processStatus)
124             if self.pipeline:
125                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
126                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
127         self.final_status = processStatus
128         self.final_output = out
129
130     def on_message(self, event):
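        # Handle a state-change event for a tracked job/container: mark it
        # running, or on a final state call its done() handler and notify the
        # main loop waiting on self.cond.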
131         if "object_uuid" in event:
132             if event["object_uuid"] in self.processes and event["event_type"] == "update":
133                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
134                     uuid = event["object_uuid"]
135                     with self.lock:
136                         j = self.processes[uuid]
137                         logger.info("%s %s is Running", self.label(j), uuid)
138                         j.running = True
139                         j.update_pipeline_component(event["properties"]["new_attributes"])
140                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
141                     uuid = event["object_uuid"]
142                     try:
143                         self.cond.acquire()
144                         j = self.processes[uuid]
145                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
146                         with Perf(metrics, "done %s" % j.name):
147                             j.done(event["properties"]["new_attributes"])
148                         self.cond.notify()
149                     finally:
150                         self.cond.release()
151
152     def label(self, obj):
153         return "[%s %s]" % (self.work_api[0:-1], obj.name)
154
155     def poll_states(self):
156         """Poll status of jobs or containers listed in the processes dict.
157
158         Runs in a separate thread.
159         """
160
161         try:
162             while True:
163                 self.stop_polling.wait(15)
164                 if self.stop_polling.is_set():
165                     break
166                 with self.lock:
167                     keys = self.processes.keys()
168                 if not keys:
169                     continue
170
171                 if self.work_api == "containers":
172                     table = self.poll_api.container_requests()
173                 elif self.work_api == "jobs":
174                     table = self.poll_api.jobs()
175
176                 try:
177                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
178                 except Exception as e:
179                     logger.warn("Error checking states on API server: %s", e)
180                     continue
181
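                # Feed each polled record through on_message as a synthetic
                # "update" event, the same path a push notification would take.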
182                 for p in proc_states["items"]:
183                     self.on_message({
184                         "object_uuid": p["uuid"],
185                         "event_type": "update",
186                         "properties": {
187                             "new_attributes": p
188                         }
189                     })
190         except:
191             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
192             self.cond.acquire()
193             self.processes.clear()
194             self.cond.notify()
195             self.cond.release()
196         finally:
197             self.stop_polling.set()
198
199     def get_uploaded(self):
200         return self.uploaded.copy()
201
202     def add_uploaded(self, src, pair):
203         self.uploaded[src] = pair
204
205     def check_features(self, obj):
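        # Recursively scan a tool document for features this runner does not
        # support (writable InitialWorkDir entries, dockerOutputDirectory) and
        # raise UnsupportedRequirement with source line context.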
206         if isinstance(obj, dict):
207             if obj.get("writable"):
208                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
209             if obj.get("class") == "DockerRequirement":
210                 if obj.get("dockerOutputDirectory"):
211                     # TODO: can be supported by containers API, but not jobs API.
212                     raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
213                         "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
214             for v in obj.itervalues():
215                 self.check_features(v)
216         elif isinstance(obj, list):
217             for i,v in enumerate(obj):
218                 with SourceLine(obj, i, UnsupportedRequirement):
219                     self.check_features(v)
220
221     def make_output_collection(self, name, tagsString, outputObj):
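        # Build the final output collection: copy each File/Directory named in
        # the CWL output object out of its source collection in Keep, write
        # cwl.output.json alongside, save the new collection, and tag it.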
222         outputObj = copy.deepcopy(outputObj)
223
224         files = []
225         def capture(fileobj):
226             files.append(fileobj)
227
228         adjustDirObjs(outputObj, capture)
229         adjustFileObjs(outputObj, capture)
230
231         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
232
233         final = arvados.collection.Collection(api_client=self.api,
234                                               keep_client=self.keep_client,
235                                               num_retries=self.num_retries)
236
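        # Literal outputs ("_:" locations) are written directly into the new
        # collection; everything else must be a keep: reference and is copied
        # from its source collection via the shared collection cache.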
237         for k,v in generatemapper.items():
238             if k.startswith("_:"):
239                 if v.type == "Directory":
240                     continue
241                 if v.type == "CreateFile":
242                     with final.open(v.target, "wb") as f:
243                         f.write(v.resolved.encode("utf-8"))
244                     continue
245
246             if not k.startswith("keep:"):
247                 raise Exception("Output source is not in keep or a literal")
248             sp = k.split("/")
249             srccollection = sp[0][5:]
250             reader = self.collection_cache.get(srccollection)
251             try:
252                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
253                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
254             except IOError as e:
255                 logger.warn("While preparing output collection: %s", e)
256
257         def rewrite(fileobj):
258             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
259             for k in ("basename", "listing", "contents"):
260                 if k in fileobj:
261                     del fileobj[k]
262
263         adjustDirObjs(outputObj, rewrite)
264         adjustFileObjs(outputObj, rewrite)
265
266         with final.open("cwl.output.json", "w") as f:
267             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
268
269         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
270
271         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
272                     final.api_response()["name"],
273                     final.manifest_locator())
274
275         final_uuid = final.manifest_locator()
276         tags = tagsString.split(',')
277         for tag in tags:
278              self.api.links().create(body={
279                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
280                 }).execute(num_retries=self.num_retries)
281
282         def finalcollection(fileobj):
283             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
284
285         adjustDirObjs(outputObj, finalcollection)
286         adjustFileObjs(outputObj, finalcollection)
287
288         return (outputObj, final)
289
290     def set_crunch_output(self):
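        # When this runner is itself executing on the cluster, record the
        # final output on the current container record (containers API) or on
        # the current job task (jobs API) so the parent process reports it.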
291         if self.work_api == "containers":
292             try:
293                 current = self.api.containers().current().execute(num_retries=self.num_retries)
294             except ApiError as e:
295                 # Status code 404 just means we're not running in a container.
296                 if e.resp.status != 404:
297                     logger.info("Getting current container: %s", e)
298                 return
299             try:
300                 self.api.containers().update(uuid=current['uuid'],
301                                              body={
302                                                  'output': self.final_output_collection.portable_data_hash(),
303                                              }).execute(num_retries=self.num_retries)
304                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
305                                               body={
306                                                   'is_trashed': True
307                                               }).execute(num_retries=self.num_retries)
308             except Exception as e:
309                 logger.info("Setting container output: %s", e)
310         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
311             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
312                                    body={
313                                        'output': self.final_output_collection.portable_data_hash(),
314                                        'success': self.final_status == "success",
315                                        'progress':1.0
316                                    }).execute(num_retries=self.num_retries)
317
318     def arv_executor(self, tool, job_order, **kwargs):
319         self.debug = kwargs.get("debug")
320
321         tool.visit(self.check_features)
322
323         self.project_uuid = kwargs.get("project_uuid")
324         self.pipeline = None
325         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
326                                                                  collection_cache=self.collection_cache)
327         self.fs_access = make_fs_access(kwargs["basedir"])
328
329         if not kwargs.get("name"):
330             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
331
332         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
333         # Also uploads docker images.
334         upload_workflow_deps(self, tool)
335
336         # Reload tool object which may have been updated by
337         # upload_workflow_deps
338         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
339                                   makeTool=self.arv_make_tool,
340                                   loader=tool.doc_loader,
341                                   avsc_names=tool.doc_schema,
342                                   metadata=tool.metadata)
343
344         # Upload local file references in the job order.
345         job_order = upload_job_order(self, "%s input" % kwargs["name"],
346                                      tool, job_order)
347
348         existing_uuid = kwargs.get("update_workflow")
349         if existing_uuid or kwargs.get("create_workflow"):
350             # Create a pipeline template or workflow record and exit.
351             if self.work_api == "jobs":
352                 tmpl = RunnerTemplate(self, tool, job_order,
353                                       kwargs.get("enable_reuse"),
354                                       uuid=existing_uuid,
355                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
356                                       name=kwargs["name"])
357                 tmpl.save()
358                 # cwltool.main will write our return value to stdout.
359                 return (tmpl.uuid, "success")
360             elif self.work_api == "containers":
361                 return (upload_workflow(self, tool, job_order,
362                                         self.project_uuid,
363                                         uuid=existing_uuid,
364                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
365                                         name=kwargs["name"]),
366                         "success")
367
368         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
369
370         kwargs["make_fs_access"] = make_fs_access
371         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
372         kwargs["use_container"] = True
373         kwargs["tmpdir_prefix"] = "tmp"
374         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
375
376         if self.work_api == "containers":
377             kwargs["outdir"] = "/var/spool/cwl"
378             kwargs["docker_outdir"] = "/var/spool/cwl"
379             kwargs["tmpdir"] = "/tmp"
380             kwargs["docker_tmpdir"] = "/tmp"
381         elif self.work_api == "jobs":
382             kwargs["outdir"] = "$(task.outdir)"
383             kwargs["docker_outdir"] = "$(task.outdir)"
384             kwargs["tmpdir"] = "$(task.tmpdir)"
385
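        # With --submit, wrap the workflow in a runner process (RunnerContainer
        # or RunnerJob) that executes it on the cluster.  A bare CommandLineTool
        # on the containers API is submitted directly as a single container.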
386         runnerjob = None
387         if kwargs.get("submit"):
388             # Submit a runner job to run the workflow for us.
389             if self.work_api == "containers":
390                 if tool.tool["class"] == "CommandLineTool":
391                     kwargs["runnerjob"] = tool.tool["id"]
392                     upload_dependencies(self,
393                                         kwargs["name"],
394                                         tool.doc_loader,
395                                         tool.tool,
396                                         tool.tool["id"],
397                                         False)
398                     runnerjob = tool.job(job_order,
399                                          self.output_callback,
400                                          **kwargs).next()
401                 else:
402                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
403                                                 self.output_name,
404                                                 self.output_tags,
405                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
406                                                 name=kwargs.get("name"),
407                                                 on_error=kwargs.get("on_error"),
408                                                 submit_runner_image=kwargs.get("submit_runner_image"))
409             elif self.work_api == "jobs":
410                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
411                                       self.output_name,
412                                       self.output_tags,
413                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
414                                       name=kwargs.get("name"),
415                                       on_error=kwargs.get("on_error"),
416                                       submit_runner_image=kwargs.get("submit_runner_image"))
417
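        # For a local (non-submit) run on the jobs API, create a pipeline
        # instance to group and track the jobs that are about to be submitted.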
418         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
419             # Create pipeline for local run
420             self.pipeline = self.api.pipeline_instances().create(
421                 body={
422                     "owner_uuid": self.project_uuid,
423                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
424                     "components": {},
425                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
426             logger.info("Pipeline instance %s", self.pipeline["uuid"])
427
428         if runnerjob and not kwargs.get("wait"):
429             runnerjob.run(wait=kwargs.get("wait"))
430             return (runnerjob.uuid, "success")
431
432         self.poll_api = arvados.api('v1')
433         self.polling_thread = threading.Thread(target=self.poll_states)
434         self.polling_thread.start()
435
436         if runnerjob:
437             jobiter = iter((runnerjob,))
438         else:
439             if "cwl_runner_job" in kwargs:
440                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
441             jobiter = tool.job(job_order,
442                                self.output_callback,
443                                **kwargs)
444
445         try:
446             self.cond.acquire()
447             # Will continue to hold the lock for the duration of this code
448             # except when in cond.wait(), at which point on_message can update
449             # job state and process output callbacks.
450
451             loopperf = Perf(metrics, "jobiter")
452             loopperf.__enter__()
453             for runnable in jobiter:
454                 loopperf.__exit__()
455
456                 if self.stop_polling.is_set():
457                     break
458
459                 if runnable:
460                     with Perf(metrics, "run"):
461                         runnable.run(**kwargs)
462                 else:
463                     if self.processes:
464                         self.cond.wait(1)
465                     else:
466                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
467                         break
468                 loopperf.__enter__()
469             loopperf.__exit__()
470
471             while self.processes:
472                 self.cond.wait(1)
473
474         except UnsupportedRequirement:
475             raise
476         except:
477             if sys.exc_info()[0] is KeyboardInterrupt:
478                 logger.error("Interrupted, marking pipeline as failed")
479             else:
480                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
481             if self.pipeline:
482                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
483                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
484             if runnerjob and runnerjob.uuid and self.work_api == "containers":
485                 self.api.container_requests().update(uuid=runnerjob.uuid,
486                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
487         finally:
488             self.cond.release()
489             self.stop_polling.set()
490             self.polling_thread.join()
491
492         if self.final_status == "UnsupportedRequirement":
493             raise UnsupportedRequirement("Check log for details.")
494
495         if self.final_output is None:
496             raise WorkflowException("Workflow did not return a result.")
497
498         if kwargs.get("submit") and isinstance(runnerjob, Runner):
499             logger.info("Final output collection %s", runnerjob.final_output)
500         else:
501             if self.output_name is None:
502                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
503             if self.output_tags is None:
504                 self.output_tags = ""
505             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
506             self.set_crunch_output()
507
508         if kwargs.get("compute_checksum"):
509             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
510             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
511
512         return (self.final_output, self.final_status)
513
514
515 def versionstring():
516     """Return a version string of key packages for provenance and debugging."""
517
518     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
519     arvpkg = pkg_resources.require("arvados-python-client")
520     cwlpkg = pkg_resources.require("cwltool")
521
522     return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
523                                     "arvados-python-client", arvpkg[0].version,
524                                     "cwltool", cwlpkg[0].version)
525
526
527 def arg_parser():  # type: () -> argparse.ArgumentParser
528     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
529
530     parser.add_argument("--basedir", type=str,
531                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
532     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
533                         help="Output directory, default current directory")
534
535     parser.add_argument("--eval-timeout",
536                         help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
537                         type=float,
538                         default=20)
539     parser.add_argument("--version", action="store_true", help="Print version and exit")
540
541     exgroup = parser.add_mutually_exclusive_group()
542     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
543     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
544     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
545
546     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
547
548     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
549
550     exgroup = parser.add_mutually_exclusive_group()
551     exgroup.add_argument("--enable-reuse", action="store_true",
552                         default=True, dest="enable_reuse",
553                         help="Enable job or container reuse (default).")
554     exgroup.add_argument("--disable-reuse", action="store_false",
555                         default=True, dest="enable_reuse",
556                         help="Disable job or container reuse.")
557
558     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs go to the user's Home project.")
559     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
560     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
561     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
562                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
563                         default=False)
564
565     exgroup = parser.add_mutually_exclusive_group()
566     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
567                         default=True, dest="submit")
568     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
569                         default=True, dest="submit")
570     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
571                          dest="create_workflow")
572     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
573     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
574
575     exgroup = parser.add_mutually_exclusive_group()
576     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
577                         default=True, dest="wait")
578     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
579                         default=True, dest="wait")
580
581     exgroup = parser.add_mutually_exclusive_group()
582     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
583                         default=True, dest="log_timestamps")
584     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
585                         default=True, dest="log_timestamps")
586
587     parser.add_argument("--api", type=str,
588                         default=None, dest="work_api",
589                         help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
590
591     parser.add_argument("--compute-checksum", action="store_true", default=False,
592                         help="Compute checksum of contents while collecting outputs",
593                         dest="compute_checksum")
594
595     parser.add_argument("--submit-runner-ram", type=int,
596                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
597                         default=1024)
598
599     parser.add_argument("--submit-runner-image", type=str,
600                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
601                         default=None)
602
603     parser.add_argument("--name", type=str,
604                         help="Name to use for workflow execution instance.",
605                         default=None)
606
607     parser.add_argument("--on-error", type=str,
608                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
609                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
610
611     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
612     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
613
614     return parser
615
616 def add_arv_hints():
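    # Relax the Docker image name check, register the Arvados CWL extension
    # schema, and mark Arvados-specific requirements as supported so cwltool's
    # validation accepts them.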
617     cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
618     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
619     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
620     res.close()
621     cwltool.process.supportedProcessRequirements.extend([
622         "http://arvados.org/cwl#RunInSingleContainer",
623         "http://arvados.org/cwl#OutputDirType",
624         "http://arvados.org/cwl#RuntimeConstraints",
625         "http://arvados.org/cwl#PartitionRequirement",
626         "http://arvados.org/cwl#APIRequirement",
627         "http://commonwl.org/cwltool#LoadListingRequirement"
628     ])
629
630 def main(args, stdout, stderr, api_client=None, keep_client=None):
631     parser = arg_parser()
632
633     job_order_object = None
634     arvargs = parser.parse_args(args)
635
636     if arvargs.version:
637         print versionstring()
638         return
639
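    # Infer the target API from the UUID passed to --update-workflow: the
    # "-7fd4e-" infix identifies a workflow object (containers API), while
    # "-p5p6p-" identifies a pipeline template (jobs API).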
640     if arvargs.update_workflow:
641         if arvargs.update_workflow.find('-7fd4e-') == 5:
642             want_api = 'containers'
643         elif arvargs.update_workflow.find('-p5p6p-') == 5:
644             want_api = 'jobs'
645         else:
646             want_api = None
647         if want_api and arvargs.work_api and want_api != arvargs.work_api:
648             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
649                 arvargs.update_workflow, want_api, arvargs.work_api))
650             return 1
651         arvargs.work_api = want_api
652
653     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
654         job_order_object = ({}, "")
655
656     add_arv_hints()
657
658     try:
659         if api_client is None:
660             api_client=arvados.api('v1', model=OrderedJsonModel())
661         if keep_client is None:
662             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
663         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
664                               num_retries=4, output_name=arvargs.output_name,
665                               output_tags=arvargs.output_tags)
666     except Exception as e:
667         logger.error(e)
668         return 1
669
670     if arvargs.debug:
671         logger.setLevel(logging.DEBUG)
672         logging.getLogger('arvados').setLevel(logging.DEBUG)
673
674     if arvargs.quiet:
675         logger.setLevel(logging.WARN)
676         logging.getLogger('arvados').setLevel(logging.WARN)
677         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
678
679     if arvargs.metrics:
680         metrics.setLevel(logging.DEBUG)
681         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
682
683     if arvargs.log_timestamps:
684         arvados.log_handler.setFormatter(logging.Formatter(
685             '%(asctime)s %(name)s %(levelname)s: %(message)s',
686             '%Y-%m-%d %H:%M:%S'))
687     else:
688         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
689
690     arvargs.conformance_test = None
691     arvargs.use_container = True
692     arvargs.relax_path_checks = True
693     arvargs.validate = None
694
695     make_fs_access = partial(CollectionFsAccess,
696                            collection_cache=runner.collection_cache)
697
698     return cwltool.main.main(args=arvargs,
699                              stdout=stdout,
700                              stderr=stderr,
701                              executor=runner.arv_executor,
702                              makeTool=runner.arv_make_tool,
703                              versionfunc=versionstring,
704                              job_order_object=job_order_object,
705                              make_fs_access=make_fs_access,
706                              fetcher_constructor=partial(CollectionFetcher,
707                                                          api_client=api_client,
708                                                          fs_access=make_fs_access(""),
709                                                          num_retries=runner.num_retries),
710                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
711                              logger_handler=arvados.log_handler,
712                              custom_schema_callback=add_arv_hints)