[arvados.git] sdk/cwl/arvados_cwl/__init__.py
1 #!/usr/bin/env python
2
3 # Implements the cwl-runner interface for submitting and running work on Arvados,
4 # using either the Crunch jobs API or the Crunch containers API.
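#
# Typical command-line use (illustrative only; the flags are defined in
# arg_parser() below, and the file names and project UUID are placeholders):
#
#   arvados-cwl-runner --api=containers \
#       --project-uuid=zzzzz-j7d0g-0123456789abcde \
#       my-workflow.cwl my-inputs.yml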
5
6 import argparse
7 import logging
8 import os
9 import sys
10 import threading
11 import hashlib
12 import copy
13 import json
14 import re
15 from functools import partial
16 import pkg_resources  # part of setuptools
17
18 from cwltool.errors import WorkflowException
19 import cwltool.main
20 import cwltool.workflow
21 import cwltool.process
22 import schema_salad
23 from schema_salad.sourceline import SourceLine
24
25 import arvados
26 import arvados.config
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29
30 from .arvcontainer import ArvadosContainer, RunnerContainer
31 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
32 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
33 from .arvtool import ArvadosCommandTool
34 from .arvworkflow import ArvadosWorkflow, upload_workflow
35 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
36 from .perf import Perf
37 from .pathmapper import NoFollowPathMapper
38 from ._version import __version__
39
40 from cwltool.pack import pack
41 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
42 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
43 from cwltool.draft2tool import compute_checksums
44 from arvados.api import OrderedJsonModel
45
46 logger = logging.getLogger('arvados.cwl-runner')
47 metrics = logging.getLogger('arvados.cwl-runner.metrics')
48 logger.setLevel(logging.INFO)
49
50 arvados.log_handler.setFormatter(logging.Formatter(
51         '%(asctime)s %(name)s %(levelname)s: %(message)s',
52         '%Y-%m-%d %H:%M:%S'))
53
54 class ArvCwlRunner(object):
55     """Execute a CWL tool or workflow, submit work (using either jobs or
56     containers API), wait for them to complete, and report output.
57
58     """
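    # Rough programmatic sketch (this mirrors the wiring in main() below;
    # everything named here is defined or imported in this file):
    #
    #   api = arvados.api('v1', model=OrderedJsonModel())
    #   runner = ArvCwlRunner(api, work_api="containers")
    #   # runner.arv_executor and runner.arv_make_tool are then passed to
    #   # cwltool.main.main() as the executor and makeTool callbacks.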
59
60     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
61         self.api = api_client
62         self.processes = {}
63         self.lock = threading.Lock()
64         self.cond = threading.Condition(self.lock)
65         self.final_output = None
66         self.final_status = None
67         self.uploaded = {}
68         self.num_retries = num_retries
69         self.uuid = None
70         self.stop_polling = threading.Event()
71         self.poll_api = None
72         self.pipeline = None
73         self.final_output_collection = None
74         self.output_name = output_name
75         self.output_tags = output_tags
76         self.project_uuid = None
77
78         if keep_client is not None:
79             self.keep_client = keep_client
80         else:
81             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
82
83         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
84
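        # Probe the API server's discovery document to decide which work
        # submission API ("jobs" or "containers") this runner will use.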
85         self.work_api = None
86         expected_api = ["jobs", "containers"]
87         for api in expected_api:
88             try:
89                 methods = self.api._rootDesc.get('resources')[api]['methods']
90                 if ('httpMethod' in methods['create'] and
91                     (work_api == api or work_api is None)):
92                     self.work_api = api
93                     break
94             except KeyError:
95                 pass
96
97         if not self.work_api:
98             if work_api is None:
99                 raise Exception("No supported APIs")
100             else:
101                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
102
103     def arv_make_tool(self, toolpath_object, **kwargs):
104         kwargs["work_api"] = self.work_api
105         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
106                                                 api_client=self.api,
107                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
108                                                 num_retries=self.num_retries,
109                                                 overrides=kwargs.get("override_tools"))
110         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
111             return ArvadosCommandTool(self, toolpath_object, **kwargs)
112         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
113             return ArvadosWorkflow(self, toolpath_object, **kwargs)
114         else:
115             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
116
117     def output_callback(self, out, processStatus):
118         if processStatus == "success":
119             logger.info("Overall process status is %s", processStatus)
120             if self.pipeline:
121                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
122                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
123         else:
124             logger.warn("Overall process status is %s", processStatus)
125             if self.pipeline:
126                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
127                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
128         self.final_status = processStatus
129         self.final_output = out
130
131     def on_message(self, event):
132         if "object_uuid" in event:
133             if event["object_uuid"] in self.processes and event["event_type"] == "update":
134                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
135                     uuid = event["object_uuid"]
136                     with self.lock:
137                         j = self.processes[uuid]
138                         logger.info("%s %s is Running", self.label(j), uuid)
139                         j.running = True
140                         j.update_pipeline_component(event["properties"]["new_attributes"])
141                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
142                     uuid = event["object_uuid"]
143                     try:
144                         self.cond.acquire()
145                         j = self.processes[uuid]
146                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
147                         with Perf(metrics, "done %s" % j.name):
148                             j.done(event["properties"]["new_attributes"])
149                         self.cond.notify()
150                     finally:
151                         self.cond.release()
152
153     def label(self, obj):
154         return "[%s %s]" % (self.work_api[0:-1], obj.name)
155
156     def poll_states(self):
157         """Poll status of jobs or containers listed in the processes dict.
158
159         Runs in a separate thread.
160         """
161
162         try:
163             while True:
164                 self.stop_polling.wait(15)
165                 if self.stop_polling.is_set():
166                     break
167                 with self.lock:
168                     keys = self.processes.keys()
169                 if not keys:
170                     continue
171
172                 if self.work_api == "containers":
173                     table = self.poll_api.container_requests()
174                 elif self.work_api == "jobs":
175                     table = self.poll_api.jobs()
176
177                 try:
178                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
179                 except Exception as e:
180                     logger.warn("Error checking states on API server: %s", e)
181                     continue
182
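                # Feed each polled record through on_message() as a synthetic
                # "update" event so state changes are handled in one place.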
183                 for p in proc_states["items"]:
184                     self.on_message({
185                         "object_uuid": p["uuid"],
186                         "event_type": "update",
187                         "properties": {
188                             "new_attributes": p
189                         }
190                     })
191         except:
192             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
193             self.cond.acquire()
194             self.processes.clear()
195             self.cond.notify()
196             self.cond.release()
197         finally:
198             self.stop_polling.set()
199
200     def get_uploaded(self):
201         return self.uploaded.copy()
202
203     def add_uploaded(self, src, pair):
204         self.uploaded[src] = pair
205
206     def check_features(self, obj):
207         if isinstance(obj, dict):
208             if obj.get("writable"):
209                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
210             if obj.get("class") == "DockerRequirement":
211                 if obj.get("dockerOutputDirectory"):
212                     # TODO: can be supported by containers API, but not jobs API.
213                     raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
214                         "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
215             for v in obj.itervalues():
216                 self.check_features(v)
217         elif isinstance(obj, list):
218             for i,v in enumerate(obj):
219                 with SourceLine(obj, i, UnsupportedRequirement):
220                     self.check_features(v)
221
222     def make_output_collection(self, name, tagsString, outputObj):
223         outputObj = copy.deepcopy(outputObj)
224
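        # Collect every File and Directory object from the output so their
        # contents can be copied into a single new output collection.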
225         files = []
226         def capture(fileobj):
227             files.append(fileobj)
228
229         adjustDirObjs(outputObj, capture)
230         adjustFileObjs(outputObj, capture)
231
232         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
233
234         final = arvados.collection.Collection(api_client=self.api,
235                                               keep_client=self.keep_client,
236                                               num_retries=self.num_retries)
237
238         for k,v in generatemapper.items():
239             if k.startswith("_:"):
240                 if v.type == "Directory":
241                     continue
242                 if v.type == "CreateFile":
243                     with final.open(v.target, "wb") as f:
244                         f.write(v.resolved.encode("utf-8"))
245                     continue
246
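            # Anything else must be a keep reference of the form
            # "keep:<portable data hash>/<path within collection>"; split off
            # the prefix to find the source collection and the path to copy.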
247             if not k.startswith("keep:"):
248                 raise Exception("Output source is not in keep or a literal")
249             sp = k.split("/")
250             srccollection = sp[0][5:]
251             try:
252                 reader = self.collection_cache.get(srccollection)
253                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
254                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
255             except arvados.errors.ArgumentError as e:
256                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
257                 raise
258             except IOError as e:
259                 logger.warn("While preparing output collection: %s", e)
260
261         def rewrite(fileobj):
262             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
263             for k in ("basename", "listing", "contents"):
264                 if k in fileobj:
265                     del fileobj[k]
266
267         adjustDirObjs(outputObj, rewrite)
268         adjustFileObjs(outputObj, rewrite)
269
270         with final.open("cwl.output.json", "w") as f:
271             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
272
273         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
274
275         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
276                     final.api_response()["name"],
277                     final.manifest_locator())
278
279         final_uuid = final.manifest_locator()
280         tags = tagsString.split(',')
281         for tag in tags:
282             self.api.links().create(body={
283                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
284                 }).execute(num_retries=self.num_retries)
285
286         def finalcollection(fileobj):
287             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
288
289         adjustDirObjs(outputObj, finalcollection)
290         adjustFileObjs(outputObj, finalcollection)
291
292         return (outputObj, final)
293
294     def set_crunch_output(self):
295         if self.work_api == "containers":
296             try:
297                 current = self.api.containers().current().execute(num_retries=self.num_retries)
298             except ApiError as e:
299                 # Status code 404 just means we're not running in a container.
300                 if e.resp.status != 404:
301                     logger.info("Getting current container: %s", e)
302                 return
303             try:
304                 self.api.containers().update(uuid=current['uuid'],
305                                              body={
306                                                  'output': self.final_output_collection.portable_data_hash(),
307                                              }).execute(num_retries=self.num_retries)
308                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
309                                               body={
310                                                   'is_trashed': True
311                                               }).execute(num_retries=self.num_retries)
312             except Exception as e:
313                 logger.info("Setting container output: %s", e)
314         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
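            # Running as a Crunch job task: record the output collection and
            # completion status on this task.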
315             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
316                                    body={
317                                        'output': self.final_output_collection.portable_data_hash(),
318                                        'success': self.final_status == "success",
319                                        'progress':1.0
320                                    }).execute(num_retries=self.num_retries)
321
322     def arv_executor(self, tool, job_order, **kwargs):
323         self.debug = kwargs.get("debug")
324
325         tool.visit(self.check_features)
326
327         self.project_uuid = kwargs.get("project_uuid")
328         self.pipeline = None
329         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
330                                                                  collection_cache=self.collection_cache)
331         self.fs_access = make_fs_access(kwargs["basedir"])
332
333         if not kwargs.get("name"):
334             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
335
336         # Upload the direct dependencies of workflow steps and get back a mapping of
337         # local files to keep references.  Also uploads docker images.
338         override_tools = {}
339         upload_workflow_deps(self, tool, override_tools)
340
341         # Reload tool object which may have been updated by
342         # upload_workflow_deps
343         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
344                                   makeTool=self.arv_make_tool,
345                                   loader=tool.doc_loader,
346                                   avsc_names=tool.doc_schema,
347                                   metadata=tool.metadata,
348                                   override_tools=override_tools)
349
350         # Upload local file references in the job order.
351         job_order = upload_job_order(self, "%s input" % kwargs["name"],
352                                      tool, job_order)
353
354         existing_uuid = kwargs.get("update_workflow")
355         if existing_uuid or kwargs.get("create_workflow"):
356             # Create a pipeline template or workflow record and exit.
357             if self.work_api == "jobs":
358                 tmpl = RunnerTemplate(self, tool, job_order,
359                                       kwargs.get("enable_reuse"),
360                                       uuid=existing_uuid,
361                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
362                                       name=kwargs["name"])
363                 tmpl.save()
364                 # cwltool.main will write our return value to stdout.
365                 return (tmpl.uuid, "success")
366             elif self.work_api == "containers":
367                 return (upload_workflow(self, tool, job_order,
368                                         self.project_uuid,
369                                         uuid=existing_uuid,
370                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
371                                         name=kwargs["name"]),
372                         "success")
373
374         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
375
376         kwargs["make_fs_access"] = make_fs_access
377         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
378         kwargs["use_container"] = True
379         kwargs["tmpdir_prefix"] = "tmp"
380         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
381
382         if self.work_api == "containers":
383             kwargs["outdir"] = "/var/spool/cwl"
384             kwargs["docker_outdir"] = "/var/spool/cwl"
385             kwargs["tmpdir"] = "/tmp"
386             kwargs["docker_tmpdir"] = "/tmp"
387         elif self.work_api == "jobs":
388             kwargs["outdir"] = "$(task.outdir)"
389             kwargs["docker_outdir"] = "$(task.outdir)"
390             kwargs["tmpdir"] = "$(task.tmpdir)"
391
392         runnerjob = None
393         if kwargs.get("submit"):
394             # Submit a runner job to run the workflow for us.
395             if self.work_api == "containers":
396                 if tool.tool["class"] == "CommandLineTool":
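                    # A bare CommandLineTool is submitted directly as a single
                    # container rather than being wrapped in a workflow runner
                    # (handled by the else branch below).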
397                     kwargs["runnerjob"] = tool.tool["id"]
398                     upload_dependencies(self,
399                                         kwargs["name"],
400                                         tool.doc_loader,
401                                         tool.tool,
402                                         tool.tool["id"],
403                                         False)
404                     runnerjob = tool.job(job_order,
405                                          self.output_callback,
406                                          **kwargs).next()
407                 else:
408                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
409                                                 self.output_name,
410                                                 self.output_tags,
411                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
412                                                 name=kwargs.get("name"),
413                                                 on_error=kwargs.get("on_error"),
414                                                 submit_runner_image=kwargs.get("submit_runner_image"))
415             elif self.work_api == "jobs":
416                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
417                                       self.output_name,
418                                       self.output_tags,
419                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
420                                       name=kwargs.get("name"),
421                                       on_error=kwargs.get("on_error"),
422                                       submit_runner_image=kwargs.get("submit_runner_image"))
423
424         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
425             # Create pipeline for local run
426             self.pipeline = self.api.pipeline_instances().create(
427                 body={
428                     "owner_uuid": self.project_uuid,
429                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
430                     "components": {},
431                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
432             logger.info("Pipeline instance %s", self.pipeline["uuid"])
433
434         if runnerjob and not kwargs.get("wait"):
435             runnerjob.run(wait=kwargs.get("wait"))
436             return (runnerjob.uuid, "success")
437
438         self.poll_api = arvados.api('v1')
439         self.polling_thread = threading.Thread(target=self.poll_states)
440         self.polling_thread.start()
441
442         if runnerjob:
443             jobiter = iter((runnerjob,))
444         else:
445             if "cwl_runner_job" in kwargs:
446                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
447             jobiter = tool.job(job_order,
448                                self.output_callback,
449                                **kwargs)
450
451         try:
452             self.cond.acquire()
453             # Will continue to hold the lock for the duration of this code
454             # except when in cond.wait(), at which point on_message can update
455             # job state and process output callbacks.
456
457             loopperf = Perf(metrics, "jobiter")
458             loopperf.__enter__()
459             for runnable in jobiter:
460                 loopperf.__exit__()
461
462                 if self.stop_polling.is_set():
463                     break
464
465                 if runnable:
466                     with Perf(metrics, "run"):
467                         runnable.run(**kwargs)
468                 else:
469                     if self.processes:
470                         self.cond.wait(1)
471                     else:
472                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
473                         break
474                 loopperf.__enter__()
475             loopperf.__exit__()
476
477             while self.processes:
478                 self.cond.wait(1)
479
480         except UnsupportedRequirement:
481             raise
482         except:
483             if sys.exc_info()[0] is KeyboardInterrupt:
484                 logger.error("Interrupted, marking pipeline as failed")
485             else:
486                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
487             if self.pipeline:
488                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
489                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
490             if runnerjob and runnerjob.uuid and self.work_api == "containers":
491                 self.api.container_requests().update(uuid=runnerjob.uuid,
492                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
493         finally:
494             self.cond.release()
495             self.stop_polling.set()
496             self.polling_thread.join()
497
498         if self.final_status == "UnsupportedRequirement":
499             raise UnsupportedRequirement("Check log for details.")
500
501         if self.final_output is None:
502             raise WorkflowException("Workflow did not return a result.")
503
504         if kwargs.get("submit") and isinstance(runnerjob, Runner):
505             logger.info("Final output collection %s", runnerjob.final_output)
506         else:
507             if self.output_name is None:
508                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
509             if self.output_tags is None:
510                 self.output_tags = ""
511             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
512             self.set_crunch_output()
513
514         if kwargs.get("compute_checksum"):
515             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
516             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
517
518         return (self.final_output, self.final_status)
519
520
521 def versionstring():
522     """Print version string of key packages for provenance and debugging."""
523
524     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
525     arvpkg = pkg_resources.require("arvados-python-client")
526     cwlpkg = pkg_resources.require("cwltool")
527
528     return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
529                                     "arvados-python-client", arvpkg[0].version,
530                                     "cwltool", cwlpkg[0].version)
531
532
533 def arg_parser():  # type: () -> argparse.ArgumentParser
534     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
535
536     parser.add_argument("--basedir", type=str,
537                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
538     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
539                         help="Output directory, default current directory")
540
541     parser.add_argument("--eval-timeout",
542                         help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
543                         type=float,
544                         default=20)
545
546     exgroup = parser.add_mutually_exclusive_group()
547     exgroup.add_argument("--print-dot", action="store_true",
548                          help="Print workflow visualization in graphviz format and exit")
549     exgroup.add_argument("--version", action="store_true", help="Print version and exit")
550     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
551
552     exgroup = parser.add_mutually_exclusive_group()
553     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
554     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
555     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
556
557     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
558
559     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
560
561     exgroup = parser.add_mutually_exclusive_group()
562     exgroup.add_argument("--enable-reuse", action="store_true",
563                         default=True, dest="enable_reuse",
564                         help="Enable job or container reuse (default).")
565     exgroup.add_argument("--disable-reuse", action="store_false",
566                         default=True, dest="enable_reuse",
567                         help="Disable job or container reuse.")
568
569     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs.  If not provided, they will go to your Home project.")
570     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
571     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
572     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
573                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
574                         default=False)
575
576     exgroup = parser.add_mutually_exclusive_group()
577     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
578                         default=True, dest="submit")
579     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
580                         default=True, dest="submit")
581     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
582                          dest="create_workflow")
583     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
584     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
585
586     exgroup = parser.add_mutually_exclusive_group()
587     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
588                         default=True, dest="wait")
589     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
590                         default=True, dest="wait")
591
592     exgroup = parser.add_mutually_exclusive_group()
593     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
594                         default=True, dest="log_timestamps")
595     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
596                         default=True, dest="log_timestamps")
597
598     parser.add_argument("--api", type=str,
599                         default=None, dest="work_api",
600                         help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
601
602     parser.add_argument("--compute-checksum", action="store_true", default=False,
603                         help="Compute checksum of contents while collecting outputs",
604                         dest="compute_checksum")
605
606     parser.add_argument("--submit-runner-ram", type=int,
607                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
608                         default=1024)
609
610     parser.add_argument("--submit-runner-image", type=str,
611                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
612                         default=None)
613
614     parser.add_argument("--name", type=str,
615                         help="Name to use for workflow execution instance.",
616                         default=None)
617
618     parser.add_argument("--on-error", type=str,
619                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
620                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
621
622     parser.add_argument("--enable-dev", action="store_true",
623                         help="Enable loading and running development versions "
624                              "of CWL spec.", default=False)
625
626     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
627     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
628
629     return parser
630
631 def add_arv_hints():
632     cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
633     cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
634     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
635     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
636     res.close()
637     cwltool.process.supportedProcessRequirements.extend([
638         "http://arvados.org/cwl#RunInSingleContainer",
639         "http://arvados.org/cwl#OutputDirType",
640         "http://arvados.org/cwl#RuntimeConstraints",
641         "http://arvados.org/cwl#PartitionRequirement",
642         "http://arvados.org/cwl#APIRequirement",
643         "http://commonwl.org/cwltool#LoadListingRequirement"
644     ])
645
646 def main(args, stdout, stderr, api_client=None, keep_client=None):
647     parser = arg_parser()
648
649     job_order_object = None
650     arvargs = parser.parse_args(args)
651
652     if arvargs.version:
653         print versionstring()
654         return
655
656     if arvargs.update_workflow:
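        # Arvados UUIDs embed the object type in their second field, so the
        # UUID itself tells us which API to use: "-7fd4e-" is a workflow
        # (containers API) and "-p5p6p-" is a pipeline template (jobs API).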
657         if arvargs.update_workflow.find('-7fd4e-') == 5:
658             want_api = 'containers'
659         elif arvargs.update_workflow.find('-p5p6p-') == 5:
660             want_api = 'jobs'
661         else:
662             want_api = None
663         if want_api and arvargs.work_api and want_api != arvargs.work_api:
664             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
665                 arvargs.update_workflow, want_api, arvargs.work_api))
666             return 1
667         arvargs.work_api = want_api
668
669     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
670         job_order_object = ({}, "")
671
672     add_arv_hints()
673
674     try:
675         if api_client is None:
676             api_client=arvados.api('v1', model=OrderedJsonModel())
677         if keep_client is None:
678             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
679         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
680                               num_retries=4, output_name=arvargs.output_name,
681                               output_tags=arvargs.output_tags)
682     except Exception as e:
683         logger.error(e)
684         return 1
685
686     if arvargs.debug:
687         logger.setLevel(logging.DEBUG)
688         logging.getLogger('arvados').setLevel(logging.DEBUG)
689
690     if arvargs.quiet:
691         logger.setLevel(logging.WARN)
692         logging.getLogger('arvados').setLevel(logging.WARN)
693         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
694
695     if arvargs.metrics:
696         metrics.setLevel(logging.DEBUG)
697         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
698
699     if arvargs.log_timestamps:
700         arvados.log_handler.setFormatter(logging.Formatter(
701             '%(asctime)s %(name)s %(levelname)s: %(message)s',
702             '%Y-%m-%d %H:%M:%S'))
703     else:
704         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
705
706     arvargs.conformance_test = None
707     arvargs.use_container = True
708     arvargs.relax_path_checks = True
709     arvargs.validate = None
710
711     make_fs_access = partial(CollectionFsAccess,
712                            collection_cache=runner.collection_cache)
713
714     return cwltool.main.main(args=arvargs,
715                              stdout=stdout,
716                              stderr=stderr,
717                              executor=runner.arv_executor,
718                              makeTool=runner.arv_make_tool,
719                              versionfunc=versionstring,
720                              job_order_object=job_order_object,
721                              make_fs_access=make_fs_access,
722                              fetcher_constructor=partial(CollectionFetcher,
723                                                          api_client=api_client,
724                                                          fs_access=make_fs_access(""),
725                                                          num_retries=runner.num_retries),
726                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
727                              logger_handler=arvados.log_handler,
728                              custom_schema_callback=add_arv_hints)
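

# Illustrative sketch only: the installed console script (or the package's
# __main__ module) is expected to call main() roughly like this; the actual
# entry-point wiring lives outside this file.
#
#   if __name__ == "__main__":
#       sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))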