13135: Secrets support WIP
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
#!/usr/bin/env python
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
import re
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

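# Default priority for the runner container request.  As documented for the
# --priority option below, the containers API accepts priorities in the range
# 1..1000, with higher values taking precedence over lower ones.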
DEFAULT_PRIORITY = 500

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

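        # Probe the API server's discovery document for a supported work
        # submission API.  The first entry in expected_api that exposes a
        # "create" method wins, so "jobs" is preferred when both APIs are
        # available and --api was not given.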
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries)
        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

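    # on_message handles state-change events for submitted jobs and
    # containers.  Events arrive from the poll_states() thread below, which
    # synthesizes "update" events from periodic list requests.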
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt:
                break

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i, v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

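    # Gather output files scattered across per-step collections into a single
    # final collection, rewrite the output object to point into it, and save
    # a copy of the output object itself as cwl.output.json inside the
    # collection.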
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k, v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

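            # Anything else must be a "keep:" reference of the form
            # keep:<portable data hash>/<path within collection>; strip the
            # "keep:" prefix to find the source collection.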
            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',', ': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

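        # Apply each requested tag as an Arvados "tag" link pointing at the
        # output collection (skipped when no tags were requested).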
        final_uuid = final.manifest_locator()
        if tagsString:
            for tag in tagsString.split(','):
                self.api.links().create(body={
                    "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

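    # When this process is itself running as a submitted runner inside a
    # crunch container or job, record the final output on the runner's own
    # container/job record.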
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                # The container's output is now recorded by portable data
                # hash, so the separately saved output collection is
                # redundant and can be trashed.
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"],
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"],
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.eval_timeout = kwargs.get("eval_timeout")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            if kwargs["priority"] != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
            raise Exception("--priority must be in the range 1..1000.")

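        # With --submit (the default), construct a "runner" process (a
        # container or job that runs arvados-cwl-runner on the cluster)
        # instead of orchestrating the workflow from this host.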
        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
                                                merged_map=merged_map,
                                                priority=kwargs.get("priority"),
                                                secret_store=kwargs.get("secret_store"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"),
                                      merged_map=merged_map)
        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)


def versionstring():
    """Return a version string for key packages, for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


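# Typical command lines (a sketch; the file names are hypothetical):
#   arvados-cwl-runner workflow.cwl job.yml            # submit and wait
#   arvados-cwl-runner --no-wait workflow.cwl job.yml  # submit and exit
#   arvados-cwl-runner --local workflow.cwl job.yml    # run from this host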
def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, defaults to the directory of the input object file or the current directory (if inputs are piped/provided on the command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                        default=0)

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers API only)",
                        default=DEFAULT_PRIORITY)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

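# Register Arvados-specific CWL extension schemas and requirements with
# cwltool so that documents using the http://arvados.org/cwl# namespace
# validate cleanly.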
def add_arv_hints():
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

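    # Arvados UUIDs look like <cluster>-<type infix>-<15 chars>, so the
    # 5-character type infix starts at index 5.  "7fd4e" is the workflow
    # infix (containers API); "p5p6p" is the pipeline template infix
    # (jobs API).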
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

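    # These attributes have no command line options here, but cwltool.main
    # expects them on the parsed-argument object.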
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)