arvados.git: sdk/cwl/arvados_cwl/__init__.py
#!/usr/bin/env python
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
import re
from functools import partial
import pkg_resources  # part of setuptools
import Queue
import time
import signal
import thread

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError
import arvados.commands._util as arv_cmd

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

DEFAULT_PRIORITY = 500

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None,
                 output_name=None, output_tags=None, num_retries=4,
                 thread_count=4):
        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = thread_count
        self.poll_interval = 12

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        self.work_api = None
        expected_api = ["jobs", "containers"]
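        # Determine which work API to use by checking the API server's
        # discovery document, honoring an explicit work_api choice if given.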
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries)
        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                if self.pipeline:
                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                         body={"state": "Complete"}).execute(num_retries=self.num_retries)
            else:
                logger.warn("Overall process status is %s", processStatus)
                if self.pipeline:
                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()


    def start_run(self, runnable, kwargs):
        self.task_queue.add(partial(runnable.run, **kwargs))

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid):
        with self.workflow_eval_lock:
            if uuid in self.processes:
                del self.processes[uuid]

    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

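    # Handle a state-change event for a submitted job or container: mark it
    # running, or queue its done() callback when it reaches a final state.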
    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                self.task_queue.add(partial(j.done, event["properties"]["new_attributes"]))
                logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                if not keys:
                    remain_wait = self.poll_interval
                    continue

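                # Ask the API server for the current state of every in-flight
                # process, route the results through on_message, and deduct
                # the time spent polling from the next wait interval.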
                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    remain_wait = self.poll_interval
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                break

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

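    # Gather the workflow's output files into a single new collection and
    # rewrite the output object's file locations to point into it.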
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

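        # Copy each mapped output into the final collection: literal "_:"
        # entries are written in place, everything else must already be in Keep.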
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                   body={
                                       'output': self.final_output_collection.portable_data_hash(),
                                       'success': self.final_status == "success",
                                       'progress':1.0
                                   }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])
        self.secret_store = kwargs.get("secret_store")

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        # Don't validate this time because it will just print redundant errors.
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata,
                                  do_validate=False)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"],
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"],
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.eval_timeout = kwargs.get("eval_timeout")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            if kwargs["priority"] != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
                                                merged_map=merged_map,
                                                priority=kwargs.get("priority"),
                                                secret_store=self.secret_store)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"),
                                      merged_map=merged_map)
        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            submitargs = kwargs.copy()
            submitargs['submit'] = False
            runnerjob.run(**submitargs)
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

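        # Job submissions and completion callbacks run on a pool of worker
        # threads managed by the task queue (sized by --thread-count).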
        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.workflow_eval_lock.acquire()
            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, kwargs)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)


def versionstring():
    """Print version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                        default=0)

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
                        default=DEFAULT_PRIORITY)

    parser.add_argument("--disable-validate", dest="do_validate",
                        action="store_false", default=True,
                        help=argparse.SUPPRESS)

    parser.add_argument("--disable-js-validation",
                        action="store_true", default=False,
                        help=argparse.SUPPRESS)

    parser.add_argument("--thread-count", type=int,
                        default=4, help="Number of threads to use for job submit and output collection.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                        default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                        default=False, dest="trash_intermediate",
                        help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

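# Register the Arvados CWL extension schema and process requirements with
# cwltool so that documents using them load and validate.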
def add_arv_hints():
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])

def exit_signal_handler(sigcode, frame):
    logger.error("Caught signal {}, exiting.".format(sigcode))
    sys.exit(-sigcode)

def main(args, stdout, stderr, api_client=None, keep_client=None,
         install_sig_handlers=True):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

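    # Infer the target API from the UUID passed to --update-workflow: the
    # "-7fd4e-" infix is a workflow record (containers API), "-p5p6p-" is a
    # pipeline template (jobs API).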
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
            keep_client = api_client.keep
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags,
                              thread_count=arvargs.thread_count)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

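    # cwltool options that arv-cwl-runner always sets the same way.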
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)