13108: Refactor signal handling
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import threading
14 import hashlib
15 import copy
16 import json
17 import re
18 from functools import partial
19 import pkg_resources  # part of setuptools
20 import Queue
21 import time
22 import signal
23 import thread
24
25 from cwltool.errors import WorkflowException
26 import cwltool.main
27 import cwltool.workflow
28 import cwltool.process
29 from schema_salad.sourceline import SourceLine
30 import schema_salad.validate as validate
31
32 import arvados
33 import arvados.config
34 from arvados.keep import KeepClient
35 from arvados.errors import ApiError
36 import arvados.commands._util as arv_cmd
37
38 from .arvcontainer import ArvadosContainer, RunnerContainer
39 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
40 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
41 from .arvtool import ArvadosCommandTool
42 from .arvworkflow import ArvadosWorkflow, upload_workflow
43 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
44 from .perf import Perf
45 from .pathmapper import NoFollowPathMapper
46 from .task_queue import TaskQueue
47 from ._version import __version__
48
49 from cwltool.pack import pack
50 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
51 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
52 from cwltool.command_line_tool import compute_checksums
53 from arvados.api import OrderedJsonModel
54
55 logger = logging.getLogger('arvados.cwl-runner')
56 metrics = logging.getLogger('arvados.cwl-runner.metrics')
57 logger.setLevel(logging.INFO)
58
59 arvados.log_handler.setFormatter(logging.Formatter(
60         '%(asctime)s %(name)s %(levelname)s: %(message)s',
61         '%Y-%m-%d %H:%M:%S'))
62
63 DEFAULT_PRIORITY = 500
64
65 class ArvCwlRunner(object):
66     """Execute a CWL tool or workflow, submit work (using either jobs or
67     containers API), wait for them to complete, and report output.
68
69     """
70
71     def __init__(self, api_client, work_api=None, keep_client=None,
72                  output_name=None, output_tags=None, num_retries=4,
73                  thread_count=4):
74         self.api = api_client
75         self.processes = {}
76         self.workflow_eval_lock = threading.Condition(threading.RLock())
77         self.final_output = None
78         self.final_status = None
79         self.uploaded = {}
80         self.num_retries = num_retries
81         self.uuid = None
82         self.stop_polling = threading.Event()
83         self.poll_api = None
84         self.pipeline = None
85         self.final_output_collection = None
86         self.output_name = output_name
87         self.output_tags = output_tags
88         self.project_uuid = None
89         self.intermediate_output_ttl = 0
90         self.intermediate_output_collections = []
91         self.trash_intermediate = False
92         self.thread_count = thread_count
93         self.poll_interval = 12
94
95         if keep_client is not None:
96             self.keep_client = keep_client
97         else:
98             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
99
100         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
101
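        # Select the work API: honor the caller's --api choice if given, otherwise
        # use the first API ("jobs", then "containers") that this API server supports.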
102         self.work_api = None
103         expected_api = ["jobs", "containers"]
104         for api in expected_api:
105             try:
106                 methods = self.api._rootDesc.get('resources')[api]['methods']
107                 if ('httpMethod' in methods['create'] and
108                     (work_api == api or work_api is None)):
109                     self.work_api = api
110                     break
111             except KeyError:
112                 pass
113
114         if not self.work_api:
115             if work_api is None:
116                 raise Exception("No supported APIs")
117             else:
118                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
119
120     def arv_make_tool(self, toolpath_object, **kwargs):
121         kwargs["work_api"] = self.work_api
122         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
123                                                 api_client=self.api,
124                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
125                                                 num_retries=self.num_retries)
126         kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
127         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
128             return ArvadosCommandTool(self, toolpath_object, **kwargs)
129         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
130             return ArvadosWorkflow(self, toolpath_object, **kwargs)
131         else:
132             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
133
134     def output_callback(self, out, processStatus):
135         with self.workflow_eval_lock:
136             if processStatus == "success":
137                 logger.info("Overall process status is %s", processStatus)
138                 if self.pipeline:
139                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
140                                                          body={"state": "Complete"}).execute(num_retries=self.num_retries)
141             else:
142                 logger.warn("Overall process status is %s", processStatus)
143                 if self.pipeline:
144                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
145                                                          body={"state": "Failed"}).execute(num_retries=self.num_retries)
146             self.final_status = processStatus
147             self.final_output = out
148             self.workflow_eval_lock.notifyAll()
149
150
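    # Queue a runnable step; the task queue invokes its run() asynchronously.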
151     def start_run(self, runnable, kwargs):
152         self.task_queue.add(partial(runnable.run, **kwargs))
153
154     def process_submitted(self, container):
155         with self.workflow_eval_lock:
156             self.processes[container.uuid] = container
157
158     def process_done(self, uuid):
159         with self.workflow_eval_lock:
160             if uuid in self.processes:
161                 del self.processes[uuid]
162
163     def wrapped_callback(self, cb, obj, st):
164         with self.workflow_eval_lock:
165             cb(obj, st)
166             self.workflow_eval_lock.notifyAll()
167
168     def get_wrapped_callback(self, cb):
169         return partial(self.wrapped_callback, cb)
170
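    # Handle a state-change event for a tracked container or job: mark it as
    # running, or queue its done() callback once it reaches a final state.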
171     def on_message(self, event):
172         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
173             uuid = event["object_uuid"]
174             if event["properties"]["new_attributes"]["state"] == "Running":
175                 with self.workflow_eval_lock:
176                     j = self.processes[uuid]
177                     if j.running is False:
178                         j.running = True
179                         j.update_pipeline_component(event["properties"]["new_attributes"])
180                         logger.info("%s %s is Running", self.label(j), uuid)
181             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
182                 with self.workflow_eval_lock:
183                     j = self.processes[uuid]
184                 self.task_queue.add(partial(j.done, event["properties"]["new_attributes"]))
185                 logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
186
187     def label(self, obj):
188         return "[%s %s]" % (self.work_api[0:-1], obj.name)
189
190     def poll_states(self):
191         """Poll status of jobs or containers listed in the processes dict.
192
193         Runs in a separate thread.
194         """
195
196         try:
197             remain_wait = self.poll_interval
198             while True:
199                 if remain_wait > 0:
200                     self.stop_polling.wait(remain_wait)
201                 if self.stop_polling.is_set():
202                     break
203                 with self.workflow_eval_lock:
204                     keys = list(self.processes.keys())
205                 if not keys:
206                     remain_wait = self.poll_interval
207                     continue
208
209                 begin_poll = time.time()
210                 if self.work_api == "containers":
211                     table = self.poll_api.container_requests()
212                 elif self.work_api == "jobs":
213                     table = self.poll_api.jobs()
214
215                 try:
216                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
217                 except Exception as e:
218                     logger.warn("Error checking states on API server: %s", e)
219                     remain_wait = self.poll_interval
220                     continue
221
222                 for p in proc_states["items"]:
223                     self.on_message({
224                         "object_uuid": p["uuid"],
225                         "event_type": "update",
226                         "properties": {
227                             "new_attributes": p
228                         }
229                     })
230                 finish_poll = time.time()
231                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
232         except:
233             logger.exception("Fatal error in state polling thread.")
234             with self.workflow_eval_lock:
235                 self.processes.clear()
236                 self.workflow_eval_lock.notifyAll()
237         finally:
238             self.stop_polling.set()
239
240     def get_uploaded(self):
241         return self.uploaded.copy()
242
243     def add_uploaded(self, src, pair):
244         self.uploaded[src] = pair
245
246     def add_intermediate_output(self, uuid):
247         if uuid:
248             self.intermediate_output_collections.append(uuid)
249
250     def trash_intermediate_output(self):
251         logger.info("Cleaning up intermediate output collections")
252         for i in self.intermediate_output_collections:
253             try:
254                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
255             except:
256                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
257             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
258                 break
259
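    # Recursively check the tool document for features that the selected work
    # API does not support.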
260     def check_features(self, obj):
261         if isinstance(obj, dict):
262             if obj.get("writable") and self.work_api != "containers":
263                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
264             if obj.get("class") == "DockerRequirement":
265                 if obj.get("dockerOutputDirectory"):
266                     if self.work_api != "containers":
267                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
268                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
269                     if not obj.get("dockerOutputDirectory").startswith('/'):
270                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
271                             "Option 'dockerOutputDirectory' must be an absolute path.")
272             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
273                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
274             for v in obj.itervalues():
275                 self.check_features(v)
276         elif isinstance(obj, list):
277             for i,v in enumerate(obj):
278                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
279                     self.check_features(v)
280
281     def make_output_collection(self, name, tagsString, outputObj):
282         outputObj = copy.deepcopy(outputObj)
283
284         files = []
285         def capture(fileobj):
286             files.append(fileobj)
287
288         adjustDirObjs(outputObj, capture)
289         adjustFileObjs(outputObj, capture)
290
291         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
292
293         final = arvados.collection.Collection(api_client=self.api,
294                                               keep_client=self.keep_client,
295                                               num_retries=self.num_retries)
296
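        # Copy each mapped output into the final collection: literal CreateFile
        # entries are written directly; everything else must be a keep: reference.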
297         for k,v in generatemapper.items():
298             if k.startswith("_:"):
299                 if v.type == "Directory":
300                     continue
301                 if v.type == "CreateFile":
302                     with final.open(v.target, "wb") as f:
303                         f.write(v.resolved.encode("utf-8"))
304                     continue
305
306             if not k.startswith("keep:"):
307                 raise Exception("Output source is not in keep or a literal")
308             sp = k.split("/")
309             srccollection = sp[0][5:]
310             try:
311                 reader = self.collection_cache.get(srccollection)
312                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
313                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
314             except arvados.errors.ArgumentError as e:
315                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
316                 raise
317             except IOError as e:
318                 logger.warn("While preparing output collection: %s", e)
319
320         def rewrite(fileobj):
321             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
322             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
323                 if k in fileobj:
324                     del fileobj[k]
325
326         adjustDirObjs(outputObj, rewrite)
327         adjustFileObjs(outputObj, rewrite)
328
329         with final.open("cwl.output.json", "w") as f:
330             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
331
332         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
333
334         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
335                     final.api_response()["name"],
336                     final.manifest_locator())
337
338         final_uuid = final.manifest_locator()
339         tags = tagsString.split(',')
340         for tag in tags:
341             self.api.links().create(body={
342                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
343             }).execute(num_retries=self.num_retries)
344
345         def finalcollection(fileobj):
346             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
347
348         adjustDirObjs(outputObj, finalcollection)
349         adjustFileObjs(outputObj, finalcollection)
350
351         return (outputObj, final)
352
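    # If this runner is itself running inside a container or as a job task,
    # record the final output collection on that record.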
353     def set_crunch_output(self):
354         if self.work_api == "containers":
355             try:
356                 current = self.api.containers().current().execute(num_retries=self.num_retries)
357             except ApiError as e:
358                 # Status code 404 just means we're not running in a container.
359                 if e.resp.status != 404:
360                     logger.info("Getting current container: %s", e)
361                 return
362             try:
363                 self.api.containers().update(uuid=current['uuid'],
364                                              body={
365                                                  'output': self.final_output_collection.portable_data_hash(),
366                                              }).execute(num_retries=self.num_retries)
367                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
368                                               body={
369                                                   'is_trashed': True
370                                               }).execute(num_retries=self.num_retries)
371             except Exception as e:
372                 logger.info("Setting container output: %s", e)
373         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
374             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
375                                    body={
376                                        'output': self.final_output_collection.portable_data_hash(),
377                                        'success': self.final_status == "success",
378                                        'progress': 1.0
379                                    }).execute(num_retries=self.num_retries)
380
381     def arv_executor(self, tool, job_order, **kwargs):
382         self.debug = kwargs.get("debug")
383
384         tool.visit(self.check_features)
385
386         self.project_uuid = kwargs.get("project_uuid")
387         self.pipeline = None
388         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
389                                                                  collection_cache=self.collection_cache)
390         self.fs_access = make_fs_access(kwargs["basedir"])
391         self.secret_store = kwargs.get("secret_store")
392
393         self.trash_intermediate = kwargs["trash_intermediate"]
394         if self.trash_intermediate and self.work_api != "containers":
395             raise Exception("--trash-intermediate is only supported with --api=containers.")
396
397         self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
398         if self.intermediate_output_ttl and self.work_api != "containers":
399             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
400         if self.intermediate_output_ttl < 0:
401             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
402
403         if not kwargs.get("name"):
404             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
405
406         # Upload direct dependencies of workflow steps and get back a mapping of files to keep references.
407         # This also uploads docker images.
408         merged_map = upload_workflow_deps(self, tool)
409
410         # Reload tool object which may have been updated by
411         # upload_workflow_deps
412         # Don't validate this time because it will just print redundant errors.
413         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
414                                   makeTool=self.arv_make_tool,
415                                   loader=tool.doc_loader,
416                                   avsc_names=tool.doc_schema,
417                                   metadata=tool.metadata,
418                                   do_validate=False)
419
420         # Upload local file references in the job order.
421         job_order = upload_job_order(self, "%s input" % kwargs["name"],
422                                      tool, job_order)
423
424         existing_uuid = kwargs.get("update_workflow")
425         if existing_uuid or kwargs.get("create_workflow"):
426             # Create a pipeline template or workflow record and exit.
427             if self.work_api == "jobs":
428                 tmpl = RunnerTemplate(self, tool, job_order,
429                                       kwargs.get("enable_reuse"),
430                                       uuid=existing_uuid,
431                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
432                                       name=kwargs["name"],
433                                       merged_map=merged_map)
434                 tmpl.save()
435                 # cwltool.main will write our return value to stdout.
436                 return (tmpl.uuid, "success")
437             elif self.work_api == "containers":
438                 return (upload_workflow(self, tool, job_order,
439                                         self.project_uuid,
440                                         uuid=existing_uuid,
441                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
442                                         name=kwargs["name"],
443                                         merged_map=merged_map),
444                         "success")
445
446         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
447         self.eval_timeout = kwargs.get("eval_timeout")
448
449         kwargs["make_fs_access"] = make_fs_access
450         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
451         kwargs["use_container"] = True
452         kwargs["tmpdir_prefix"] = "tmp"
453         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
454
455         if self.work_api == "containers":
456             if self.ignore_docker_for_reuse:
457                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
458             kwargs["outdir"] = "/var/spool/cwl"
459             kwargs["docker_outdir"] = "/var/spool/cwl"
460             kwargs["tmpdir"] = "/tmp"
461             kwargs["docker_tmpdir"] = "/tmp"
462         elif self.work_api == "jobs":
463             if kwargs["priority"] != DEFAULT_PRIORITY:
464                 raise Exception("--priority not implemented for jobs API.")
465             kwargs["outdir"] = "$(task.outdir)"
466             kwargs["docker_outdir"] = "$(task.outdir)"
467             kwargs["tmpdir"] = "$(task.tmpdir)"
468
469         if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
470             raise Exception("--priority must be in the range 1..1000.")
471
472         runnerjob = None
473         if kwargs.get("submit"):
474             # Submit a runner job to run the workflow for us.
475             if self.work_api == "containers":
476                 if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
477                     kwargs["runnerjob"] = tool.tool["id"]
478                     runnerjob = tool.job(job_order,
479                                          self.output_callback,
480                                          **kwargs).next()
481                 else:
482                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
483                                                 self.output_name,
484                                                 self.output_tags,
485                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
486                                                 name=kwargs.get("name"),
487                                                 on_error=kwargs.get("on_error"),
488                                                 submit_runner_image=kwargs.get("submit_runner_image"),
489                                                 intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
490                                                 merged_map=merged_map,
491                                                 priority=kwargs.get("priority"),
492                                                 secret_store=self.secret_store)
493             elif self.work_api == "jobs":
494                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
495                                       self.output_name,
496                                       self.output_tags,
497                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
498                                       name=kwargs.get("name"),
499                                       on_error=kwargs.get("on_error"),
500                                       submit_runner_image=kwargs.get("submit_runner_image"),
501                                       merged_map=merged_map)
502         elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
503             # Create pipeline for local run
504             self.pipeline = self.api.pipeline_instances().create(
505                 body={
506                     "owner_uuid": self.project_uuid,
507                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
508                     "components": {},
509                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
510             logger.info("Pipeline instance %s", self.pipeline["uuid"])
511
512         if runnerjob and not kwargs.get("wait"):
513             runnerjob.run(**kwargs)
514             return (runnerjob.uuid, "success")
515
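        # Start the background thread that polls container/job states, and create
        # the task queue that runs submissions.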
516         self.poll_api = arvados.api('v1')
517         self.polling_thread = threading.Thread(target=self.poll_states)
518         self.polling_thread.start()
519
520         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
521
522         if runnerjob:
523             jobiter = iter((runnerjob,))
524         else:
525             if "cwl_runner_job" in kwargs:
526                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
527             jobiter = tool.job(job_order,
528                                self.output_callback,
529                                **kwargs)
530
531         try:
532             self.workflow_eval_lock.acquire()
533             # The lock is held while this code runs and is released only
534             # inside self.workflow_eval_lock.wait(), at which point
535             # on_message can update job state and process output
536             # callbacks.
537
538             loopperf = Perf(metrics, "jobiter")
539             loopperf.__enter__()
540             for runnable in jobiter:
541                 loopperf.__exit__()
542
543                 if self.stop_polling.is_set():
544                     break
545
546                 if self.task_queue.error is not None:
547                     raise self.task_queue.error
548
549                 if runnable:
550                     with Perf(metrics, "run"):
551                         self.start_run(runnable, kwargs)
552                 else:
553                     if (self.task_queue.in_flight + len(self.processes)) > 0:
554                         self.workflow_eval_lock.wait(3)
555                     else:
556                         logger.error("Workflow is deadlocked: no runnable processes and no pending processes to wait for.")
557                         break
558                 loopperf.__enter__()
559             loopperf.__exit__()
560
561             while (self.task_queue.in_flight + len(self.processes)) > 0:
562                 if self.task_queue.error is not None:
563                     raise self.task_queue.error
564                 self.workflow_eval_lock.wait(3)
565
566         except UnsupportedRequirement:
567             raise
568         except:
569             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
570                 logger.error("Interrupted, workflow will be cancelled")
571             else:
572                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
573             if self.pipeline:
574                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
575                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
576             if runnerjob and runnerjob.uuid and self.work_api == "containers":
577                 self.api.container_requests().update(uuid=runnerjob.uuid,
578                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
579         finally:
580             self.workflow_eval_lock.release()
581             self.task_queue.drain()
582             self.stop_polling.set()
583             self.polling_thread.join()
584             self.task_queue.join()
585
586         if self.final_status == "UnsupportedRequirement":
587             raise UnsupportedRequirement("Check log for details.")
588
589         if self.final_output is None:
590             raise WorkflowException("Workflow did not return a result.")
591
592         if kwargs.get("submit") and isinstance(runnerjob, Runner):
593             logger.info("Final output collection %s", runnerjob.final_output)
594         else:
595             if self.output_name is None:
596                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
597             if self.output_tags is None:
598                 self.output_tags = ""
599             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
600             self.set_crunch_output()
601
602         if kwargs.get("compute_checksum"):
603             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
604             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
605
606         if self.trash_intermediate and self.final_status == "success":
607             self.trash_intermediate_output()
608
609         return (self.final_output, self.final_status)
610
611
612 def versionstring():
613     """Print version string of key packages for provenance and debugging."""
614
615     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
616     arvpkg = pkg_resources.require("arvados-python-client")
617     cwlpkg = pkg_resources.require("cwltool")
618
619     return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
620                                     "arvados-python-client", arvpkg[0].version,
621                                     "cwltool", cwlpkg[0].version)
622
623
624 def arg_parser():  # type: () -> argparse.ArgumentParser
625     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
626
627     parser.add_argument("--basedir", type=str,
628                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
629     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
630                         help="Output directory, default current directory")
631
632     parser.add_argument("--eval-timeout",
633                         help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
634                         type=float,
635                         default=20)
636
637     exgroup = parser.add_mutually_exclusive_group()
638     exgroup.add_argument("--print-dot", action="store_true",
639                          help="Print workflow visualization in graphviz format and exit")
640     exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
641     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
642
643     exgroup = parser.add_mutually_exclusive_group()
644     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
645     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
646     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
647
648     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
649
650     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
651
652     exgroup = parser.add_mutually_exclusive_group()
653     exgroup.add_argument("--enable-reuse", action="store_true",
654                         default=True, dest="enable_reuse",
655                         help="Enable job or container reuse (default)")
656     exgroup.add_argument("--disable-reuse", action="store_false",
657                         default=True, dest="enable_reuse",
658                         help="Disable job or container reuse")
659
660     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs will go to the user's home project.")
661     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
662     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
663     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
664                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
665                         default=False)
666
667     exgroup = parser.add_mutually_exclusive_group()
668     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
669                         default=True, dest="submit")
670     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
671                         default=True, dest="submit")
672     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
673                          dest="create_workflow")
674     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
675     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
676
677     exgroup = parser.add_mutually_exclusive_group()
678     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
679                         default=True, dest="wait")
680     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
681                         default=True, dest="wait")
682
683     exgroup = parser.add_mutually_exclusive_group()
684     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
685                         default=True, dest="log_timestamps")
686     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
687                         default=True, dest="log_timestamps")
688
689     parser.add_argument("--api", type=str,
690                         default=None, dest="work_api",
691                         choices=("jobs", "containers"),
692                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
693
694     parser.add_argument("--compute-checksum", action="store_true", default=False,
695                         help="Compute checksum of contents while collecting outputs",
696                         dest="compute_checksum")
697
698     parser.add_argument("--submit-runner-ram", type=int,
699                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
700                         default=1024)
701
702     parser.add_argument("--submit-runner-image", type=str,
703                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
704                         default=None)
705
706     parser.add_argument("--name", type=str,
707                         help="Name to use for workflow execution instance.",
708                         default=None)
709
710     parser.add_argument("--on-error", type=str,
711                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
712                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
713
714     parser.add_argument("--enable-dev", action="store_true",
715                         help="Enable loading and running development versions "
716                              "of CWL spec.", default=False)
717
718     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
719                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
720                         default=0)
721
722     parser.add_argument("--priority", type=int,
723                         help="Workflow priority (range 1..1000; higher values take precedence; containers API only)",
724                         default=DEFAULT_PRIORITY)
725
726     parser.add_argument("--disable-validate", dest="do_validate",
727                         action="store_false", default=True,
728                         help=argparse.SUPPRESS)
729
730     parser.add_argument("--disable-js-validation",
731                         action="store_true", default=False,
732                         help=argparse.SUPPRESS)
733
734     parser.add_argument("--thread-count", type=int,
735                         default=4, help="Number of threads to use for job submit and output collection.")
736
737     exgroup = parser.add_mutually_exclusive_group()
738     exgroup.add_argument("--trash-intermediate", action="store_true",
739                         default=False, dest="trash_intermediate",
740                          help="Immediately trash intermediate outputs on workflow success.")
741     exgroup.add_argument("--no-trash-intermediate", action="store_false",
742                         default=False, dest="trash_intermediate",
743                         help="Do not trash intermediate outputs (default).")
744
745     parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
746     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
747
748     return parser
749
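# Relax cwltool's tool-id accept list, register the Arvados CWL extension schema,
# and mark Arvados-specific requirements as supported so cwltool will accept them.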
750 def add_arv_hints():
751     cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
752     cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
753     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
754     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
755     res.close()
756     cwltool.process.supportedProcessRequirements.extend([
757         "http://arvados.org/cwl#RunInSingleContainer",
758         "http://arvados.org/cwl#OutputDirType",
759         "http://arvados.org/cwl#RuntimeConstraints",
760         "http://arvados.org/cwl#PartitionRequirement",
761         "http://arvados.org/cwl#APIRequirement",
762         "http://commonwl.org/cwltool#LoadListingRequirement",
763         "http://arvados.org/cwl#IntermediateOutput",
764         "http://arvados.org/cwl#ReuseRequirement"
765     ])
766
767 def exit_signal_handler(sigcode, frame):
768     logger.error("Caught signal {}, exiting.".format(sigcode))
769     sys.exit(-sigcode)
770
771 def main(args, stdout, stderr, api_client=None, keep_client=None,
772          install_sig_handlers=True):
773     parser = arg_parser()
774
775     job_order_object = None
776     arvargs = parser.parse_args(args)
777
778     if install_sig_handlers:
779         arv_cmd.install_signal_handlers()
780
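    # Infer the target API from the --update-workflow UUID type infix:
    # '-7fd4e-' identifies a workflow (containers API), '-p5p6p-' a pipeline
    # template (jobs API).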
781     if arvargs.update_workflow:
782         if arvargs.update_workflow.find('-7fd4e-') == 5:
783             want_api = 'containers'
784         elif arvargs.update_workflow.find('-p5p6p-') == 5:
785             want_api = 'jobs'
786         else:
787             want_api = None
788         if want_api and arvargs.work_api and want_api != arvargs.work_api:
789             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
790                 arvargs.update_workflow, want_api, arvargs.work_api))
791             return 1
792         arvargs.work_api = want_api
793
794     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
795         job_order_object = ({}, "")
796
797     add_arv_hints()
798
799     try:
800         if api_client is None:
801             api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
802             keep_client = api_client.keep
803         if keep_client is None:
804             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
805         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
806                               num_retries=4, output_name=arvargs.output_name,
807                               output_tags=arvargs.output_tags,
808                               thread_count=arvargs.thread_count)
809     except Exception as e:
810         logger.error(e)
811         return 1
812
813     if arvargs.debug:
814         logger.setLevel(logging.DEBUG)
815         logging.getLogger('arvados').setLevel(logging.DEBUG)
816
817     if arvargs.quiet:
818         logger.setLevel(logging.WARN)
819         logging.getLogger('arvados').setLevel(logging.WARN)
820         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
821
822     if arvargs.metrics:
823         metrics.setLevel(logging.DEBUG)
824         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
825
826     if arvargs.log_timestamps:
827         arvados.log_handler.setFormatter(logging.Formatter(
828             '%(asctime)s %(name)s %(levelname)s: %(message)s',
829             '%Y-%m-%d %H:%M:%S'))
830     else:
831         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
832
833     arvargs.conformance_test = None
834     arvargs.use_container = True
835     arvargs.relax_path_checks = True
836     arvargs.print_supported_versions = False
837
838     make_fs_access = partial(CollectionFsAccess,
839                            collection_cache=runner.collection_cache)
840
841     return cwltool.main.main(args=arvargs,
842                              stdout=stdout,
843                              stderr=stderr,
844                              executor=runner.arv_executor,
845                              makeTool=runner.arv_make_tool,
846                              versionfunc=versionstring,
847                              job_order_object=job_order_object,
848                              make_fs_access=make_fs_access,
849                              fetcher_constructor=partial(CollectionFetcher,
850                                                          api_client=api_client,
851                                                          fs_access=make_fs_access(""),
852                                                          num_retries=runner.num_retries),
853                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
854                              logger_handler=arvados.log_handler,
855                              custom_schema_callback=add_arv_hints)