#!/usr/bin/env python
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement the cwl-runner interface for submitting and running work on Arvados,
# using either the Crunch jobs API or the Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
import re
from functools import partial
import pkg_resources  # part of setuptools
import Queue
import time
import signal
import thread

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError
import arvados.commands._util as arv_cmd

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

DEFAULT_PRIORITY = 500

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow: submit work (using either the jobs
    or containers API), wait for it to complete, and report the output.

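    In normal use cwltool.main.main() drives this class (see main() below).
    A rough sketch of direct use, assuming an initialized API client and the
    keyword arguments cwltool would normally supply:

        runner = ArvCwlRunner(arvados.api('v1'), work_api="containers")
        (output, status) = runner.arv_executor(tool, job_order, **kwargs)
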
    """

    def __init__(self, api_client, work_api=None, keep_client=None,
                 output_name=None, output_tags=None, num_retries=4,
                 thread_count=4):
        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = thread_count
        self.poll_interval = 12

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

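        # Choose the work submission API by probing the server's discovery
        # document: use the API requested by the caller if the server
        # supports it; if no API was requested, take the first supported
        # one of "jobs" or "containers".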
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
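        """Construct an ArvadosCommandTool or ArvadosWorkflow for the given
        tool document, falling back to cwltool's default for anything else."""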
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = self.fetcher_constructor
        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                if self.pipeline:
                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                         body={"state": "Complete"}).execute(num_retries=self.num_retries)
            else:
                logger.warn("Overall process status is %s", processStatus)
                if self.pipeline:
                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()


    def start_run(self, runnable, kwargs):
        self.task_queue.add(partial(runnable.run, **kwargs))

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
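        """Remove a finished job or container from the watch list and queue
        its done() callback on the task queue."""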
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record))
            del self.processes[uuid]

    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
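        """Handle a state-change event for a watched job or container: mark
        it running, or hand a finished record off to process_done()."""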
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    remain_wait = self.poll_interval
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                break

    def check_features(self, obj):
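        """Recursively check a tool document for features that the selected
        work API cannot support, raising UnsupportedRequirement (or
        ValidationException) when one is found."""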
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

    def make_output_collection(self, name, tagsString, outputObj):
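        """Copy the Keep data referenced by outputObj into a new collection,
        rewrite the output locations to point at it, tag it with tagsString,
        and save it as the final output collection."""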
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
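        """If this runner is itself executing inside a Crunch container or
        job task, record the final output collection on that container or
        job task record."""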
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
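        """Execution entry point called by cwltool.main: upload workflow
        dependencies and the job order, optionally register or submit a
        runner job/container, then drive the job iterator to completion and
        assemble the final output collection."""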
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])
        self.secret_store = kwargs.get("secret_store")

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if kwargs.get("submit_request_uuid") and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps; get back a mapping of
        # local files to Keep references.  Also uploads Docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload the tool object, which may have been updated by
        # upload_workflow_deps.  Don't validate this time because it would
        # just print redundant errors.
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata,
                                  do_validate=False)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"],
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"],
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.eval_timeout = kwargs.get("eval_timeout")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            if kwargs["priority"] != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
                                                merged_map=merged_map,
                                                priority=kwargs.get("priority"),
                                                secret_store=self.secret_store)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"),
                                      merged_map=merged_map)
        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            submitargs = kwargs.copy()
            submitargs['submit'] = False
            runnerjob.run(**submitargs)
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.workflow_eval_lock.acquire()
            # Hold the lock while this code runs; it is released inside
            # self.workflow_eval_lock.wait(), at which point on_message can
            # update job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, kwargs)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)


def versionstring():
    """Return a version string for key packages, for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, defaults to the directory of the input object file or the current directory (if inputs are piped or provided on the command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--submit-request-uuid", type=str,
                        default=None,
                        help="Update and commit supplied container request instead of creating a new one (containers API only).")

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                        default=0)

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower; containers API only)",
                        default=DEFAULT_PRIORITY)

    parser.add_argument("--disable-validate", dest="do_validate",
                        action="store_false", default=True,
                        help=argparse.SUPPRESS)

    parser.add_argument("--disable-js-validation",
                        action="store_true", default=False,
                        help=argparse.SUPPRESS)

    parser.add_argument("--thread-count", type=int,
                        default=4, help="Number of threads to use for job submit and output collection.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

def add_arv_hints():
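    """Register Arvados extensions with cwltool: relax the hint acceptance
    pattern, load the arv-cwl-schema extension schema, and declare the
    Arvados-specific process requirements as supported."""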
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])

def exit_signal_handler(sigcode, frame):
    logger.error("Caught signal {}, exiting.".format(sigcode))
    sys.exit(-sigcode)

def main(args, stdout, stderr, api_client=None, keep_client=None,
         install_sig_handlers=True):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    if arvargs.update_workflow:
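        # Infer the target API from the UUID's object-type infix:
        # "-7fd4e-" is a workflow record (containers API), "-p5p6p-" is a
        # pipeline template (jobs API).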
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
            keep_client = api_client.keep
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags,
                              thread_count=arvargs.thread_count)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)