1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement the cwl-runner interface for submitting and running work on Arvados,
7 # using either the Crunch jobs API or the Crunch containers API.
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import threading
14 import hashlib
15 import copy
16 import json
17 import re
18 from functools import partial
19 import pkg_resources  # part of setuptools
20
21 from cwltool.errors import WorkflowException
22 import cwltool.main
23 import cwltool.workflow
24 import cwltool.process
25 from schema_salad.sourceline import SourceLine
26
27 import arvados
28 import arvados.config
29 from arvados.keep import KeepClient
30 from arvados.errors import ApiError
31
32 from .arvcontainer import ArvadosContainer, RunnerContainer
33 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
34 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
35 from .arvtool import ArvadosCommandTool
36 from .arvworkflow import ArvadosWorkflow, upload_workflow
37 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
38 from .perf import Perf
39 from .pathmapper import NoFollowPathMapper
40 from ._version import __version__
41
42 from cwltool.pack import pack
43 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
44 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
45 from cwltool.draft2tool import compute_checksums
46 from arvados.api import OrderedJsonModel
47
48 logger = logging.getLogger('arvados.cwl-runner')
49 metrics = logging.getLogger('arvados.cwl-runner.metrics')
50 logger.setLevel(logging.INFO)
51
52 arvados.log_handler.setFormatter(logging.Formatter(
53         '%(asctime)s %(name)s %(levelname)s: %(message)s',
54         '%Y-%m-%d %H:%M:%S'))
55
56 class ArvCwlRunner(object):
57     """Execute a CWL tool or workflow: submit work (using either the jobs or
58     containers API), wait for it to complete, and report the output.
59
60     """
61
62     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
63         self.api = api_client
64         self.processes = {}
65         self.lock = threading.Lock()
66         self.cond = threading.Condition(self.lock)
67         self.final_output = None
68         self.final_status = None
69         self.uploaded = {}
70         self.num_retries = num_retries
71         self.uuid = None
72         self.stop_polling = threading.Event()
73         self.poll_api = None
74         self.pipeline = None
75         self.final_output_collection = None
76         self.output_name = output_name
77         self.output_tags = output_tags
78         self.project_uuid = None
79         self.intermediate_output_ttl = 0
80         self.intermediate_output_collections = []
81         self.trash_intermediate = False
82
83         if keep_client is not None:
84             self.keep_client = keep_client
85         else:
86             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
87
88         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
89
90         self.work_api = None
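        # Probe the API server's discovery document to choose a supported
        # work submission API ("jobs" or "containers"), honoring an explicit
        # work_api request when one was given.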
91         expected_api = ["jobs", "containers"]
92         for api in expected_api:
93             try:
94                 methods = self.api._rootDesc.get('resources')[api]['methods']
95                 if ('httpMethod' in methods['create'] and
96                     (work_api == api or work_api is None)):
97                     self.work_api = api
98                     break
99             except KeyError:
100                 pass
101
102         if not self.work_api:
103             if work_api is None:
104                 raise Exception("No supported APIs")
105             else:
106                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
107
108     def arv_make_tool(self, toolpath_object, **kwargs):
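        """Tool factory handed to cwltool: wrap CommandLineTool and Workflow
        documents in their Arvados-specific classes, otherwise fall back to
        cwltool's default tool maker."""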
109         kwargs["work_api"] = self.work_api
110         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
111                                                 api_client=self.api,
112                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
113                                                 num_retries=self.num_retries)
114         kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
115         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
116             return ArvadosCommandTool(self, toolpath_object, **kwargs)
117         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
118             return ArvadosWorkflow(self, toolpath_object, **kwargs)
119         else:
120             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
121
122     def output_callback(self, out, processStatus):
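        """Record the final workflow status and output object; if a pipeline
        instance is being tracked (jobs API), mark it Complete or Failed."""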
123         if processStatus == "success":
124             logger.info("Overall process status is %s", processStatus)
125             if self.pipeline:
126                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
127                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
128         else:
129             logger.warn("Overall process status is %s", processStatus)
130             if self.pipeline:
131                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
132                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
133         self.final_status = processStatus
134         self.final_output = out
135
136     def on_message(self, event):
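        """Handle a state-change event for a tracked job or container: log
        transitions to Running, and on a terminal state call done() on the
        process and wake up the main run loop."""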
137         if "object_uuid" in event:
138             if event["object_uuid"] in self.processes and event["event_type"] == "update":
139                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
140                     uuid = event["object_uuid"]
141                     with self.lock:
142                         j = self.processes[uuid]
143                         logger.info("%s %s is Running", self.label(j), uuid)
144                         j.running = True
145                         j.update_pipeline_component(event["properties"]["new_attributes"])
146                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
147                     uuid = event["object_uuid"]
148                     try:
149                         self.cond.acquire()
150                         j = self.processes[uuid]
151                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
152                         with Perf(metrics, "done %s" % j.name):
153                             j.done(event["properties"]["new_attributes"])
154                         self.cond.notify()
155                     finally:
156                         self.cond.release()
157
158     def label(self, obj):
159         return "[%s %s]" % (self.work_api[0:-1], obj.name)
160
161     def poll_states(self):
162         """Poll status of jobs or containers listed in the processes dict.
163
164         Runs in a separate thread.
165         """
166
167         try:
168             while True:
169                 self.stop_polling.wait(15)
170                 if self.stop_polling.is_set():
171                     break
172                 with self.lock:
173                     keys = self.processes.keys()
174                 if not keys:
175                     continue
176
177                 if self.work_api == "containers":
178                     table = self.poll_api.container_requests()
179                 elif self.work_api == "jobs":
180                     table = self.poll_api.jobs()
181
182                 try:
183                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
184                 except Exception as e:
185                     logger.warn("Error checking states on API server: %s", e)
186                     continue
187
188                 for p in proc_states["items"]:
189                     self.on_message({
190                         "object_uuid": p["uuid"],
191                         "event_type": "update",
192                         "properties": {
193                             "new_attributes": p
194                         }
195                     })
196         except:
197             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
198             self.cond.acquire()
199             self.processes.clear()
200             self.cond.notify()
201             self.cond.release()
202         finally:
203             self.stop_polling.set()
204
205     def get_uploaded(self):
206         return self.uploaded.copy()
207
208     def add_uploaded(self, src, pair):
209         self.uploaded[src] = pair
210
211     def add_intermediate_output(self, uuid):
212         if uuid:
213             self.intermediate_output_collections.append(uuid)
214
215     def trash_intermediate_output(self):
216         logger.info("Cleaning up intermediate output collections")
217         for i in self.intermediate_output_collections:
218             try:
219                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
220             except:
221                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
222             if sys.exc_info()[0] is KeyboardInterrupt:
223                 break
224
225     def check_features(self, obj):
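        """Recursively check a CWL document for features that are not
        supported, such as writable InitialWorkDir entries or
        dockerOutputDirectory, and raise UnsupportedRequirement if found."""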
226         if isinstance(obj, dict):
227             if obj.get("writable"):
228                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
229             if obj.get("class") == "DockerRequirement":
230                 if obj.get("dockerOutputDirectory"):
231                     # TODO: can be supported by containers API, but not jobs API.
232                     raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
233                         "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
234             for v in obj.itervalues():
235                 self.check_features(v)
236         elif isinstance(obj, list):
237             for i,v in enumerate(obj):
238                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
239                     self.check_features(v)
240
241     def make_output_collection(self, name, tagsString, outputObj):
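        """Copy the workflow outputs (Keep references and file literals) into
        a single new collection, rewrite output locations to point at it,
        save cwl.output.json inside it, and apply any requested tags."""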
242         outputObj = copy.deepcopy(outputObj)
243
244         files = []
245         def capture(fileobj):
246             files.append(fileobj)
247
248         adjustDirObjs(outputObj, capture)
249         adjustFileObjs(outputObj, capture)
250
251         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
252
253         final = arvados.collection.Collection(api_client=self.api,
254                                               keep_client=self.keep_client,
255                                               num_retries=self.num_retries)
256
257         for k,v in generatemapper.items():
258             if k.startswith("_:"):
259                 if v.type == "Directory":
260                     continue
261                 if v.type == "CreateFile":
262                     with final.open(v.target, "wb") as f:
263                         f.write(v.resolved.encode("utf-8"))
264                     continue
265
266             if not k.startswith("keep:"):
267                 raise Exception("Output source is not in keep or a literal")
268             sp = k.split("/")
269             srccollection = sp[0][5:]
270             try:
271                 reader = self.collection_cache.get(srccollection)
272                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
273                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
274             except arvados.errors.ArgumentError as e:
275                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
276                 raise
277             except IOError as e:
278                 logger.warn("While preparing output collection: %s", e)
279
280         def rewrite(fileobj):
281             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
282             for k in ("basename", "listing", "contents", "nameext", "nameroot", "dirname"):
283                 if k in fileobj:
284                     del fileobj[k]
285
286         adjustDirObjs(outputObj, rewrite)
287         adjustFileObjs(outputObj, rewrite)
288
289         with final.open("cwl.output.json", "w") as f:
290             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
291
292         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
293
294         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
295                     final.api_response()["name"],
296                     final.manifest_locator())
297
298         final_uuid = final.manifest_locator()
299         tags = tagsString.split(',')
300         for tag in tags:
301             self.api.links().create(body={
302                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
303                 }).execute(num_retries=self.num_retries)
304
305         def finalcollection(fileobj):
306             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
307
308         adjustDirObjs(outputObj, finalcollection)
309         adjustFileObjs(outputObj, finalcollection)
310
311         return (outputObj, final)
312
313     def set_crunch_output(self):
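        """When running inside an Arvados container or crunch job task,
        record the final output collection on that container or task."""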
314         if self.work_api == "containers":
315             try:
316                 current = self.api.containers().current().execute(num_retries=self.num_retries)
317             except ApiError as e:
318                 # Status code 404 just means we're not running in a container.
319                 if e.resp.status != 404:
320                     logger.info("Getting current container: %s", e)
321                 return
322             try:
323                 self.api.containers().update(uuid=current['uuid'],
324                                              body={
325                                                  'output': self.final_output_collection.portable_data_hash(),
326                                              }).execute(num_retries=self.num_retries)
327                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
328                                               body={
329                                                   'is_trashed': True
330                                               }).execute(num_retries=self.num_retries)
331             except Exception as e:
332                 logger.info("Setting container output: %s", e)
333         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
334             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
335                                    body={
336                                        'output': self.final_output_collection.portable_data_hash(),
337                                        'success': self.final_status == "success",
338                                        'progress':1.0
339                                    }).execute(num_retries=self.num_retries)
340
341     def arv_executor(self, tool, job_order, **kwargs):
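        """Entry point invoked by cwltool.main: upload dependencies, then
        either create/update a workflow record, submit a runner job or
        container, or run the workflow from here while polling process
        states; finally assemble and return the final output."""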
342         self.debug = kwargs.get("debug")
343
344         tool.visit(self.check_features)
345
346         self.project_uuid = kwargs.get("project_uuid")
347         self.pipeline = None
348         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
349                                                                  collection_cache=self.collection_cache)
350         self.fs_access = make_fs_access(kwargs["basedir"])
351
352
353         self.trash_intermediate = kwargs["trash_intermediate"]
354         if self.trash_intermediate and self.work_api != "containers":
355             raise Exception("--trash-intermediate is only supported with --api=containers.")
356
357         self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
358         if self.intermediate_output_ttl and self.work_api != "containers":
359             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
360         if self.intermediate_output_ttl < 0:
361             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
362
363         if not kwargs.get("name"):
364             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
365
366         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
367         # Also uploads docker images.
368         merged_map = upload_workflow_deps(self, tool)
369
370         # Reload tool object which may have been updated by
371         # upload_workflow_deps
372         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
373                                   makeTool=self.arv_make_tool,
374                                   loader=tool.doc_loader,
375                                   avsc_names=tool.doc_schema,
376                                   metadata=tool.metadata)
377
378         # Upload local file references in the job order.
379         job_order = upload_job_order(self, "%s input" % kwargs["name"],
380                                      tool, job_order)
381
382         existing_uuid = kwargs.get("update_workflow")
383         if existing_uuid or kwargs.get("create_workflow"):
384             # Create a pipeline template or workflow record and exit.
385             if self.work_api == "jobs":
386                 tmpl = RunnerTemplate(self, tool, job_order,
387                                       kwargs.get("enable_reuse"),
388                                       uuid=existing_uuid,
389                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
390                                       name=kwargs["name"],
391                                       merged_map=merged_map)
392                 tmpl.save()
393                 # cwltool.main will write our return value to stdout.
394                 return (tmpl.uuid, "success")
395             elif self.work_api == "containers":
396                 return (upload_workflow(self, tool, job_order,
397                                         self.project_uuid,
398                                         uuid=existing_uuid,
399                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
400                                         name=kwargs["name"],
401                                         merged_map=merged_map),
402                         "success")
403
404         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
405
406         kwargs["make_fs_access"] = make_fs_access
407         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
408         kwargs["use_container"] = True
409         kwargs["tmpdir_prefix"] = "tmp"
410         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
411
412         if self.work_api == "containers":
413             kwargs["outdir"] = "/var/spool/cwl"
414             kwargs["docker_outdir"] = "/var/spool/cwl"
415             kwargs["tmpdir"] = "/tmp"
416             kwargs["docker_tmpdir"] = "/tmp"
417         elif self.work_api == "jobs":
418             kwargs["outdir"] = "$(task.outdir)"
419             kwargs["docker_outdir"] = "$(task.outdir)"
420             kwargs["tmpdir"] = "$(task.tmpdir)"
421
422         runnerjob = None
423         if kwargs.get("submit"):
424             # Submit a runner job to run the workflow for us.
425             if self.work_api == "containers":
426                 if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
427                     kwargs["runnerjob"] = tool.tool["id"]
428                     runnerjob = tool.job(job_order,
429                                          self.output_callback,
430                                          **kwargs).next()
431                 else:
432                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
433                                                 self.output_name,
434                                                 self.output_tags,
435                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
436                                                 name=kwargs.get("name"),
437                                                 on_error=kwargs.get("on_error"),
438                                                 submit_runner_image=kwargs.get("submit_runner_image"),
439                                                 intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
440                                                 merged_map=merged_map)
441             elif self.work_api == "jobs":
442                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
443                                       self.output_name,
444                                       self.output_tags,
445                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
446                                       name=kwargs.get("name"),
447                                       on_error=kwargs.get("on_error"),
448                                       submit_runner_image=kwargs.get("submit_runner_image"),
449                                       merged_map=merged_map)
450         elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
451             # Create pipeline for local run
452             self.pipeline = self.api.pipeline_instances().create(
453                 body={
454                     "owner_uuid": self.project_uuid,
455                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
456                     "components": {},
457                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
458             logger.info("Pipeline instance %s", self.pipeline["uuid"])
459
460         if runnerjob and not kwargs.get("wait"):
461             runnerjob.run(wait=kwargs.get("wait"))
462             return (runnerjob.uuid, "success")
463
464         self.poll_api = arvados.api('v1')
465         self.polling_thread = threading.Thread(target=self.poll_states)
466         self.polling_thread.start()
467
468         if runnerjob:
469             jobiter = iter((runnerjob,))
470         else:
471             if "cwl_runner_job" in kwargs:
472                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
473             jobiter = tool.job(job_order,
474                                self.output_callback,
475                                **kwargs)
476
477         try:
478             self.cond.acquire()
479             # Will continue to hold the lock for the duration of this code
480             # except when in cond.wait(), at which point on_message can update
481             # job state and process output callbacks.
482
483             loopperf = Perf(metrics, "jobiter")
484             loopperf.__enter__()
485             for runnable in jobiter:
486                 loopperf.__exit__()
487
488                 if self.stop_polling.is_set():
489                     break
490
491                 if runnable:
492                     with Perf(metrics, "run"):
493                         runnable.run(**kwargs)
494                 else:
495                     if self.processes:
496                         self.cond.wait(1)
497                     else:
498                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
499                         break
500                 loopperf.__enter__()
501             loopperf.__exit__()
502
503             while self.processes:
504                 self.cond.wait(1)
505
506         except UnsupportedRequirement:
507             raise
508         except:
509             if sys.exc_info()[0] is KeyboardInterrupt:
510                 logger.error("Interrupted, marking pipeline as failed")
511             else:
512                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
513             if self.pipeline:
514                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
515                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
516             if runnerjob and runnerjob.uuid and self.work_api == "containers":
517                 self.api.container_requests().update(uuid=runnerjob.uuid,
518                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
519         finally:
520             self.cond.release()
521             self.stop_polling.set()
522             self.polling_thread.join()
523
524         if self.final_status == "UnsupportedRequirement":
525             raise UnsupportedRequirement("Check log for details.")
526
527         if self.final_output is None:
528             raise WorkflowException("Workflow did not return a result.")
529
530         if kwargs.get("submit") and isinstance(runnerjob, Runner):
531             logger.info("Final output collection %s", runnerjob.final_output)
532         else:
533             if self.output_name is None:
534                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
535             if self.output_tags is None:
536                 self.output_tags = ""
537             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
538             self.set_crunch_output()
539
540         if kwargs.get("compute_checksum"):
541             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
542             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
543
544         if self.trash_intermediate and self.final_status == "success":
545             self.trash_intermediate_output()
546
547         return (self.final_output, self.final_status)
548
549
550 def versionstring():
551     """Return a version string for key packages, for provenance and debugging."""
552
553     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
554     arvpkg = pkg_resources.require("arvados-python-client")
555     cwlpkg = pkg_resources.require("cwltool")
556
557     return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
558                                     "arvados-python-client", arvpkg[0].version,
559                                     "cwltool", cwlpkg[0].version)
560
561
562 def arg_parser():  # type: () -> argparse.ArgumentParser
563     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
564
565     parser.add_argument("--basedir", type=str,
566                         help="Base directory used to resolve relative references in the input, defaults to the directory of the input object file, or the current directory if inputs are piped/provided on the command line.")
567     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
568                         help="Output directory, default current directory")
569
570     parser.add_argument("--eval-timeout",
571                         help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
572                         type=float,
573                         default=20)
574
575     exgroup = parser.add_mutually_exclusive_group()
576     exgroup.add_argument("--print-dot", action="store_true",
577                          help="Print workflow visualization in graphviz format and exit")
578     exgroup.add_argument("--version", action="store_true", help="Print version and exit")
579     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
580
581     exgroup = parser.add_mutually_exclusive_group()
582     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
583     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
584     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
585
586     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
587
588     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
589
590     exgroup = parser.add_mutually_exclusive_group()
591     exgroup.add_argument("--enable-reuse", action="store_true",
592                         default=True, dest="enable_reuse",
593                         help="Enable job or container reuse (default)")
594     exgroup.add_argument("--disable-reuse", action="store_false",
595                         default=True, dest="enable_reuse",
596                         help="Disable job or container reuse")
597
598     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, they will go to your home project.")
599     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
600     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
601     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
602                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
603                         default=False)
604
605     exgroup = parser.add_mutually_exclusive_group()
606     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
607                         default=True, dest="submit")
608     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
609                         default=True, dest="submit")
610     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
611                          dest="create_workflow")
612     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
613     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
614
615     exgroup = parser.add_mutually_exclusive_group()
616     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
617                         default=True, dest="wait")
618     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
619                         default=True, dest="wait")
620
621     exgroup = parser.add_mutually_exclusive_group()
622     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
623                         default=True, dest="log_timestamps")
624     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
625                         default=True, dest="log_timestamps")
626
627     parser.add_argument("--api", type=str,
628                         default=None, dest="work_api",
629                         choices=("jobs", "containers"),
630                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
631
632     parser.add_argument("--compute-checksum", action="store_true", default=False,
633                         help="Compute checksum of contents while collecting outputs",
634                         dest="compute_checksum")
635
636     parser.add_argument("--submit-runner-ram", type=int,
637                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
638                         default=1024)
639
640     parser.add_argument("--submit-runner-image", type=str,
641                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
642                         default=None)
643
644     parser.add_argument("--name", type=str,
645                         help="Name to use for workflow execution instance.",
646                         default=None)
647
648     parser.add_argument("--on-error", type=str,
649                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
650                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
651
652     parser.add_argument("--enable-dev", action="store_true",
653                         help="Enable loading and running development versions "
654                              "of CWL spec.", default=False)
655
656     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
657                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
658                         default=0)
659
660     exgroup = parser.add_mutually_exclusive_group()
661     exgroup.add_argument("--trash-intermediate", action="store_true",
662                         default=False, dest="trash_intermediate",
663                          help="Immediately trash intermediate outputs on workflow success.")
664     exgroup.add_argument("--no-trash-intermediate", action="store_false",
665                         default=False, dest="trash_intermediate",
666                         help="Do not trash intermediate outputs (default).")
667
668     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
669     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
670
671     return parser
672
673 def add_arv_hints():
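    """Register Arvados CWL extensions: relax the Docker image name check,
    load the arv-cwl-schema extension schema, and accept the extension
    requirements listed below."""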
674     cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
675     cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
676     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
677     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
678     res.close()
679     cwltool.process.supportedProcessRequirements.extend([
680         "http://arvados.org/cwl#RunInSingleContainer",
681         "http://arvados.org/cwl#OutputDirType",
682         "http://arvados.org/cwl#RuntimeConstraints",
683         "http://arvados.org/cwl#PartitionRequirement",
684         "http://arvados.org/cwl#APIRequirement",
685         "http://commonwl.org/cwltool#LoadListingRequirement",
686         "http://arvados.org/cwl#IntermediateOutput",
687         "http://arvados.org/cwl#ReuseRequirement"
688     ])
689
690 def main(args, stdout, stderr, api_client=None, keep_client=None):
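    """Parse arguments, configure logging, construct an ArvCwlRunner, and
    delegate execution to cwltool.main with Arvados-specific hooks.

    A typical command-line invocation (assuming the usual console-script
    name for this package) might look like:

        arvados-cwl-runner --api=containers workflow.cwl job.yml
    """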
691     parser = arg_parser()
692
693     job_order_object = None
694     arvargs = parser.parse_args(args)
695
696     if arvargs.version:
697         print versionstring()
698         return
699
700     if arvargs.update_workflow:
701         if arvargs.update_workflow.find('-7fd4e-') == 5:
702             want_api = 'containers'
703         elif arvargs.update_workflow.find('-p5p6p-') == 5:
704             want_api = 'jobs'
705         else:
706             want_api = None
707         if want_api and arvargs.work_api and want_api != arvargs.work_api:
708             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
709                 arvargs.update_workflow, want_api, arvargs.work_api))
710             return 1
711         arvargs.work_api = want_api
712
713     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
714         job_order_object = ({}, "")
715
716     add_arv_hints()
717
718     try:
719         if api_client is None:
720             api_client=arvados.api('v1', model=OrderedJsonModel())
721         if keep_client is None:
722             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
723         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
724                               num_retries=4, output_name=arvargs.output_name,
725                               output_tags=arvargs.output_tags)
726     except Exception as e:
727         logger.error(e)
728         return 1
729
730     if arvargs.debug:
731         logger.setLevel(logging.DEBUG)
732         logging.getLogger('arvados').setLevel(logging.DEBUG)
733
734     if arvargs.quiet:
735         logger.setLevel(logging.WARN)
736         logging.getLogger('arvados').setLevel(logging.WARN)
737         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
738
739     if arvargs.metrics:
740         metrics.setLevel(logging.DEBUG)
741         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
742
743     if arvargs.log_timestamps:
744         arvados.log_handler.setFormatter(logging.Formatter(
745             '%(asctime)s %(name)s %(levelname)s: %(message)s',
746             '%Y-%m-%d %H:%M:%S'))
747     else:
748         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
749
750     arvargs.conformance_test = None
751     arvargs.use_container = True
752     arvargs.relax_path_checks = True
753     arvargs.print_supported_versions = False
754
755     make_fs_access = partial(CollectionFsAccess,
756                            collection_cache=runner.collection_cache)
757
758     return cwltool.main.main(args=arvargs,
759                              stdout=stdout,
760                              stderr=stderr,
761                              executor=runner.arv_executor,
762                              makeTool=runner.arv_make_tool,
763                              versionfunc=versionstring,
764                              job_order_object=job_order_object,
765                              make_fs_access=make_fs_access,
766                              fetcher_constructor=partial(CollectionFetcher,
767                                                          api_client=api_client,
768                                                          fs_access=make_fs_access(""),
769                                                          num_retries=runner.num_retries),
770                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
771                              logger_handler=arvados.log_handler,
772                              custom_schema_callback=add_arv_hints)