11100: Add test for --trash-intermediate. Add log message when intermediate
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
import re
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
import schema_salad
from schema_salad.sourceline import SourceLine

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

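    # The loop above picks self.work_api by probing the API server's
    # discovery document for a usable "create" method, preferring "jobs" over
    # "containers" when no explicit work_api was requested.  A rough
    # standalone sketch of the same check (hypothetical client setup, not
    # part of this module):
    #
    #   api = arvados.api('v1')
    #   methods = api._rootDesc.get('resources')['containers']['methods']
    #   containers_ok = 'httpMethod' in methods['create']
    #
    # self.work_api ends up as the first entry of expected_api that exists on
    # the server and matches the caller's work_api argument.
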
    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

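    # on_message() is fed by the state-polling thread (poll_states below);
    # each event is a dict shaped roughly like this (field values are
    # hypothetical):
    #
    #   {"object_uuid": "zzzzz-xvhdp-0123456789abcde",
    #    "event_type": "update",
    #    "properties": {"new_attributes": {"state": "Complete", ...}}}
    #
    # A transition to "Running" just flips the local bookkeeping flag, while
    # a terminal state ("Complete", "Failed", "Cancelled", "Final") invokes
    # the process object's done() handler and wakes arv_executor() via
    # self.cond.
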
    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

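    # The polling loop above wakes every 15 seconds (or immediately when
    # stop_polling is set), batches all tracked UUIDs into one list() call,
    # and replays each returned record through on_message() as a synthetic
    # "update" event.  Against the containers API the request looks roughly
    # like this (UUID is hypothetical):
    #
    #   self.poll_api.container_requests().list(
    #       filters=[["uuid", "in", ["zzzzz-xvhdp-0123456789abcde"]]]
    #   ).execute(num_retries=self.num_retries)
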
    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt:
                break

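    # Intermediate collection UUIDs are registered by the step-level code
    # through add_intermediate_output(); trash_intermediate_output() is only
    # called from arv_executor() when --trash-intermediate was given and the
    # workflow ended with status "success".  Individual deletion failures are
    # logged and skipped, but a KeyboardInterrupt stops the cleanup loop.
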
    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    # TODO: can be supported by containers API, but not jobs API.
                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)

    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

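    # make_output_collection() copies every "keep:..." reference from the CWL
    # output object into a single new collection, writes the rewritten output
    # object itself into that collection as cwl.output.json, then points each
    # location at the saved collection.  For example, a location such as
    # (hash is hypothetical)
    #
    #   keep:99999999999999999999999999999999+99/foo/bar.txt
    #
    # names the source collection before the first "/" (after the "keep:"
    # prefix) and the path "foo/bar.txt" copied into the final collection.
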
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress':1.0
                                        }).execute(num_retries=self.num_retries)

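    # set_crunch_output() only has work to do when this runner is itself
    # executing inside Arvados: with the containers API it records the final
    # output on the current container and trashes the separately saved
    # collection (the container record presumably becomes the canonical
    # copy); outside a container the 404 from containers().current() is
    # expected and the method returns quietly.  With the jobs API it instead
    # updates the current job task named by $TASK_UUID.
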
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

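        # Both cleanup options validated above are containers-API only.  A
        # typical invocation exercising them might look like this (workflow
        # and input file names are hypothetical):
        #
        #   arvados-cwl-runner --api=containers --trash-intermediate \
        #       --intermediate-output-ttl=3600 workflow.cwl job.yml
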
        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"])
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),
                        "success")

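        # If --create-workflow or --update-workflow was requested, the branch
        # above has already returned the registered object's UUID and nothing
        # is executed; from this point on the workflow will actually run.
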
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

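        # Output and temp locations above differ per backend: containers use
        # fixed in-container paths (/var/spool/cwl, /tmp), while the jobs API
        # uses crunch task substitution variables such as $(task.outdir) that
        # are expanded by the crunch runtime rather than by this process.
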
        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    upload_dependencies(self,
                                        kwargs["name"],
                                        tool.doc_loader,
                                        tool.tool,
                                        tool.tool["id"],
                                        False)
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))

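        # In --submit mode orchestration is delegated: a bare CommandLineTool
        # becomes a single container request directly, while anything else is
        # wrapped in a RunnerContainer (containers API) or RunnerJob (jobs
        # API) that runs the workflow on the cluster on our behalf.
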
        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)


def versionstring():
    """Return version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                       "arvados-python-client", arvpkg[0].version,
                                       "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped/provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, work will go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                        default=0)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                        default=False, dest="trash_intermediate",
                        help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                        default=False, dest="trash_intermediate",
                        help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

def add_arv_hints():
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput"
    ])

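# add_arv_hints() swaps cwltool's strict name-acceptance pattern for a
# match-anything pattern, registers the bundled arv-cwl-schema.yml under the
# "http://arvados.org/cwl" namespace, and whitelists the Arvados-specific
# hint/requirement classes so cwltool validation does not reject them.  A
# workflow would reference them under "hints:", for example (illustrative
# sketch, assuming the "arv:" prefix is mapped to http://arvados.org/cwl# and
# the field name follows the bundled schema):
#
#   hints:
#     arv:IntermediateOutput:
#       outputTTL: 3600
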
def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

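    # Arvados UUIDs look like <cluster>-<type infix>-<identifier>, so the
    # object type starts at offset 5.  Above, '-7fd4e-' (workflow records)
    # implies the containers API and '-p5p6p-' (pipeline templates) implies
    # the jobs API; e.g. a UUID like zzzzz-7fd4e-0123456789abcde
    # (hypothetical) selects 'containers'.
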
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)
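
# The heavy lifting is delegated to cwltool.main.main(): this module supplies
# the Arvados-specific executor (arv_executor), tool factory (arv_make_tool),
# Keep-backed file access, fetcher and resolver hooks, and the extension
# schema callback, while cwltool keeps ownership of argument handling, CWL
# loading and validation, and writing the final output object to stdout.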