change http timeout keyword from `http-timeout` to `http_timeout`
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
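#
# Typical command-line usage (a sketch; the workflow and input file names are
# hypothetical placeholders, and the flags shown are defined in arg_parser()
# further down):
#
#   arvados-cwl-runner --api=containers --project-uuid <project-uuid> \
#       my-workflow.cwl my-inputs.yml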
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import threading
14 import hashlib
15 import copy
16 import json
17 import re
18 from functools import partial
19 import pkg_resources  # part of setuptools
20 import Queue
21 import time
22 import signal
23 import thread
24
25 from cwltool.errors import WorkflowException
26 import cwltool.main
27 import cwltool.workflow
28 import cwltool.process
29 from schema_salad.sourceline import SourceLine
30 import schema_salad.validate as validate
31
32 import arvados
33 import arvados.config
34 from arvados.keep import KeepClient
35 from arvados.errors import ApiError
36 import arvados.commands._util as arv_cmd
37
38 from .arvcontainer import ArvadosContainer, RunnerContainer
39 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
40 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
41 from .arvtool import ArvadosCommandTool
42 from .arvworkflow import ArvadosWorkflow, upload_workflow
43 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
44 from .perf import Perf
45 from .pathmapper import NoFollowPathMapper
46 from .task_queue import TaskQueue
47 from ._version import __version__
48
49 from cwltool.pack import pack
50 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
51 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
52 from cwltool.command_line_tool import compute_checksums
53 from arvados.api import OrderedJsonModel
54
55 logger = logging.getLogger('arvados.cwl-runner')
56 metrics = logging.getLogger('arvados.cwl-runner.metrics')
57 logger.setLevel(logging.INFO)
58
59 arvados.log_handler.setFormatter(logging.Formatter(
60         '%(asctime)s %(name)s %(levelname)s: %(message)s',
61         '%Y-%m-%d %H:%M:%S'))
62
63 DEFAULT_PRIORITY = 500
64
65 class ArvCwlRunner(object):
66     """Execute a CWL tool or workflow, submit work (using either jobs or
67     containers API), wait for them to complete, and report output.
68
69     """
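    # Construction and use mirror main() at the bottom of this file, roughly
    # (a sketch with most keyword values elided):
    #
    #   api = arvados.safeapi.ThreadSafeApiCache(api_params={...}, keep_params={...})
    #   runner = ArvCwlRunner(api, work_api="containers", keep_client=api.keep)
    #   (output, status) = runner.arv_executor(tool, job_order, **kwargs)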
70
71     def __init__(self, api_client, work_api=None, keep_client=None,
72                  output_name=None, output_tags=None, num_retries=4,
73                  thread_count=4):
74         self.api = api_client
75         self.processes = {}
76         self.workflow_eval_lock = threading.Condition(threading.RLock())
77         self.final_output = None
78         self.final_status = None
79         self.num_retries = num_retries
80         self.uuid = None
81         self.stop_polling = threading.Event()
82         self.poll_api = None
83         self.pipeline = None
84         self.final_output_collection = None
85         self.output_name = output_name
86         self.output_tags = output_tags
87         self.project_uuid = None
88         self.intermediate_output_ttl = 0
89         self.intermediate_output_collections = []
90         self.trash_intermediate = False
91         self.thread_count = thread_count
92         self.poll_interval = 12
93
94         if keep_client is not None:
95             self.keep_client = keep_client
96         else:
97             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
98
99         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
100
101         self.fetcher_constructor = partial(CollectionFetcher,
102                                            api_client=self.api,
103                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
104                                            num_retries=self.num_retries)
105
106         self.work_api = None
107         expected_api = ["jobs", "containers"]
108         for api in expected_api:
109             try:
110                 methods = self.api._rootDesc.get('resources')[api]['methods']
111                 if ('httpMethod' in methods['create'] and
112                     (work_api == api or work_api is None)):
113                     self.work_api = api
114                     break
115             except KeyError:
116                 pass
117
118         if not self.work_api:
119             if work_api is None:
120                 raise Exception("No supported APIs")
121             else:
122                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
123
124     def arv_make_tool(self, toolpath_object, **kwargs):
125         kwargs["work_api"] = self.work_api
126         kwargs["fetcher_constructor"] = self.fetcher_constructor
127         kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
128         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
129             return ArvadosCommandTool(self, toolpath_object, **kwargs)
130         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
131             return ArvadosWorkflow(self, toolpath_object, **kwargs)
132         else:
133             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
134
135     def output_callback(self, out, processStatus):
136         with self.workflow_eval_lock:
137             if processStatus == "success":
138                 logger.info("Overall process status is %s", processStatus)
139                 if self.pipeline:
140                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
141                                                          body={"state": "Complete"}).execute(num_retries=self.num_retries)
142             else:
143                 logger.warn("Overall process status is %s", processStatus)
144                 if self.pipeline:
145                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
146                                                          body={"state": "Failed"}).execute(num_retries=self.num_retries)
147             self.final_status = processStatus
148             self.final_output = out
149             self.workflow_eval_lock.notifyAll()
150
151
152     def start_run(self, runnable, kwargs):
153         self.task_queue.add(partial(runnable.run, **kwargs))
154
155     def process_submitted(self, container):
156         with self.workflow_eval_lock:
157             self.processes[container.uuid] = container
158
159     def process_done(self, uuid, record):
160         with self.workflow_eval_lock:
161             j = self.processes[uuid]
162             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
163             self.task_queue.add(partial(j.done, record))
164             del self.processes[uuid]
165
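    # Wrap a cwltool output callback so that it runs while holding
    # workflow_eval_lock and then wakes any thread waiting on the lock
    # (notably the main loop in arv_executor).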
166     def wrapped_callback(self, cb, obj, st):
167         with self.workflow_eval_lock:
168             cb(obj, st)
169             self.workflow_eval_lock.notifyAll()
170
171     def get_wrapped_callback(self, cb):
172         return partial(self.wrapped_callback, cb)
173
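    # Handle a state-change event for a tracked container or job (events are
    # synthesized by poll_states below): mark the process as running when it
    # enters "Running", and hand it off to process_done once it reaches a
    # final state.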
174     def on_message(self, event):
175         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
176             uuid = event["object_uuid"]
177             if event["properties"]["new_attributes"]["state"] == "Running":
178                 with self.workflow_eval_lock:
179                     j = self.processes[uuid]
180                     if j.running is False:
181                         j.running = True
182                         j.update_pipeline_component(event["properties"]["new_attributes"])
183                         logger.info("%s %s is Running", self.label(j), uuid)
184             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
185                 self.process_done(uuid, event["properties"]["new_attributes"])
186
187     def label(self, obj):
188         return "[%s %s]" % (self.work_api[0:-1], obj.name)
189
190     def poll_states(self):
191         """Poll status of jobs or containers listed in the processes dict.
192
193         Runs in a separate thread.
194         """
195
196         try:
197             remain_wait = self.poll_interval
198             while True:
199                 if remain_wait > 0:
200                     self.stop_polling.wait(remain_wait)
201                 if self.stop_polling.is_set():
202                     break
203                 with self.workflow_eval_lock:
204                     keys = list(self.processes.keys())
205                 if not keys:
206                     remain_wait = self.poll_interval
207                     continue
208
209                 begin_poll = time.time()
210                 if self.work_api == "containers":
211                     table = self.poll_api.container_requests()
212                 elif self.work_api == "jobs":
213                     table = self.poll_api.jobs()
214
215                 try:
216                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
217                 except Exception as e:
218                     logger.warn("Error checking states on API server: %s", e)
219                     remain_wait = self.poll_interval
220                     continue
221
222                 for p in proc_states["items"]:
223                     self.on_message({
224                         "object_uuid": p["uuid"],
225                         "event_type": "update",
226                         "properties": {
227                             "new_attributes": p
228                         }
229                     })
230                 finish_poll = time.time()
231                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
232         except:
233             logger.exception("Fatal error in state polling thread.")
234             with self.workflow_eval_lock:
235                 self.processes.clear()
236                 self.workflow_eval_lock.notifyAll()
237         finally:
238             self.stop_polling.set()
239
240     def add_intermediate_output(self, uuid):
241         if uuid:
242             self.intermediate_output_collections.append(uuid)
243
244     def trash_intermediate_output(self):
245         logger.info("Cleaning up intermediate output collections")
246         for i in self.intermediate_output_collections:
247             try:
248                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
249             except:
250                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
251             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
252                 break
253
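    # Recursively inspect a tool document and reject features the selected
    # work API cannot run: writable InitialWorkDir entries, DockerRequirement
    # dockerOutputDirectory, and cwltool Secrets require the containers API,
    # and dockerOutputDirectory must be an absolute path.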
254     def check_features(self, obj):
255         if isinstance(obj, dict):
256             if obj.get("writable") and self.work_api != "containers":
257                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
258             if obj.get("class") == "DockerRequirement":
259                 if obj.get("dockerOutputDirectory"):
260                     if self.work_api != "containers":
261                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
262                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
263                     if not obj.get("dockerOutputDirectory").startswith('/'):
264                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
265                             "Option 'dockerOutputDirectory' must be an absolute path.")
266             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
267                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
268             for v in obj.itervalues():
269                 self.check_features(v)
270         elif isinstance(obj, list):
271             for i,v in enumerate(obj):
272                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
273                     self.check_features(v)
274
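    # Assemble the final output collection: copy every "keep:" reference named
    # in the output object into a fresh collection, write out file literals,
    # add cwl.output.json, save the collection (named `name`, tagged from
    # `tagsString`) into the target project, and rewrite output locations to
    # point into the saved collection.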
275     def make_output_collection(self, name, tagsString, outputObj):
276         outputObj = copy.deepcopy(outputObj)
277
278         files = []
279         def capture(fileobj):
280             files.append(fileobj)
281
282         adjustDirObjs(outputObj, capture)
283         adjustFileObjs(outputObj, capture)
284
285         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
286
287         final = arvados.collection.Collection(api_client=self.api,
288                                               keep_client=self.keep_client,
289                                               num_retries=self.num_retries)
290
291         for k,v in generatemapper.items():
292             if k.startswith("_:"):
293                 if v.type == "Directory":
294                     continue
295                 if v.type == "CreateFile":
296                     with final.open(v.target, "wb") as f:
297                         f.write(v.resolved.encode("utf-8"))
298                     continue
299
300             if not k.startswith("keep:"):
301                 raise Exception("Output source is not in keep or a literal")
302             sp = k.split("/")
303             srccollection = sp[0][5:]
304             try:
305                 reader = self.collection_cache.get(srccollection)
306                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
307                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
308             except arvados.errors.ArgumentError as e:
309                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
310                 raise
311             except IOError as e:
312                 logger.warn("While preparing output collection: %s", e)
313
314         def rewrite(fileobj):
315             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
316             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
317                 if k in fileobj:
318                     del fileobj[k]
319
320         adjustDirObjs(outputObj, rewrite)
321         adjustFileObjs(outputObj, rewrite)
322
323         with final.open("cwl.output.json", "w") as f:
324             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
325
326         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
327
328         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
329                     final.api_response()["name"],
330                     final.manifest_locator())
331
332         final_uuid = final.manifest_locator()
333         tags = tagsString.split(',')
334         for tag in tags:
335             self.api.links().create(body={
336                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
337                 }).execute(num_retries=self.num_retries)
338
339         def finalcollection(fileobj):
340             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
341
342         adjustDirObjs(outputObj, finalcollection)
343         adjustFileObjs(outputObj, finalcollection)
344
345         return (outputObj, final)
346
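    # When this runner is itself executing as a Crunch container (or a Crunch
    # job task), record the final output collection as the output of that
    # container or task; the separately saved collection is then marked
    # trashed, since the container's recorded output supersedes it.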
347     def set_crunch_output(self):
348         if self.work_api == "containers":
349             try:
350                 current = self.api.containers().current().execute(num_retries=self.num_retries)
351             except ApiError as e:
352                 # Status code 404 just means we're not running in a container.
353                 if e.resp.status != 404:
354                     logger.info("Getting current container: %s", e)
355                 return
356             try:
357                 self.api.containers().update(uuid=current['uuid'],
358                                              body={
359                                                  'output': self.final_output_collection.portable_data_hash(),
360                                              }).execute(num_retries=self.num_retries)
361                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
362                                               body={
363                                                   'is_trashed': True
364                                               }).execute(num_retries=self.num_retries)
365             except Exception as e:
366                 logger.info("Setting container output: %s", e)
367         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
368             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
369                                    body={
370                                        'output': self.final_output_collection.portable_data_hash(),
371                                        'success': self.final_status == "success",
372                                        'progress':1.0
373                                    }).execute(num_retries=self.num_retries)
374
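    # Main executor handed to cwltool.main.main() (see main() below): check
    # feature support, upload workflow dependencies and the job order,
    # optionally just create or update a workflow/pipeline template, optionally
    # submit a RunnerContainer or RunnerJob, and otherwise iterate the tool's
    # jobs, dispatching them through the task queue and polling their state
    # until everything finishes, then assemble and report the final output.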
375     def arv_executor(self, tool, job_order, **kwargs):
376         self.debug = kwargs.get("debug")
377
378         tool.visit(self.check_features)
379
380         self.project_uuid = kwargs.get("project_uuid")
381         self.pipeline = None
382         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
383                                                                  collection_cache=self.collection_cache)
384         self.fs_access = make_fs_access(kwargs["basedir"])
385         self.secret_store = kwargs.get("secret_store")
386
387         self.trash_intermediate = kwargs["trash_intermediate"]
388         if self.trash_intermediate and self.work_api != "containers":
389             raise Exception("--trash-intermediate is only supported with --api=containers.")
390
391         self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
392         if self.intermediate_output_ttl and self.work_api != "containers":
393             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
394         if self.intermediate_output_ttl < 0:
395             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
396
397         if kwargs.get("submit_request_uuid") and self.work_api != "containers":
398             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
399
400         if not kwargs.get("name"):
401             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
402
403         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
404         # Also uploads docker images.
405         merged_map = upload_workflow_deps(self, tool)
406
407         # Reload tool object which may have been updated by
408         # upload_workflow_deps
409         # Don't validate this time because it will just print redundant errors.
410         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
411                                   makeTool=self.arv_make_tool,
412                                   loader=tool.doc_loader,
413                                   avsc_names=tool.doc_schema,
414                                   metadata=tool.metadata,
415                                   do_validate=False)
416
417         # Upload local file references in the job order.
418         job_order = upload_job_order(self, "%s input" % kwargs["name"],
419                                      tool, job_order)
420
421         existing_uuid = kwargs.get("update_workflow")
422         if existing_uuid or kwargs.get("create_workflow"):
423             # Create a pipeline template or workflow record and exit.
424             if self.work_api == "jobs":
425                 tmpl = RunnerTemplate(self, tool, job_order,
426                                       kwargs.get("enable_reuse"),
427                                       uuid=existing_uuid,
428                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
429                                       name=kwargs["name"],
430                                       merged_map=merged_map)
431                 tmpl.save()
432                 # cwltool.main will write our return value to stdout.
433                 return (tmpl.uuid, "success")
434             elif self.work_api == "containers":
435                 return (upload_workflow(self, tool, job_order,
436                                         self.project_uuid,
437                                         uuid=existing_uuid,
438                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
439                                         name=kwargs["name"],
440                                         merged_map=merged_map),
441                         "success")
442
443         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
444         self.eval_timeout = kwargs.get("eval_timeout")
445
446         kwargs["make_fs_access"] = make_fs_access
447         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
448         kwargs["use_container"] = True
449         kwargs["tmpdir_prefix"] = "tmp"
450         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
451
452         if self.work_api == "containers":
453             if self.ignore_docker_for_reuse:
454                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
455             kwargs["outdir"] = "/var/spool/cwl"
456             kwargs["docker_outdir"] = "/var/spool/cwl"
457             kwargs["tmpdir"] = "/tmp"
458             kwargs["docker_tmpdir"] = "/tmp"
459         elif self.work_api == "jobs":
460             if kwargs["priority"] != DEFAULT_PRIORITY:
461                 raise Exception("--priority not implemented for jobs API.")
462             kwargs["outdir"] = "$(task.outdir)"
463             kwargs["docker_outdir"] = "$(task.outdir)"
464             kwargs["tmpdir"] = "$(task.tmpdir)"
465
466         if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
467             raise Exception("--priority must be in the range 1..1000.")
468
469         runnerjob = None
470         if kwargs.get("submit"):
471             # Submit a runner job to run the workflow for us.
472             if self.work_api == "containers":
473                 if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
474                     kwargs["runnerjob"] = tool.tool["id"]
475                     runnerjob = tool.job(job_order,
476                                          self.output_callback,
477                                          **kwargs).next()
478                 else:
479                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
480                                                 self.output_name,
481                                                 self.output_tags,
482                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
483                                                 name=kwargs.get("name"),
484                                                 on_error=kwargs.get("on_error"),
485                                                 submit_runner_image=kwargs.get("submit_runner_image"),
486                                                 intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
487                                                 merged_map=merged_map,
488                                                 priority=kwargs.get("priority"),
489                                                 secret_store=self.secret_store)
490             elif self.work_api == "jobs":
491                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
492                                       self.output_name,
493                                       self.output_tags,
494                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
495                                       name=kwargs.get("name"),
496                                       on_error=kwargs.get("on_error"),
497                                       submit_runner_image=kwargs.get("submit_runner_image"),
498                                       merged_map=merged_map)
499         elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
500             # Create pipeline for local run
501             self.pipeline = self.api.pipeline_instances().create(
502                 body={
503                     "owner_uuid": self.project_uuid,
504                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
505                     "components": {},
506                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
507             logger.info("Pipeline instance %s", self.pipeline["uuid"])
508
509         if runnerjob and not kwargs.get("wait"):
510             submitargs = kwargs.copy()
511             submitargs['submit'] = False
512             runnerjob.run(**submitargs)
513             return (runnerjob.uuid, "success")
514
515         self.poll_api = arvados.api('v1', timeout=kwargs["http_timeout"])
516         self.polling_thread = threading.Thread(target=self.poll_states)
517         self.polling_thread.start()
518
519         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
520
521         if runnerjob:
522             jobiter = iter((runnerjob,))
523         else:
524             if "cwl_runner_job" in kwargs:
525                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
526             jobiter = tool.job(job_order,
527                                self.output_callback,
528                                **kwargs)
529
530         try:
531             self.workflow_eval_lock.acquire()
532             # Holds the lock while this code runs and releases it when
533             # it is safe to do so in self.workflow_eval_lock.wait(),
534             # at which point on_message can update job state and
535             # process output callbacks.
536
537             loopperf = Perf(metrics, "jobiter")
538             loopperf.__enter__()
539             for runnable in jobiter:
540                 loopperf.__exit__()
541
542                 if self.stop_polling.is_set():
543                     break
544
545                 if self.task_queue.error is not None:
546                     raise self.task_queue.error
547
548                 if runnable:
549                     with Perf(metrics, "run"):
550                         self.start_run(runnable, kwargs)
551                 else:
552                     if (self.task_queue.in_flight + len(self.processes)) > 0:
553                         self.workflow_eval_lock.wait(3)
554                     else:
555                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
556                         break
557                 loopperf.__enter__()
558             loopperf.__exit__()
559
560             while (self.task_queue.in_flight + len(self.processes)) > 0:
561                 if self.task_queue.error is not None:
562                     raise self.task_queue.error
563                 self.workflow_eval_lock.wait(3)
564
565         except UnsupportedRequirement:
566             raise
567         except:
568             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
569                 logger.error("Interrupted, workflow will be cancelled")
570             else:
571                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
572             if self.pipeline:
573                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
574                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
575             if runnerjob and runnerjob.uuid and self.work_api == "containers":
576                 self.api.container_requests().update(uuid=runnerjob.uuid,
577                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
578         finally:
579             self.workflow_eval_lock.release()
580             self.task_queue.drain()
581             self.stop_polling.set()
582             self.polling_thread.join()
583             self.task_queue.join()
584
585         if self.final_status == "UnsupportedRequirement":
586             raise UnsupportedRequirement("Check log for details.")
587
588         if self.final_output is None:
589             raise WorkflowException("Workflow did not return a result.")
590
591         if kwargs.get("submit") and isinstance(runnerjob, Runner):
592             logger.info("Final output collection %s", runnerjob.final_output)
593         else:
594             if self.output_name is None:
595                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
596             if self.output_tags is None:
597                 self.output_tags = ""
598             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
599             self.set_crunch_output()
600
601         if kwargs.get("compute_checksum"):
602             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
603             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
604
605         if self.trash_intermediate and self.final_status == "success":
606             self.trash_intermediate_output()
607
608         return (self.final_output, self.final_status)
609
610
611 def versionstring():
612     """Print version string of key packages for provenance and debugging."""
613
614     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
615     arvpkg = pkg_resources.require("arvados-python-client")
616     cwlpkg = pkg_resources.require("cwltool")
617
618     return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
619                                     "arvados-python-client", arvpkg[0].version,
620                                     "cwltool", cwlpkg[0].version)
621
622
623 def arg_parser():  # type: () -> argparse.ArgumentParser
624     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
625
626     parser.add_argument("--basedir", type=str,
627                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
628     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
629                         help="Output directory, default current directory")
630
631     parser.add_argument("--eval-timeout",
632                         help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
633                         type=float,
634                         default=20)
635
636     exgroup = parser.add_mutually_exclusive_group()
637     exgroup.add_argument("--print-dot", action="store_true",
638                          help="Print workflow visualization in graphviz format and exit")
639     exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
640     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
641
642     exgroup = parser.add_mutually_exclusive_group()
643     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
644     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
645     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
646
647     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
648
649     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
650
651     exgroup = parser.add_mutually_exclusive_group()
652     exgroup.add_argument("--enable-reuse", action="store_true",
653                         default=True, dest="enable_reuse",
654                         help="Enable job or container reuse (default)")
655     exgroup.add_argument("--disable-reuse", action="store_false",
656                         default=True, dest="enable_reuse",
657                         help="Disable job or container reuse")
658
659     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
660     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
661     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
662     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
663                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
664                         default=False)
665
666     exgroup = parser.add_mutually_exclusive_group()
667     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
668                         default=True, dest="submit")
669     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
670                         default=True, dest="submit")
671     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
672                          dest="create_workflow")
673     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
674     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
675
676     exgroup = parser.add_mutually_exclusive_group()
677     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
678                         default=True, dest="wait")
679     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
680                         default=True, dest="wait")
681
682     exgroup = parser.add_mutually_exclusive_group()
683     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
684                         default=True, dest="log_timestamps")
685     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
686                         default=True, dest="log_timestamps")
687
688     parser.add_argument("--api", type=str,
689                         default=None, dest="work_api",
690                         choices=("jobs", "containers"),
691                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
692
693     parser.add_argument("--compute-checksum", action="store_true", default=False,
694                         help="Compute checksum of contents while collecting outputs",
695                         dest="compute_checksum")
696
697     parser.add_argument("--submit-runner-ram", type=int,
698                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
699                         default=1024)
700
701     parser.add_argument("--submit-runner-image", type=str,
702                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
703                         default=None)
704
705     parser.add_argument("--submit-request-uuid", type=str,
706                         default=None,
707                         help="Update and commit supplied container request instead of creating a new one (containers API only).")
708
709     parser.add_argument("--name", type=str,
710                         help="Name to use for workflow execution instance.",
711                         default=None)
712
713     parser.add_argument("--on-error", type=str,
714                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
715                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
716
717     parser.add_argument("--enable-dev", action="store_true",
718                         help="Enable loading and running development versions "
719                              "of CWL spec.", default=False)
720
721     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
722                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
723                         default=0)
724
725     parser.add_argument("--priority", type=int,
726                         help="Workflow priority (range 1..1000, higher has precedence over lower, containers API only)",
727                         default=DEFAULT_PRIORITY)
728
729     parser.add_argument("--disable-validate", dest="do_validate",
730                         action="store_false", default=True,
731                         help=argparse.SUPPRESS)
732
733     parser.add_argument("--disable-js-validation",
734                         action="store_true", default=False,
735                         help=argparse.SUPPRESS)
736
737     parser.add_argument("--thread-count", type=int,
738                         default=4, help="Number of threads to use for job submit and output collection.")
739
740     parser.add_argument("--http-timeout", type=int,
741                         default=5*60, dest="http_timeout", help="HTTP request timeout in seconds.  Default is 300 seconds (5 minutes).")
742
743     exgroup = parser.add_mutually_exclusive_group()
744     exgroup.add_argument("--trash-intermediate", action="store_true",
745                         default=False, dest="trash_intermediate",
746                          help="Immediately trash intermediate outputs on workflow success.")
747     exgroup.add_argument("--no-trash-intermediate", action="store_false",
748                         default=False, dest="trash_intermediate",
749                         help="Do not trash intermediate outputs (default).")
750
751     parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
752     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
753
754     return parser
755
756 def add_arv_hints():
757     cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
758     cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
759     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
760     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
761     res.close()
762     cwltool.process.supportedProcessRequirements.extend([
763         "http://arvados.org/cwl#RunInSingleContainer",
764         "http://arvados.org/cwl#OutputDirType",
765         "http://arvados.org/cwl#RuntimeConstraints",
766         "http://arvados.org/cwl#PartitionRequirement",
767         "http://arvados.org/cwl#APIRequirement",
768         "http://commonwl.org/cwltool#LoadListingRequirement",
769         "http://arvados.org/cwl#IntermediateOutput",
770         "http://arvados.org/cwl#ReuseRequirement"
771     ])
772
773 def exit_signal_handler(sigcode, frame):
774     logger.error("Caught signal {}, exiting.".format(sigcode))
775     sys.exit(-sigcode)
776
777 def main(args, stdout, stderr, api_client=None, keep_client=None,
778          install_sig_handlers=True):
779     parser = arg_parser()
780
781     job_order_object = None
782     arvargs = parser.parse_args(args)
783
784     if install_sig_handlers:
785         arv_cmd.install_signal_handlers()
786
787     if arvargs.update_workflow:
788         if arvargs.update_workflow.find('-7fd4e-') == 5:
789             want_api = 'containers'
790         elif arvargs.update_workflow.find('-p5p6p-') == 5:
791             want_api = 'jobs'
792         else:
793             want_api = None
794         if want_api and arvargs.work_api and want_api != arvargs.work_api:
795             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
796                 arvargs.update_workflow, want_api, arvargs.work_api))
797             return 1
798         arvargs.work_api = want_api
799
800     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
801         job_order_object = ({}, "")
802
803     add_arv_hints()
804
805     try:
806         if api_client is None:
807             api_client = arvados.safeapi.ThreadSafeApiCache(
808                 api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
809                 keep_params={"num_retries": 4})
810             keep_client = api_client.keep
811         if keep_client is None:
812             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
813         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
814                               num_retries=4, output_name=arvargs.output_name,
815                               output_tags=arvargs.output_tags,
816                               thread_count=arvargs.thread_count)
817     except Exception as e:
818         logger.error(e)
819         return 1
820
821     if arvargs.debug:
822         logger.setLevel(logging.DEBUG)
823         logging.getLogger('arvados').setLevel(logging.DEBUG)
824
825     if arvargs.quiet:
826         logger.setLevel(logging.WARN)
827         logging.getLogger('arvados').setLevel(logging.WARN)
828         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
829
830     if arvargs.metrics:
831         metrics.setLevel(logging.DEBUG)
832         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
833
834     if arvargs.log_timestamps:
835         arvados.log_handler.setFormatter(logging.Formatter(
836             '%(asctime)s %(name)s %(levelname)s: %(message)s',
837             '%Y-%m-%d %H:%M:%S'))
838     else:
839         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
840
841     arvargs.conformance_test = None
842     arvargs.use_container = True
843     arvargs.relax_path_checks = True
844     arvargs.print_supported_versions = False
845
846     make_fs_access = partial(CollectionFsAccess,
847                            collection_cache=runner.collection_cache)
848
849     return cwltool.main.main(args=arvargs,
850                              stdout=stdout,
851                              stderr=stderr,
852                              executor=runner.arv_executor,
853                              makeTool=runner.arv_make_tool,
854                              versionfunc=versionstring,
855                              job_order_object=job_order_object,
856                              make_fs_access=make_fs_access,
857                              fetcher_constructor=partial(CollectionFetcher,
858                                                          api_client=api_client,
859                                                          fs_access=make_fs_access(""),
860                                                          num_retries=runner.num_retries),
861                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
862                              logger_handler=arvados.log_handler,
863                              custom_schema_callback=add_arv_hints)