Fix arvados-cwl-runner --version argument (again) refs #12526
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import threading
14 import hashlib
15 import copy
16 import json
17 import re
18 from functools import partial
19 import pkg_resources  # part of setuptools
20
21 from cwltool.errors import WorkflowException
22 import cwltool.main
23 import cwltool.workflow
24 import cwltool.process
25 from schema_salad.sourceline import SourceLine
26 import schema_salad.validate as validate
27
28 import arvados
29 import arvados.config
30 from arvados.keep import KeepClient
31 from arvados.errors import ApiError
32
33 from .arvcontainer import ArvadosContainer, RunnerContainer
34 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
35 from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
36 from .arvtool import ArvadosCommandTool
37 from .arvworkflow import ArvadosWorkflow, upload_workflow
38 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
39 from .perf import Perf
40 from .pathmapper import NoFollowPathMapper
41 from ._version import __version__
42
43 from cwltool.pack import pack
44 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
45 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
46 from cwltool.command_line_tool import compute_checksums
47 from arvados.api import OrderedJsonModel
48
# Main logger for this package; its level is raised to INFO here.
logger = logging.getLogger('arvados.cwl-runner')
# Child logger handed to Perf() for timing records.
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

# Install a "<timestamp> <logger name> <level>: <message>" format on the
# Arvados SDK's log handler.
arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

# Priority value that arv_executor compares kwargs["priority"] against; the
# jobs API path rejects any other value.
DEFAULT_PRIORITY = 500
58
59 class ArvCwlRunner(object):
60     """Execute a CWL tool or workflow, submit work (using either jobs or
61     containers API), wait for them to complete, and report output.
62
63     """
64
65     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
66         self.api = api_client
67         self.processes = {}
68         self.lock = threading.Lock()
69         self.cond = threading.Condition(self.lock)
70         self.final_output = None
71         self.final_status = None
72         self.uploaded = {}
73         self.num_retries = num_retries
74         self.uuid = None
75         self.stop_polling = threading.Event()
76         self.poll_api = None
77         self.pipeline = None
78         self.final_output_collection = None
79         self.output_name = output_name
80         self.output_tags = output_tags
81         self.project_uuid = None
82         self.intermediate_output_ttl = 0
83         self.intermediate_output_collections = []
84         self.trash_intermediate = False
85
86         if keep_client is not None:
87             self.keep_client = keep_client
88         else:
89             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
90
91         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
92
93         self.work_api = None
94         expected_api = ["jobs", "containers"]
95         for api in expected_api:
96             try:
97                 methods = self.api._rootDesc.get('resources')[api]['methods']
98                 if ('httpMethod' in methods['create'] and
99                     (work_api == api or work_api is None)):
100                     self.work_api = api
101                     break
102             except KeyError:
103                 pass
104
105         if not self.work_api:
106             if work_api is None:
107                 raise Exception("No supported APIs")
108             else:
109                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
110
111     def arv_make_tool(self, toolpath_object, **kwargs):
112         kwargs["work_api"] = self.work_api
113         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
114                                                 api_client=self.api,
115                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
116                                                 num_retries=self.num_retries)
117         kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
118         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
119             return ArvadosCommandTool(self, toolpath_object, **kwargs)
120         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
121             return ArvadosWorkflow(self, toolpath_object, **kwargs)
122         else:
123             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
124
125     def output_callback(self, out, processStatus):
126         if processStatus == "success":
127             logger.info("Overall process status is %s", processStatus)
128             if self.pipeline:
129                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
130                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
131         else:
132             logger.warn("Overall process status is %s", processStatus)
133             if self.pipeline:
134                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
135                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
136         self.final_status = processStatus
137         self.final_output = out
138
139     def on_message(self, event):
140         if "object_uuid" in event:
141             if event["object_uuid"] in self.processes and event["event_type"] == "update":
142                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
143                     uuid = event["object_uuid"]
144                     with self.lock:
145                         j = self.processes[uuid]
146                         logger.info("%s %s is Running", self.label(j), uuid)
147                         j.running = True
148                         j.update_pipeline_component(event["properties"]["new_attributes"])
149                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
150                     uuid = event["object_uuid"]
151                     try:
152                         self.cond.acquire()
153                         j = self.processes[uuid]
154                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
155                         with Perf(metrics, "done %s" % j.name):
156                             j.done(event["properties"]["new_attributes"])
157                         self.cond.notify()
158                     finally:
159                         self.cond.release()
160
161     def label(self, obj):
162         return "[%s %s]" % (self.work_api[0:-1], obj.name)
163
164     def poll_states(self):
165         """Poll status of jobs or containers listed in the processes dict.
166
167         Runs in a separate thread.
168         """
169
170         try:
171             while True:
172                 self.stop_polling.wait(15)
173                 if self.stop_polling.is_set():
174                     break
175                 with self.lock:
176                     keys = self.processes.keys()
177                 if not keys:
178                     continue
179
180                 if self.work_api == "containers":
181                     table = self.poll_api.container_requests()
182                 elif self.work_api == "jobs":
183                     table = self.poll_api.jobs()
184
185                 try:
186                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
187                 except Exception as e:
188                     logger.warn("Error checking states on API server: %s", e)
189                     continue
190
191                 for p in proc_states["items"]:
192                     self.on_message({
193                         "object_uuid": p["uuid"],
194                         "event_type": "update",
195                         "properties": {
196                             "new_attributes": p
197                         }
198                     })
199         except:
200             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
201             self.cond.acquire()
202             self.processes.clear()
203             self.cond.notify()
204             self.cond.release()
205         finally:
206             self.stop_polling.set()
207
208     def get_uploaded(self):
209         return self.uploaded.copy()
210
211     def add_uploaded(self, src, pair):
212         self.uploaded[src] = pair
213
214     def add_intermediate_output(self, uuid):
215         if uuid:
216             self.intermediate_output_collections.append(uuid)
217
218     def trash_intermediate_output(self):
219         logger.info("Cleaning up intermediate output collections")
220         for i in self.intermediate_output_collections:
221             try:
222                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
223             except:
224                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
225             if sys.exc_info()[0] is KeyboardInterrupt:
226                 break
227
228     def check_features(self, obj):
229         if isinstance(obj, dict):
230             if obj.get("writable") and self.work_api != "containers":
231                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
232             if obj.get("class") == "DockerRequirement":
233                 if obj.get("dockerOutputDirectory"):
234                     if self.work_api != "containers":
235                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
236                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
237                     if not obj.get("dockerOutputDirectory").startswith('/'):
238                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
239                             "Option 'dockerOutputDirectory' must be an absolute path.")
240             for v in obj.itervalues():
241                 self.check_features(v)
242         elif isinstance(obj, list):
243             for i,v in enumerate(obj):
244                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
245                     self.check_features(v)
246
247     def make_output_collection(self, name, tagsString, outputObj):
248         outputObj = copy.deepcopy(outputObj)
249
250         files = []
251         def capture(fileobj):
252             files.append(fileobj)
253
254         adjustDirObjs(outputObj, capture)
255         adjustFileObjs(outputObj, capture)
256
257         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
258
259         final = arvados.collection.Collection(api_client=self.api,
260                                               keep_client=self.keep_client,
261                                               num_retries=self.num_retries)
262
263         for k,v in generatemapper.items():
264             if k.startswith("_:"):
265                 if v.type == "Directory":
266                     continue
267                 if v.type == "CreateFile":
268                     with final.open(v.target, "wb") as f:
269                         f.write(v.resolved.encode("utf-8"))
270                     continue
271
272             if not k.startswith("keep:"):
273                 raise Exception("Output source is not in keep or a literal")
274             sp = k.split("/")
275             srccollection = sp[0][5:]
276             try:
277                 reader = self.collection_cache.get(srccollection)
278                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
279                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
280             except arvados.errors.ArgumentError as e:
281                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
282                 raise
283             except IOError as e:
284                 logger.warn("While preparing output collection: %s", e)
285
286         def rewrite(fileobj):
287             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
288             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
289                 if k in fileobj:
290                     del fileobj[k]
291
292         adjustDirObjs(outputObj, rewrite)
293         adjustFileObjs(outputObj, rewrite)
294
295         with final.open("cwl.output.json", "w") as f:
296             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
297
298         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
299
300         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
301                     final.api_response()["name"],
302                     final.manifest_locator())
303
304         final_uuid = final.manifest_locator()
305         tags = tagsString.split(',')
306         for tag in tags:
307              self.api.links().create(body={
308                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
309                 }).execute(num_retries=self.num_retries)
310
311         def finalcollection(fileobj):
312             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
313
314         adjustDirObjs(outputObj, finalcollection)
315         adjustFileObjs(outputObj, finalcollection)
316
317         return (outputObj, final)
318
319     def set_crunch_output(self):
320         if self.work_api == "containers":
321             try:
322                 current = self.api.containers().current().execute(num_retries=self.num_retries)
323             except ApiError as e:
324                 # Status code 404 just means we're not running in a container.
325                 if e.resp.status != 404:
326                     logger.info("Getting current container: %s", e)
327                 return
328             try:
329                 self.api.containers().update(uuid=current['uuid'],
330                                              body={
331                                                  'output': self.final_output_collection.portable_data_hash(),
332                                              }).execute(num_retries=self.num_retries)
333                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
334                                               body={
335                                                   'is_trashed': True
336                                               }).execute(num_retries=self.num_retries)
337             except Exception as e:
338                 logger.info("Setting container output: %s", e)
339         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
340             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
341                                    body={
342                                        'output': self.final_output_collection.portable_data_hash(),
343                                        'success': self.final_status == "success",
344                                        'progress':1.0
345                                    }).execute(num_retries=self.num_retries)
346
    def arv_executor(self, tool, job_order, **kwargs):
        """Register, submit or run a CWL tool/workflow on Arvados.

        Depending on kwargs this method takes one of three paths:

        * "update_workflow" / "create_workflow": save a pipeline template
          (jobs API) or workflow record (containers API) and return without
          running anything.
        * "submit" without "wait": start a runner job/container and return
          its uuid immediately.
        * otherwise: iterate the jobs produced by the tool (or the single
          runner), start each runnable one, wait until self.processes is
          empty, then build the final output collection.

        kwargs read by subscript, and therefore required: "basedir",
        "trash_intermediate", "intermediate_output_ttl", "priority".
        kwargs["name"] is filled in from the tool when missing or empty.

        Returns a 2-tuple: (template uuid or upload_workflow() result,
        "success") on the first path, (runner uuid, "success") on the second,
        and (final output object, final status) on the third.

        Raises Exception for option combinations the selected work API does
        not support or an out-of-range priority, UnsupportedRequirement when
        the run reported that status, and WorkflowException when no final
        output was produced.
        """
        self.debug = kwargs.get("debug")

        # Reject unsupported CWL features before uploading anything.
        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])


        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        # Default name: the tool's label, else the metadata label, else the
        # basename of the tool id.
        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"],
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"],
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.eval_timeout = kwargs.get("eval_timeout")

        # Settings passed down through tool.job() / runnable.run().
        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            if kwargs["priority"] != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
                    # A single command line tool that we wait for is run
                    # directly as its own job instead of through a
                    # RunnerContainer.
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
                                                merged_map=merged_map,
                                                priority=kwargs.get("priority"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"),
                                      merged_map=merged_map)
        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        # Fire-and-forget submission: start the runner and return its uuid.
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        # From here on we wait for completion; start the state polling thread
        # with its own API client object.
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            # The Perf timer is entered/exited by hand so that it measures
            # only the time spent inside the job iterator between items.
            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                # The polling thread sets stop_polling when it dies.
                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    # Nothing runnable right now: wait for a pending process
                    # to change state, or give up if nothing is pending.
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            # Wait for the processes that are still outstanding.
            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            # Set the submitted runner's container request priority to "0".
            # NOTE(review): presumably this cancels the request -- confirm.
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            # A submitted Runner reports its own final output; just log it.
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)
563
564
def versionstring():
    """Return a version string of key packages for provenance and debugging.

    The string has the form
    "<argv[0]> <arvados-cwl-runner version>, arvados-python-client <version>, cwltool <version>".
    """

    package_names = ("arvados-cwl-runner", "arvados-python-client", "cwltool")
    # Resolve all three distributions first, then read their versions.
    resolved = [pkg_resources.require(pkg) for pkg in package_names]
    runner_version, client_version, cwltool_version = [dists[0].version for dists in resolved]

    return "%s %s, %s %s, %s %s" % (sys.argv[0], runner_version,
                                    "arvados-python-client", client_version,
                                    "cwltool", cwltool_version)
575
576
577 def arg_parser():  # type: () -> argparse.ArgumentParser
578     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
579
580     parser.add_argument("--basedir", type=str,
581                         help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
582     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
583                         help="Output directory, default current directory")
584
585     parser.add_argument("--eval-timeout",
586                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
587                         type=float,
588                         default=20)
589
590     exgroup = parser.add_mutually_exclusive_group()
591     exgroup.add_argument("--print-dot", action="store_true",
592                          help="Print workflow visualization in graphviz format and exit")
593     exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
594     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
595
596     exgroup = parser.add_mutually_exclusive_group()
597     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
598     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
599     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
600
601     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
602
603     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
604
605     exgroup = parser.add_mutually_exclusive_group()
606     exgroup.add_argument("--enable-reuse", action="store_true",
607                         default=True, dest="enable_reuse",
608                         help="Enable job or container reuse (default)")
609     exgroup.add_argument("--disable-reuse", action="store_false",
610                         default=True, dest="enable_reuse",
611                         help="Disable job or container reuse")
612
613     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
614     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
615     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
616     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
617                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
618                         default=False)
619
620     exgroup = parser.add_mutually_exclusive_group()
621     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
622                         default=True, dest="submit")
623     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
624                         default=True, dest="submit")
625     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
626                          dest="create_workflow")
627     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
628     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
629
630     exgroup = parser.add_mutually_exclusive_group()
631     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
632                         default=True, dest="wait")
633     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
634                         default=True, dest="wait")
635
636     exgroup = parser.add_mutually_exclusive_group()
637     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
638                         default=True, dest="log_timestamps")
639     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
640                         default=True, dest="log_timestamps")
641
642     parser.add_argument("--api", type=str,
643                         default=None, dest="work_api",
644                         choices=("jobs", "containers"),
645                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
646
647     parser.add_argument("--compute-checksum", action="store_true", default=False,
648                         help="Compute checksum of contents while collecting outputs",
649                         dest="compute_checksum")
650
651     parser.add_argument("--submit-runner-ram", type=int,
652                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
653                         default=1024)
654
655     parser.add_argument("--submit-runner-image", type=str,
656                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
657                         default=None)
658
659     parser.add_argument("--name", type=str,
660                         help="Name to use for workflow execution instance.",
661                         default=None)
662
663     parser.add_argument("--on-error", type=str,
664                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
665                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
666
667     parser.add_argument("--enable-dev", action="store_true",
668                         help="Enable loading and running development versions "
669                              "of CWL spec.", default=False)
670
671     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
672                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
673                         default=0)
674
675     parser.add_argument("--priority", type=int,
676                         help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
677                         default=DEFAULT_PRIORITY)
678
679     exgroup = parser.add_mutually_exclusive_group()
680     exgroup.add_argument("--trash-intermediate", action="store_true",
681                         default=False, dest="trash_intermediate",
682                          help="Immediately trash intermediate outputs on workflow success.")
683     exgroup.add_argument("--no-trash-intermediate", action="store_false",
684                         default=False, dest="trash_intermediate",
685                         help="Do not trash intermediate outputs (default).")
686
687     parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
688     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
689
690     return parser
691
def add_arv_hints():
    """Register the Arvados CWL extension schema and requirements with cwltool.

    Replaces cwltool's accept-list regular expressions with a pattern that
    matches anything, loads the bundled ``arv-cwl-schema.yml`` as a custom
    schema for CWL v1.0, and adds the Arvados extension requirements to
    ``cwltool.process.supportedProcessRequirements``.

    The function is safe to call more than once: requirements that are
    already registered are not appended again.
    """
    # Both accept-list patterns are replaced by a match-everything regex
    # (presumably to relax cwltool's file name checks -- confirm against cwltool).
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE

    # Read the schema and release the stream even if reading fails, and
    # before handing the text to cwltool, so the handle is never leaked.
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    try:
        schema_text = res.read()
    finally:
        res.close()
    use_custom_schema("v1.0", "http://arvados.org/cwl", schema_text)

    # Only append missing entries: this function can run several times in
    # one process (directly and as a schema callback), and blindly extending
    # the list would accumulate duplicates.
    supported = cwltool.process.supportedProcessRequirements
    for requirement in (
            "http://arvados.org/cwl#RunInSingleContainer",
            "http://arvados.org/cwl#OutputDirType",
            "http://arvados.org/cwl#RuntimeConstraints",
            "http://arvados.org/cwl#PartitionRequirement",
            "http://arvados.org/cwl#APIRequirement",
            "http://commonwl.org/cwltool#LoadListingRequirement",
            "http://arvados.org/cwl#IntermediateOutput",
            "http://arvados.org/cwl#ReuseRequirement",
    ):
        if requirement not in supported:
            supported.append(requirement)
708
def main(args, stdout, stderr, api_client=None, keep_client=None):
    """Parse command line arguments and run the workflow through cwltool.

    args -- list of command line arguments handed to the argument parser.
    stdout, stderr -- streams forwarded to cwltool.main.main().
    api_client -- Arvados API client; one is created when None.
    keep_client -- Keep client; one is created from api_client when None.

    Returns 1 when the --update-workflow UUID conflicts with --api or when
    the clients/runner cannot be set up; otherwise returns whatever
    cwltool.main.main() returns.
    """
    arvargs = arg_parser().parse_args(args)

    target_uuid = arvargs.update_workflow
    if target_uuid:
        # The infix found at offset 5 of the UUID selects the work API.
        want_api = None
        for infix, api_name in (('-7fd4e-', 'containers'), ('-p5p6p-', 'jobs')):
            if target_uuid.find(infix) == 5:
                want_api = api_name
                break

        requested_api = arvargs.work_api
        if want_api and requested_api and requested_api != want_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                target_uuid, want_api, requested_api))
            return 1
        # NOTE(review): when the UUID type is not recognized, want_api is None
        # and any explicit --api value is discarded here -- confirm intended.
        arvargs.work_api = want_api

    # Creating or updating a workflow without a job order: pass an empty
    # placeholder job order object on to cwltool.
    saving_workflow = arvargs.create_workflow or arvargs.update_workflow
    job_order_object = ({}, "") if (saving_workflow and not arvargs.job_order) else None

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client,
                              work_api=arvargs.work_api,
                              keep_client=keep_client,
                              num_retries=4,
                              output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        # Setup failures are reported as a log message and exit code 1.
        logger.error(e)
        return 1

    # Logging verbosity.
    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        for quiet_name in ('arvados', 'arvados.arv-run'):
            logging.getLogger(quiet_name).setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    # Log line layout, with or without a leading timestamp.
    if arvargs.log_timestamps:
        log_formatter = logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S')
    else:
        log_formatter = logging.Formatter('%(name)s %(levelname)s: %(message)s')
    arvados.log_handler.setFormatter(log_formatter)

    # Fixed values for options that are set on the namespace here instead of
    # coming from the command line (presumably read by cwltool.main.main).
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)
    fetcher_constructor = partial(CollectionFetcher,
                                  api_client=api_client,
                                  fs_access=make_fs_access(""),
                                  num_retries=runner.num_retries)
    resolver = partial(collectionResolver, api_client,
                       num_retries=runner.num_retries)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=fetcher_constructor,
                             resolver=resolver,
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)