1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
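#
# Example invocations (illustrative only; the full option set is defined in
# arg_parser() below, and the console script name is assumed to be the one
# installed by this package):
#
#   arvados-cwl-runner --api=containers workflow.cwl job.yml
#   arvados-cwl-runner --submit --no-wait --project-uuid <project-uuid> workflow.cwl job.yml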
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import threading
14 import hashlib
15 import copy
16 import json
17 import re
18 from functools import partial
19 import pkg_resources  # part of setuptools
20 import Queue
21 import time
22 import signal
23 import thread
24
25 from cwltool.errors import WorkflowException
26 import cwltool.main
27 import cwltool.workflow
28 import cwltool.process
29 from schema_salad.sourceline import SourceLine
30 import schema_salad.validate as validate
31
32 import arvados
33 import arvados.config
34 from arvados.keep import KeepClient
35 from arvados.errors import ApiError
36 import arvados.commands._util as arv_cmd
37
38 from .arvcontainer import ArvadosContainer, RunnerContainer
39 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
40 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
41 from .arvtool import ArvadosCommandTool
42 from .arvworkflow import ArvadosWorkflow, upload_workflow
43 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
44 from .perf import Perf
45 from .pathmapper import NoFollowPathMapper
46 from .task_queue import TaskQueue
47 from ._version import __version__
48
49 from cwltool.pack import pack
50 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
51 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
52 from cwltool.command_line_tool import compute_checksums
53 from arvados.api import OrderedJsonModel
54
55 logger = logging.getLogger('arvados.cwl-runner')
56 metrics = logging.getLogger('arvados.cwl-runner.metrics')
57 logger.setLevel(logging.INFO)
58
59 arvados.log_handler.setFormatter(logging.Formatter(
60         '%(asctime)s %(name)s %(levelname)s: %(message)s',
61         '%Y-%m-%d %H:%M:%S'))
62
63 DEFAULT_PRIORITY = 500
64
65 class ArvCwlRunner(object):
66     """Execute a CWL tool or workflow, submit work (using either the jobs or
67     containers API), wait for it to complete, and report output.
68
69     """
70
71     def __init__(self, api_client, work_api=None, keep_client=None,
72                  output_name=None, output_tags=None, num_retries=4,
73                  thread_count=4):
74         self.api = api_client
75         self.processes = {}
76         self.workflow_eval_lock = threading.Condition(threading.RLock())
77         self.final_output = None
78         self.final_status = None
79         self.uploaded = {}
80         self.num_retries = num_retries
81         self.uuid = None
82         self.stop_polling = threading.Event()
83         self.poll_api = None
84         self.pipeline = None
85         self.final_output_collection = None
86         self.output_name = output_name
87         self.output_tags = output_tags
88         self.project_uuid = None
89         self.intermediate_output_ttl = 0
90         self.intermediate_output_collections = []
91         self.trash_intermediate = False
92         self.thread_count = thread_count
93         self.poll_interval = 12
94
95         if keep_client is not None:
96             self.keep_client = keep_client
97         else:
98             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
99
100         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
101
102         self.fetcher_constructor = partial(CollectionFetcher,
103                                            api_client=self.api,
104                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
105                                            num_retries=self.num_retries)
106
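        # Decide which work submission API to use by probing the API server's
        # discovery document: take the first of "jobs"/"containers" that the
        # server advertises, unless a specific work_api was requested.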
107         self.work_api = None
108         expected_api = ["jobs", "containers"]
109         for api in expected_api:
110             try:
111                 methods = self.api._rootDesc.get('resources')[api]['methods']
112                 if ('httpMethod' in methods['create'] and
113                     (work_api == api or work_api is None)):
114                     self.work_api = api
115                     break
116             except KeyError:
117                 pass
118
119         if not self.work_api:
120             if work_api is None:
121                 raise Exception("No supported APIs")
122             else:
123                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
124
125     def arv_make_tool(self, toolpath_object, **kwargs):
126         kwargs["work_api"] = self.work_api
127         kwargs["fetcher_constructor"] = self.fetcher_constructor
128         kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
129         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
130             return ArvadosCommandTool(self, toolpath_object, **kwargs)
131         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
132             return ArvadosWorkflow(self, toolpath_object, **kwargs)
133         else:
134             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
135
136     def output_callback(self, out, processStatus):
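        # Final-output callback passed to the top-level workflow job: records
        # the overall status and output object, updates the pipeline instance
        # state (jobs API), and wakes the thread waiting in arv_executor().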
137         with self.workflow_eval_lock:
138             if processStatus == "success":
139                 logger.info("Overall process status is %s", processStatus)
140                 if self.pipeline:
141                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
142                                                          body={"state": "Complete"}).execute(num_retries=self.num_retries)
143             else:
144                 logger.warn("Overall process status is %s", processStatus)
145                 if self.pipeline:
146                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
147                                                          body={"state": "Failed"}).execute(num_retries=self.num_retries)
148             self.final_status = processStatus
149             self.final_output = out
150             self.workflow_eval_lock.notifyAll()
151
152
153     def start_run(self, runnable, kwargs):
154         self.task_queue.add(partial(runnable.run, **kwargs))
155
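    # self.processes maps container/job UUIDs to their in-flight process
    # objects; process_submitted/process_done add and remove entries, and
    # poll_states()/on_message() use it to route state changes back to them.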
156     def process_submitted(self, container):
157         with self.workflow_eval_lock:
158             self.processes[container.uuid] = container
159
160     def process_done(self, uuid, record):
161         with self.workflow_eval_lock:
162             j = self.processes[uuid]
163             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
164             self.task_queue.add(partial(j.done, record))
165             del self.processes[uuid]
166
167     def wrapped_callback(self, cb, obj, st):
168         with self.workflow_eval_lock:
169             cb(obj, st)
170             self.workflow_eval_lock.notifyAll()
171
172     def get_wrapped_callback(self, cb):
173         return partial(self.wrapped_callback, cb)
174
175     def on_message(self, event):
176         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
177             uuid = event["object_uuid"]
178             if event["properties"]["new_attributes"]["state"] == "Running":
179                 with self.workflow_eval_lock:
180                     j = self.processes[uuid]
181                     if j.running is False:
182                         j.running = True
183                         j.update_pipeline_component(event["properties"]["new_attributes"])
184                         logger.info("%s %s is Running", self.label(j), uuid)
185             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
186                 self.process_done(uuid, event["properties"]["new_attributes"])
187
188     def label(self, obj):
189         return "[%s %s]" % (self.work_api[0:-1], obj.name)
190
191     def poll_states(self):
192         """Poll status of jobs or containers listed in the processes dict.
193
194         Runs in a separate thread.
195         """
196
197         try:
198             remain_wait = self.poll_interval
199             while True:
200                 if remain_wait > 0:
201                     self.stop_polling.wait(remain_wait)
202                 if self.stop_polling.is_set():
203                     break
204                 with self.workflow_eval_lock:
205                     keys = list(self.processes.keys())
206                 if not keys:
207                     remain_wait = self.poll_interval
208                     continue
209
210                 begin_poll = time.time()
211                 if self.work_api == "containers":
212                     table = self.poll_api.container_requests()
213                 elif self.work_api == "jobs":
214                     table = self.poll_api.jobs()
215
216                 try:
217                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
218                 except Exception as e:
219                     logger.warn("Error checking states on API server: %s", e)
220                     remain_wait = self.poll_interval
221                     continue
222
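                # Convert each polled record into the update-event dict shape
                # that on_message() expects and dispatch it.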
223                 for p in proc_states["items"]:
224                     self.on_message({
225                         "object_uuid": p["uuid"],
226                         "event_type": "update",
227                         "properties": {
228                             "new_attributes": p
229                         }
230                     })
231                 finish_poll = time.time()
232                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
233         except:
234             logger.exception("Fatal error in state polling thread.")
235             with self.workflow_eval_lock:
236                 self.processes.clear()
237                 self.workflow_eval_lock.notifyAll()
238         finally:
239             self.stop_polling.set()
240
241     def get_uploaded(self):
242         return self.uploaded.copy()
243
244     def add_uploaded(self, src, pair):
245         self.uploaded[src] = pair
246
247     def add_intermediate_output(self, uuid):
248         if uuid:
249             self.intermediate_output_collections.append(uuid)
250
251     def trash_intermediate_output(self):
252         logger.info("Cleaning up intermediate output collections")
253         for i in self.intermediate_output_collections:
254             try:
255                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
256             except:
257                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
258             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
259                 break
260
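    # Recursively walk the tool document and reject features the selected
    # work API cannot support (writable InitialWorkDir entries, Docker
    # dockerOutputDirectory, and Secrets all require the containers API).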
261     def check_features(self, obj):
262         if isinstance(obj, dict):
263             if obj.get("writable") and self.work_api != "containers":
264                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
265             if obj.get("class") == "DockerRequirement":
266                 if obj.get("dockerOutputDirectory"):
267                     if self.work_api != "containers":
268                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
269                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
270                     if not obj.get("dockerOutputDirectory").startswith('/'):
271                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
272                             "Option 'dockerOutputDirectory' must be an absolute path.")
273             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
274                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
275             for v in obj.itervalues():
276                 self.check_features(v)
277         elif isinstance(obj, list):
278             for i,v in enumerate(obj):
279                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
280                     self.check_features(v)
281
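    # Gather the workflow's output files (scattered across Keep collections
    # and literals) into one new output collection, rewrite the output
    # object's locations to point into it, and apply any requested tags.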
282     def make_output_collection(self, name, tagsString, outputObj):
283         outputObj = copy.deepcopy(outputObj)
284
285         files = []
286         def capture(fileobj):
287             files.append(fileobj)
288
289         adjustDirObjs(outputObj, capture)
290         adjustFileObjs(outputObj, capture)
291
292         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
293
294         final = arvados.collection.Collection(api_client=self.api,
295                                               keep_client=self.keep_client,
296                                               num_retries=self.num_retries)
297
298         for k,v in generatemapper.items():
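            # Keys starting with "_:" are literal (non-Keep) entries: skip
            # synthesized directories and write literal file contents directly.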
299             if k.startswith("_:"):
300                 if v.type == "Directory":
301                     continue
302                 if v.type == "CreateFile":
303                     with final.open(v.target, "wb") as f:
304                         f.write(v.resolved.encode("utf-8"))
305                     continue
306
307             if not k.startswith("keep:"):
308                 raise Exception("Output source is not in keep or a literal")
309             sp = k.split("/")
310             srccollection = sp[0][5:]
311             try:
312                 reader = self.collection_cache.get(srccollection)
313                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
314                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
315             except arvados.errors.ArgumentError as e:
316                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
317                 raise
318             except IOError as e:
319                 logger.warn("While preparing output collection: %s", e)
320
321         def rewrite(fileobj):
322             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
323             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
324                 if k in fileobj:
325                     del fileobj[k]
326
327         adjustDirObjs(outputObj, rewrite)
328         adjustFileObjs(outputObj, rewrite)
329
330         with final.open("cwl.output.json", "w") as f:
331             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
332
333         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
334
335         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
336                     final.api_response()["name"],
337                     final.manifest_locator())
338
339         final_uuid = final.manifest_locator()
340         tags = tagsString.split(',')
341         for tag in tags:
342             self.api.links().create(body={
343                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
344                 }).execute(num_retries=self.num_retries)
345
346         def finalcollection(fileobj):
347             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
348
349         adjustDirObjs(outputObj, finalcollection)
350         adjustFileObjs(outputObj, finalcollection)
351
352         return (outputObj, final)
353
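    # When this runner is itself executing inside an Arvados container or
    # job task, record the final output collection on that container/job
    # record so the workflow's output is visible from the parent process.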
354     def set_crunch_output(self):
355         if self.work_api == "containers":
356             try:
357                 current = self.api.containers().current().execute(num_retries=self.num_retries)
358             except ApiError as e:
359                 # Status code 404 just means we're not running in a container.
360                 if e.resp.status != 404:
361                     logger.info("Getting current container: %s", e)
362                 return
363             try:
364                 self.api.containers().update(uuid=current['uuid'],
365                                              body={
366                                                  'output': self.final_output_collection.portable_data_hash(),
367                                              }).execute(num_retries=self.num_retries)
368                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
369                                               body={
370                                                   'is_trashed': True
371                                               }).execute(num_retries=self.num_retries)
372             except Exception as e:
373                 logger.info("Setting container output: %s", e)
374         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
375             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
376                                    body={
377                                        'output': self.final_output_collection.portable_data_hash(),
378                                        'success': self.final_status == "success",
379                                        'progress':1.0
380                                    }).execute(num_retries=self.num_retries)
381
382     def arv_executor(self, tool, job_order, **kwargs):
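        # Main executor invoked by cwltool.main(): check API compatibility,
        # upload workflow dependencies, then either register a workflow
        # (--create/--update-workflow), submit a runner process (--submit),
        # or evaluate the workflow locally, and finally collect the output.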
383         self.debug = kwargs.get("debug")
384
385         tool.visit(self.check_features)
386
387         self.project_uuid = kwargs.get("project_uuid")
388         self.pipeline = None
389         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
390                                                                  collection_cache=self.collection_cache)
391         self.fs_access = make_fs_access(kwargs["basedir"])
392         self.secret_store = kwargs.get("secret_store")
393
394         self.trash_intermediate = kwargs["trash_intermediate"]
395         if self.trash_intermediate and self.work_api != "containers":
396             raise Exception("--trash-intermediate is only supported with --api=containers.")
397
398         self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
399         if self.intermediate_output_ttl and self.work_api != "containers":
400             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
401         if self.intermediate_output_ttl < 0:
402             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
403
404         if not kwargs.get("name"):
405             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
406
407         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
408         # Also uploads docker images.
409         merged_map = upload_workflow_deps(self, tool)
410
411         # Reload tool object which may have been updated by
412         # upload_workflow_deps
413         # Don't validate this time because it will just print redundant errors.
414         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
415                                   makeTool=self.arv_make_tool,
416                                   loader=tool.doc_loader,
417                                   avsc_names=tool.doc_schema,
418                                   metadata=tool.metadata,
419                                   do_validate=False)
420
421         # Upload local file references in the job order.
422         job_order = upload_job_order(self, "%s input" % kwargs["name"],
423                                      tool, job_order)
424
425         existing_uuid = kwargs.get("update_workflow")
426         if existing_uuid or kwargs.get("create_workflow"):
427             # Create a pipeline template or workflow record and exit.
428             if self.work_api == "jobs":
429                 tmpl = RunnerTemplate(self, tool, job_order,
430                                       kwargs.get("enable_reuse"),
431                                       uuid=existing_uuid,
432                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
433                                       name=kwargs["name"],
434                                       merged_map=merged_map)
435                 tmpl.save()
436                 # cwltool.main will write our return value to stdout.
437                 return (tmpl.uuid, "success")
438             elif self.work_api == "containers":
439                 return (upload_workflow(self, tool, job_order,
440                                         self.project_uuid,
441                                         uuid=existing_uuid,
442                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
443                                         name=kwargs["name"],
444                                         merged_map=merged_map),
445                         "success")
446
447         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
448         self.eval_timeout = kwargs.get("eval_timeout")
449
450         kwargs["make_fs_access"] = make_fs_access
451         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
452         kwargs["use_container"] = True
453         kwargs["tmpdir_prefix"] = "tmp"
454         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
455
456         if self.work_api == "containers":
457             if self.ignore_docker_for_reuse:
458                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
459             kwargs["outdir"] = "/var/spool/cwl"
460             kwargs["docker_outdir"] = "/var/spool/cwl"
461             kwargs["tmpdir"] = "/tmp"
462             kwargs["docker_tmpdir"] = "/tmp"
463         elif self.work_api == "jobs":
464             if kwargs["priority"] != DEFAULT_PRIORITY:
465                 raise Exception("--priority not implemented for jobs API.")
466             kwargs["outdir"] = "$(task.outdir)"
467             kwargs["docker_outdir"] = "$(task.outdir)"
468             kwargs["tmpdir"] = "$(task.tmpdir)"
469
470         if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
471             raise Exception("--priority must be in the range 1..1000.")
472
473         runnerjob = None
474         if kwargs.get("submit"):
475             # Submit a runner job to run the workflow for us.
476             if self.work_api == "containers":
477                 if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
478                     kwargs["runnerjob"] = tool.tool["id"]
479                     runnerjob = tool.job(job_order,
480                                          self.output_callback,
481                                          **kwargs).next()
482                 else:
483                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
484                                                 self.output_name,
485                                                 self.output_tags,
486                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
487                                                 name=kwargs.get("name"),
488                                                 on_error=kwargs.get("on_error"),
489                                                 submit_runner_image=kwargs.get("submit_runner_image"),
490                                                 intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
491                                                 merged_map=merged_map,
492                                                 priority=kwargs.get("priority"),
493                                                 secret_store=self.secret_store)
494             elif self.work_api == "jobs":
495                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
496                                       self.output_name,
497                                       self.output_tags,
498                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
499                                       name=kwargs.get("name"),
500                                       on_error=kwargs.get("on_error"),
501                                       submit_runner_image=kwargs.get("submit_runner_image"),
502                                       merged_map=merged_map)
503         elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
504             # Create pipeline for local run
505             self.pipeline = self.api.pipeline_instances().create(
506                 body={
507                     "owner_uuid": self.project_uuid,
508                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
509                     "components": {},
510                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
511             logger.info("Pipeline instance %s", self.pipeline["uuid"])
512
513         if runnerjob and not kwargs.get("wait"):
514             submitargs = kwargs.copy()
515             submitargs['submit'] = False
516             runnerjob.run(**submitargs)
517             return (runnerjob.uuid, "success")
518
519         self.poll_api = arvados.api('v1')
520         self.polling_thread = threading.Thread(target=self.poll_states)
521         self.polling_thread.start()
522
523         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
524
525         if runnerjob:
526             jobiter = iter((runnerjob,))
527         else:
528             if "cwl_runner_job" in kwargs:
529                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
530             jobiter = tool.job(job_order,
531                                self.output_callback,
532                                **kwargs)
533
534         try:
535             self.workflow_eval_lock.acquire()
536             # Holds the lock while this code runs and releases it when
537             # it is safe to do so in self.workflow_eval_lock.wait(),
538             # at which point on_message can update job state and
539             # process output callbacks.
540
541             loopperf = Perf(metrics, "jobiter")
542             loopperf.__enter__()
543             for runnable in jobiter:
544                 loopperf.__exit__()
545
546                 if self.stop_polling.is_set():
547                     break
548
549                 if self.task_queue.error is not None:
550                     raise self.task_queue.error
551
552                 if runnable:
553                     with Perf(metrics, "run"):
554                         self.start_run(runnable, kwargs)
555                 else:
556                     if (self.task_queue.in_flight + len(self.processes)) > 0:
557                         self.workflow_eval_lock.wait(3)
558                     else:
559                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
560                         break
561                 loopperf.__enter__()
562             loopperf.__exit__()
563
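            # All steps have been scheduled; wait for remaining queued tasks
            # and in-flight processes to finish before reporting final status.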
564             while (self.task_queue.in_flight + len(self.processes)) > 0:
565                 if self.task_queue.error is not None:
566                     raise self.task_queue.error
567                 self.workflow_eval_lock.wait(3)
568
569         except UnsupportedRequirement:
570             raise
571         except:
572             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
573                 logger.error("Interrupted, workflow will be cancelled")
574             else:
575                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
576             if self.pipeline:
577                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
578                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
579             if runnerjob and runnerjob.uuid and self.work_api == "containers":
580                 self.api.container_requests().update(uuid=runnerjob.uuid,
581                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
582         finally:
583             self.workflow_eval_lock.release()
584             self.task_queue.drain()
585             self.stop_polling.set()
586             self.polling_thread.join()
587             self.task_queue.join()
588
589         if self.final_status == "UnsupportedRequirement":
590             raise UnsupportedRequirement("Check log for details.")
591
592         if self.final_output is None:
593             raise WorkflowException("Workflow did not return a result.")
594
595         if kwargs.get("submit") and isinstance(runnerjob, Runner):
596             logger.info("Final output collection %s", runnerjob.final_output)
597         else:
598             if self.output_name is None:
599                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
600             if self.output_tags is None:
601                 self.output_tags = ""
602             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
603             self.set_crunch_output()
604
605         if kwargs.get("compute_checksum"):
606             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
607             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
608
609         if self.trash_intermediate and self.final_status == "success":
610             self.trash_intermediate_output()
611
612         return (self.final_output, self.final_status)
613
614
615 def versionstring():
616     """Return a version string of key packages for provenance and debugging."""
617
618     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
619     arvpkg = pkg_resources.require("arvados-python-client")
620     cwlpkg = pkg_resources.require("cwltool")
621
622     return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
623                                     "arvados-python-client", arvpkg[0].version,
624                                     "cwltool", cwlpkg[0].version)
625
626
627 def arg_parser():  # type: () -> argparse.ArgumentParser
628     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
629
630     parser.add_argument("--basedir", type=str,
631                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
632     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
633                         help="Output directory, default current directory")
634
635     parser.add_argument("--eval-timeout",
636                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
637                         type=float,
638                         default=20)
639
640     exgroup = parser.add_mutually_exclusive_group()
641     exgroup.add_argument("--print-dot", action="store_true",
642                          help="Print workflow visualization in graphviz format and exit")
643     exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
644     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
645
646     exgroup = parser.add_mutually_exclusive_group()
647     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
648     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
649     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
650
651     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
652
653     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
654
655     exgroup = parser.add_mutually_exclusive_group()
656     exgroup.add_argument("--enable-reuse", action="store_true",
657                         default=True, dest="enable_reuse",
658                         help="Enable job or container reuse (default)")
659     exgroup.add_argument("--disable-reuse", action="store_false",
660                         default=True, dest="enable_reuse",
661                         help="Disable job or container reuse")
662
663     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, they will go to your home project.")
664     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
665     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
666     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
667                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
668                         default=False)
669
670     exgroup = parser.add_mutually_exclusive_group()
671     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
672                         default=True, dest="submit")
673     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
674                         default=True, dest="submit")
675     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
676                          dest="create_workflow")
677     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
678     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
679
680     exgroup = parser.add_mutually_exclusive_group()
681     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
682                         default=True, dest="wait")
683     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
684                         default=True, dest="wait")
685
686     exgroup = parser.add_mutually_exclusive_group()
687     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
688                         default=True, dest="log_timestamps")
689     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
690                         default=True, dest="log_timestamps")
691
692     parser.add_argument("--api", type=str,
693                         default=None, dest="work_api",
694                         choices=("jobs", "containers"),
695                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
696
697     parser.add_argument("--compute-checksum", action="store_true", default=False,
698                         help="Compute checksum of contents while collecting outputs",
699                         dest="compute_checksum")
700
701     parser.add_argument("--submit-runner-ram", type=int,
702                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
703                         default=1024)
704
705     parser.add_argument("--submit-runner-image", type=str,
706                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
707                         default=None)
708
709     parser.add_argument("--name", type=str,
710                         help="Name to use for workflow execution instance.",
711                         default=None)
712
713     parser.add_argument("--on-error", type=str,
714                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
715                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
716
717     parser.add_argument("--enable-dev", action="store_true",
718                         help="Enable loading and running development versions "
719                              "of CWL spec.", default=False)
720
721     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
722                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
723                         default=0)
724
725     parser.add_argument("--priority", type=int,
726                         help="Workflow priority (range 1..1000, higher takes precedence over lower; containers API only)",
727                         default=DEFAULT_PRIORITY)
728
729     parser.add_argument("--disable-validate", dest="do_validate",
730                         action="store_false", default=True,
731                         help=argparse.SUPPRESS)
732
733     parser.add_argument("--disable-js-validation",
734                         action="store_true", default=False,
735                         help=argparse.SUPPRESS)
736
737     parser.add_argument("--thread-count", type=int,
738                         default=4, help="Number of threads to use for job submission and output collection.")
739
740     exgroup = parser.add_mutually_exclusive_group()
741     exgroup.add_argument("--trash-intermediate", action="store_true",
742                         default=False, dest="trash_intermediate",
743                          help="Immediately trash intermediate outputs on workflow success.")
744     exgroup.add_argument("--no-trash-intermediate", action="store_false",
745                         default=False, dest="trash_intermediate",
746                         help="Do not trash intermediate outputs (default).")
747
748     parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
749     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
750
751     return parser
752
753 def add_arv_hints():
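    # Register the Arvados CWL extension schema and requirement names with
    # cwltool so documents that use http://arvados.org/cwl# extensions load
    # and validate cleanly.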
754     cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
755     cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
756     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
757     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
758     res.close()
759     cwltool.process.supportedProcessRequirements.extend([
760         "http://arvados.org/cwl#RunInSingleContainer",
761         "http://arvados.org/cwl#OutputDirType",
762         "http://arvados.org/cwl#RuntimeConstraints",
763         "http://arvados.org/cwl#PartitionRequirement",
764         "http://arvados.org/cwl#APIRequirement",
765         "http://commonwl.org/cwltool#LoadListingRequirement",
766         "http://arvados.org/cwl#IntermediateOutput",
767         "http://arvados.org/cwl#ReuseRequirement"
768     ])
769
770 def exit_signal_handler(sigcode, frame):
771     logger.error("Caught signal {}, exiting.".format(sigcode))
772     sys.exit(-sigcode)
773
774 def main(args, stdout, stderr, api_client=None, keep_client=None,
775          install_sig_handlers=True):
776     parser = arg_parser()
777
778     job_order_object = None
779     arvargs = parser.parse_args(args)
780
781     if install_sig_handlers:
782         arv_cmd.install_signal_handlers()
783
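    # Infer which API an --update-workflow target uses from its UUID: the
    # five-character object-type infix is "7fd4e" for workflow records
    # (containers API) and "p5p6p" for pipeline templates (jobs API).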
784     if arvargs.update_workflow:
785         if arvargs.update_workflow.find('-7fd4e-') == 5:
786             want_api = 'containers'
787         elif arvargs.update_workflow.find('-p5p6p-') == 5:
788             want_api = 'jobs'
789         else:
790             want_api = None
791         if want_api and arvargs.work_api and want_api != arvargs.work_api:
792             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
793                 arvargs.update_workflow, want_api, arvargs.work_api))
794             return 1
795         arvargs.work_api = want_api
796
797     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
798         job_order_object = ({}, "")
799
800     add_arv_hints()
801
802     try:
803         if api_client is None:
804             api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
805             keep_client = api_client.keep
806         if keep_client is None:
807             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
808         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
809                               num_retries=4, output_name=arvargs.output_name,
810                               output_tags=arvargs.output_tags,
811                               thread_count=arvargs.thread_count)
812     except Exception as e:
813         logger.error(e)
814         return 1
815
816     if arvargs.debug:
817         logger.setLevel(logging.DEBUG)
818         logging.getLogger('arvados').setLevel(logging.DEBUG)
819
820     if arvargs.quiet:
821         logger.setLevel(logging.WARN)
822         logging.getLogger('arvados').setLevel(logging.WARN)
823         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
824
825     if arvargs.metrics:
826         metrics.setLevel(logging.DEBUG)
827         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
828
829     if arvargs.log_timestamps:
830         arvados.log_handler.setFormatter(logging.Formatter(
831             '%(asctime)s %(name)s %(levelname)s: %(message)s',
832             '%Y-%m-%d %H:%M:%S'))
833     else:
834         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
835
836     arvargs.conformance_test = None
837     arvargs.use_container = True
838     arvargs.relax_path_checks = True
839     arvargs.print_supported_versions = False
840
841     make_fs_access = partial(CollectionFsAccess,
842                            collection_cache=runner.collection_cache)
843
844     return cwltool.main.main(args=arvargs,
845                              stdout=stdout,
846                              stderr=stderr,
847                              executor=runner.arv_executor,
848                              makeTool=runner.arv_make_tool,
849                              versionfunc=versionstring,
850                              job_order_object=job_order_object,
851                              make_fs_access=make_fs_access,
852                              fetcher_constructor=partial(CollectionFetcher,
853                                                          api_client=api_client,
854                                                          fs_access=make_fs_access(""),
855                                                          num_retries=runner.num_retries),
856                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
857                              logger_handler=arvados.log_handler,
858                              custom_schema_callback=add_arv_hints)