Merge branch '11369-cwl-crunch2-capacity' refs #11369
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
5
6 import argparse
7 import logging
8 import os
9 import sys
10 import threading
11 import hashlib
12 import copy
13 import json
14 import re
15 from functools import partial
16 import pkg_resources  # part of setuptools
17
18 from cwltool.errors import WorkflowException
19 import cwltool.main
20 import cwltool.workflow
21 import cwltool.process
22 import schema_salad
23 from schema_salad.sourceline import SourceLine
24
25 import arvados
26 import arvados.config
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29
30 from .arvcontainer import ArvadosContainer, RunnerContainer
31 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
32 from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
33 from .arvtool import ArvadosCommandTool
34 from .arvworkflow import ArvadosWorkflow, upload_workflow
35 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
36 from .perf import Perf
37 from .pathmapper import NoFollowPathMapper
38 from ._version import __version__
39
40 from cwltool.pack import pack
41 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
42 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
43 from cwltool.draft2tool import compute_checksums
44 from arvados.api import OrderedJsonModel
45
# Package-wide loggers.  General progress messages go to `logger` (INFO and
# above); timing measurements are reported through the separate `metrics`
# logger.
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
metrics = logging.getLogger('arvados.cwl-runner.metrics')

# Timestamped format for the log handler provided by the arvados SDK.
arvados.log_handler.setFormatter(
    logging.Formatter(fmt='%(asctime)s %(name)s %(levelname)s: %(message)s',
                      datefmt='%Y-%m-%d %H:%M:%S'))
53
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow on Arvados.

    Submits work (using either the jobs or the containers API), waits for
    the submitted processes to complete, and reports the output.

    Submitted processes are tracked in the `processes` dict, keyed by uuid.
    That dict is shared between the thread running arv_executor() and the
    state polling thread; `lock` / `cond` (a Condition built on `lock`)
    guard it.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        """Initialise runner state and select the work submission API.

        api_client -- Arvados API client; its discovery document is
            inspected to find out which work APIs are available.
        work_api -- "jobs" or "containers" to require that API, or None
            to take the first available one (checked in that order).
        keep_client -- Keep client to use; a KeepClient is created from
            api_client when omitted.
        output_name -- name for the final output collection.
        output_tags -- comma separated tags for the final output collection.
        num_retries -- retry count handed to API and Keep requests.

        Raises Exception if no usable work API is found.
        """
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        # Overwritten by arv_executor(); initialised here so that
        # poll_states() can always read it.
        self.debug = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                # This API (or its create method) is not offered by the server.
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        """Construct the process object for a CWL document.

        CommandLineTool and Workflow documents get the Arvados specific
        classes; anything else is handed to cwltool's default factory.
        The selected work API and a CollectionFetcher constructor are
        added to kwargs before the object is built.
        """
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        """Record the final output and status of the top level process.

        If a pipeline instance was created for this run, its state is set
        to "Complete" on success and to "Failed" otherwise.
        """
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            pipeline_state = "Complete"
        else:
            logger.warning("Overall process status is %s", processStatus)
            pipeline_state = "Failed"
        if self.pipeline:
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": pipeline_state}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        """Handle a state update event for one of the tracked processes.

        Events without "object_uuid", for untracked uuids, or with an
        event_type other than "update" are ignored.  A process entering
        the "Running" state is marked as running; a process in a terminal
        state has its done() method called and waiters on `cond` are
        notified.
        """
        if "object_uuid" not in event:
            return
        uuid = event["object_uuid"]
        if uuid not in self.processes or event["event_type"] != "update":
            return

        attrs = event["properties"]["new_attributes"]
        if attrs["state"] == "Running" and self.processes[uuid].running is False:
            with self.lock:
                j = self.processes[uuid]
                logger.info("%s %s is Running", self.label(j), uuid)
                j.running = True
                j.update_pipeline_component(attrs)
        elif attrs["state"] in ("Complete", "Failed", "Cancelled", "Final"):
            with self.cond:
                j = self.processes[uuid]
                logger.info("%s %s is %s", self.label(j), uuid, attrs["state"])
                with Perf(metrics, "done %s" % j.name):
                    j.done(attrs)
                self.cond.notify()

    def label(self, obj):
        """Return a log prefix such as "[job name]" or "[container name]"."""
        # work_api is plural ("jobs" / "containers"); drop the trailing "s".
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.  Every 15 seconds (until stop_polling
        is set) the current records are fetched and fed to on_message()
        as synthetic "update" events.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    # Snapshot the uuids so the lock is not held during the
                    # API request.
                    keys = list(self.processes.keys())
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warning("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            # Deliberately catches everything: whatever kills this thread,
            # the executor loop must be woken up and must stop waiting on
            # processes whose state will never be reported again.
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            with self.cond:
                self.processes.clear()
                self.cond.notify()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        """Return a shallow copy of the mapping of uploaded sources."""
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        """Remember `pair` as the upload result for source `src`."""
        self.uploaded[src] = pair

    def check_features(self, obj):
        """Recursively reject CWL features that are not supported.

        Raises UnsupportedRequirement (with source line information) for
        a truthy "writable" entry and for the "dockerOutputDirectory"
        option of DockerRequirement.
        """
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    # TODO: can be supported by containers API, but not jobs API.
                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            for v in obj.values():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)

    @staticmethod
    def _split_tags(tagsString):
        """Split a comma separated tag string into a list of tag names.

        Empty entries are dropped, so an empty (or None) string yields an
        empty list rather than a single empty tag name.
        """
        return [tag for tag in (tagsString or "").split(',') if tag]

    def make_output_collection(self, name, tagsString, outputObj):
        """Assemble the final output collection and return the rewritten output.

        name -- name for the new collection.
        tagsString -- comma separated tag names; a "tag" link is created
            for each non-empty one.
        outputObj -- CWL output object; it is deep copied, not modified.

        Every File and Directory in the output is gathered into one new
        collection, which also receives a "cwl.output.json" describing
        the output.  Returns a tuple (outputObj, collection) in which the
        locations of outputObj refer to the new collection.

        Raises Exception if an output source is neither a literal nor a
        "keep:" reference.
        """
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                # Literal entries: directories need no content, file
                # literals are written out directly.
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            # Strip the "keep:" prefix to get the source collection.
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                # Best effort: report the problem and carry on with the
                # remaining outputs.
                logger.warning("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        for tag in self._split_tags(tagsString):
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        """Report the final output collection to the enclosing Crunch process.

        With the containers API the current container's output is set and
        the final output collection record is then trashed; nothing is
        done if there is no current container.  With the jobs API the job
        task named by the TASK_UUID environment variable is updated.
        """
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                   body={
                                       'output': self.final_output_collection.portable_data_hash(),
                                       'success': self.final_status == "success",
                                       'progress':1.0
                                   }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        """Run, submit or register a CWL tool or workflow.

        tool -- loaded CWL process object.
        job_order -- input object for the process.
        kwargs -- executor options; "basedir" is required.

        Depending on the options this either
          * creates or updates a workflow / pipeline template and returns
            (uuid, "success"),
          * submits a runner process without waiting and returns
            (uuid, "success"), or
          * runs the process (or waits for the submitted runner) and
            returns (final output, final status).

        Raises UnsupportedRequirement for unsupported CWL features and
        WorkflowException if no result was produced.
        """
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"])
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    # A single tool is submitted directly instead of being
                    # wrapped in a runner container.
                    kwargs["runnerjob"] = tool.tool["id"]
                    upload_dependencies(self,
                                        kwargs["name"],
                                        tool.doc_loader,
                                        tool.tool,
                                        tool.tool["id"],
                                        False)
                    runnerjob = next(tool.job(job_order,
                                              self.output_callback,
                                              **kwargs))
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        # The polling thread gets an API client of its own.
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            # The "jobiter" timer measures only the time spent obtaining
            # the next runnable, so it is stopped while a runnable is
            # being handled and restarted afterwards.
            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            # Everything has been submitted; wait for the stragglers.
            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                # Setting the priority to zero asks for the submitted
                # runner to be cancelled.
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return (self.final_output, self.final_status)
516
517
def versionstring():
    """Return a one-line version summary of key packages.

    The line names the invoked program with this module's version and
    the installed versions of arvados-cwl-runner, arvados-python-client
    and cwltool.  Intended for provenance and debugging.
    """

    def installed_version(package):
        # require() returns a list of distributions; the first entry is
        # the one whose version is reported.
        return pkg_resources.require(package)[0].version

    runner_version = installed_version("arvados-cwl-runner")
    client_version = installed_version("arvados-python-client")
    cwltool_version = installed_version("cwltool")

    fields = (sys.argv[0], __version__, runner_version,
              "arvados-python-client", client_version,
              "cwltool", cwltool_version)
    return "%s %s %s, %s %s, %s %s" % fields
528
529
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command line parser for the Arvados CWL runner.

    Returns an argparse.ArgumentParser.  Options that are alternatives of
    one another (--submit / --local, --wait / --no-wait, ...) are placed
    in mutually exclusive groups and share a destination.  The two
    positional arguments are the workflow and the remainder of the
    command line, collected as the job order.
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # These two options previously had empty help strings, which left
    # them unexplained in the --help output.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    # REMAINDER: everything after the workflow belongs to the job order,
    # including words that look like options.
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
618
def add_arv_hints():
    """Register the Arvados CWL extensions with cwltool.

    Replaces cwltool's accept-list patterns with one that matches any
    string, installs the bundled 'arv-cwl-schema.yml' as a custom schema
    for CWL v1.0 under the http://arvados.org/cwl namespace, and adds the
    extension requirements to cwltool's list of supported process
    requirements.  Calling it more than once does not add duplicate
    entries to that list.
    """
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE

    # Close the resource stream even if reading it fails.
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    try:
        schema = res.read()
    finally:
        res.close()
    use_custom_schema("v1.0", "http://arvados.org/cwl", schema)

    supported = cwltool.process.supportedProcessRequirements
    missing = [req for req in (
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement"
    ) if req not in supported]
    supported.extend(missing)
633
def main(args, stdout, stderr, api_client=None, keep_client=None):
    """Command-line entry point: parse arguments and run cwltool on Arvados.

    Parses ``args`` with arg_parser(), constructs an ArvCwlRunner, adjusts
    logging according to the parsed options, and then delegates to
    cwltool.main.main() with the runner's executor and tool factory plus
    Arvados-aware filesystem access, fetcher and resolver.

    Arguments:
      args -- sequence of command-line arguments given to the parser.
      stdout, stderr -- streams passed through to cwltool.main.main().
      api_client -- Arvados API client to use; when None one is created
        with arvados.api('v1', model=OrderedJsonModel()).
      keep_client -- Keep client to use; when None a KeepClient bound to
        ``api_client`` is created.

    Returns None after printing the version for --version, 1 when
    --update-workflow conflicts with --api or when client/runner setup
    raises, and otherwise the value returned by cwltool.main.main().
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        # Parenthesized single-argument form behaves identically under the
        # Python 2 print statement and the Python 3 print function.
        print(versionstring())
        return

    if arvargs.update_workflow:
        # The infix found at offset 5 of the UUID selects which work API the
        # existing object belongs to.
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        # Only override when an API could be inferred from the UUID;
        # otherwise keep whatever the user requested with --api instead of
        # silently resetting it to None.
        if want_api:
            arvargs.work_api = want_api

    # Registering or updating a workflow does not need an input object, so
    # supply an empty one when none was given on the command line.
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        # Top-level boundary: report the setup failure and exit non-zero
        # rather than letting a traceback escape to the user.
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    # --quiet is applied after --debug, so it wins when both are given.
    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    # Options that are not defined on the parser in the visible part of
    # arg_parser() are set here before the namespace is handed to cwltool
    # (presumably cwltool.main.main() reads them; verify against cwltool).
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    # Every filesystem-access object shares the runner's collection cache.
    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)