13773: Updates tests checking that runtime_status_error() is called when needed
[arvados.git] sdk/cwl/arvados_cwl/__init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import threading
14 import hashlib
15 import copy
16 import json
17 import re
18 from functools import partial
19 import pkg_resources  # part of setuptools
20 import Queue
21 import time
22 import signal
23 import thread
24
25 from cwltool.errors import WorkflowException
26 import cwltool.main
27 import cwltool.workflow
28 import cwltool.process
29 from schema_salad.sourceline import SourceLine
30 import schema_salad.validate as validate
31 import cwltool.argparser
32
33 import arvados
34 import arvados.config
35 from arvados.keep import KeepClient
36 from arvados.errors import ApiError
37 import arvados.commands._util as arv_cmd
38
39 from .arvcontainer import ArvadosContainer, RunnerContainer
40 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
41 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
42 from .arvtool import ArvadosCommandTool
43 from .arvworkflow import ArvadosWorkflow, upload_workflow
44 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
45 from .perf import Perf
46 from .pathmapper import NoFollowPathMapper
47 from .task_queue import TaskQueue
48 from .context import ArvLoadingContext, ArvRuntimeContext
49 from ._version import __version__
50
51 from cwltool.pack import pack
52 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
53 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
54 from cwltool.command_line_tool import compute_checksums
55
56 from arvados.api import OrderedJsonModel
57
58 logger = logging.getLogger('arvados.cwl-runner')
59 metrics = logging.getLogger('arvados.cwl-runner.metrics')
60 logger.setLevel(logging.INFO)
61
62 arvados.log_handler.setFormatter(logging.Formatter(
63         '%(asctime)s %(name)s %(levelname)s: %(message)s',
64         '%Y-%m-%d %H:%M:%S'))
65
66 DEFAULT_PRIORITY = 500
67
68 class ArvCwlRunner(object):
69     """Execute a CWL tool or workflow, submit work (using either jobs or
70     containers API), wait for them to complete, and report output.
71
72     """
73
74     def __init__(self, api_client,
75                  arvargs=None,
76                  keep_client=None,
77                  num_retries=4,
78                  thread_count=4):
79
80         if arvargs is None:
81             arvargs = argparse.Namespace()
82             arvargs.work_api = None
83             arvargs.output_name = None
84             arvargs.output_tags = None
85             arvargs.thread_count = 1
86
87         self.api = api_client
88         self.processes = {}
89         self.workflow_eval_lock = threading.Condition(threading.RLock())
90         self.final_output = None
91         self.final_status = None
92         self.num_retries = num_retries
93         self.uuid = None
94         self.stop_polling = threading.Event()
95         self.poll_api = None
96         self.pipeline = None
97         self.final_output_collection = None
98         self.output_name = arvargs.output_name
99         self.output_tags = arvargs.output_tags
100         self.project_uuid = None
101         self.intermediate_output_ttl = 0
102         self.intermediate_output_collections = []
103         self.trash_intermediate = False
104         self.thread_count = arvargs.thread_count
105         self.poll_interval = 12
106         self.loadingContext = None
107
108         if keep_client is not None:
109             self.keep_client = keep_client
110         else:
111             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
112
113         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
114
115         self.fetcher_constructor = partial(CollectionFetcher,
116                                            api_client=self.api,
117                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
118                                            num_retries=self.num_retries)
119
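        # Auto-detect which work submission API this cluster supports by
        # probing the API discovery document.  The first entry of
        # expected_api that exposes a usable "create" method wins, unless
        # --api pinned a specific one.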
120         self.work_api = None
121         expected_api = ["jobs", "containers"]
122         for api in expected_api:
123             try:
124                 methods = self.api._rootDesc.get('resources')[api]['methods']
125                 if ('httpMethod' in methods['create'] and
126                     (arvargs.work_api == api or arvargs.work_api is None)):
127                     self.work_api = api
128                     break
129             except KeyError:
130                 pass
131
132         if not self.work_api:
133             if arvargs.work_api is None:
134                 raise Exception("No supported APIs")
135             else:
136                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
137
138         if self.work_api == "jobs":
139             logger.warn("""
140 *******************************
141 Using the deprecated 'jobs' API.
142
143 To get rid of this warning:
144
145 Users: read about migrating at
146 http://doc.arvados.org/user/cwl/cwl-style.html#migrate
147 and use the option --api=containers
148
149 Admins: configure the cluster to disable the 'jobs' API as described at:
150 http://doc.arvados.org/install/install-api-server.html#disable_api_methods
151 *******************************""")
152
153         self.loadingContext = ArvLoadingContext(vars(arvargs))
154         self.loadingContext.fetcher_constructor = self.fetcher_constructor
155         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
156         self.loadingContext.construct_tool_object = self.arv_make_tool
157
158
159     def arv_make_tool(self, toolpath_object, loadingContext):
160         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
161             return ArvadosCommandTool(self, toolpath_object, loadingContext)
162         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
163             return ArvadosWorkflow(self, toolpath_object, loadingContext)
164         else:
165             return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
166
167     def output_callback(self, out, processStatus):
168         with self.workflow_eval_lock:
169             if processStatus == "success":
170                 logger.info("Overall process status is %s", processStatus)
171                 state = "Complete"
172             else:
173                 logger.error("Overall process status is %s", processStatus)
174                 state = "Failed"
175             if self.pipeline:
176                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
177                                                         body={"state": state}).execute(num_retries=self.num_retries)
178             self.final_status = processStatus
179             self.final_output = out
180             self.workflow_eval_lock.notifyAll()
181
182
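    # Child process lifecycle: start_run() queues a runnable's run() on the
    # task queue, process_submitted() registers the resulting container/job
    # under workflow_eval_lock, and process_done() hands its final record to
    # the task queue and drops it from self.processes.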
183     def start_run(self, runnable, runtimeContext):
184         self.task_queue.add(partial(runnable.run, runtimeContext))
185
186     def process_submitted(self, container):
187         with self.workflow_eval_lock:
188             self.processes[container.uuid] = container
189
190     def process_done(self, uuid, record):
191         with self.workflow_eval_lock:
192             j = self.processes[uuid]
193             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
194             self.task_queue.add(partial(j.done, record))
195             del self.processes[uuid]
196
197     def runtime_status_error(self, child_label, child_uuid, error_log):
198         """
199         Called when a child container fails. Records the first child error
200         in this runner's runtime_status field.
201         On subsequent errors, updates the 'error' key to show how many
202         additional failures have happened.
203         """
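        # Illustrative progression of runtime_status['error'] (the label and
        # UUID below are hypothetical):
        #   1st failure: "[container step1] zzzzz-xvhdp-000000000000000 failed"
        #   2nd failure: "[container step1] zzzzz-xvhdp-000000000000000 failed (and 1 more)"
        #   3rd failure: "[container step1] zzzzz-xvhdp-000000000000000 failed (and 2 more)"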
204         error_msg = "%s %s failed" % (child_label, child_uuid)
205         logger.info(error_msg)
206         with self.workflow_eval_lock:
207             try:
208                 current = self.api.containers().current().execute(num_retries=self.num_retries)
209             except ApiError as e:
210                 # Status code 404 just means we're not running in a container.
211                 if e.resp.status != 404:
212                     logger.info("Getting current container: %s", e)
213                 return
214             runtime_status = current.get('runtime_status', {})
215             # Save first fatal error
216             if not runtime_status.get('error'):
217                 runtime_status.update({
218                     'error': error_msg,
219                     'errorDetail': error_log or "No error logs available"
220                 })
221             # Further errors are only mentioned as a count
222             else:
223                 error_msg = re.match(
224                     r'^(.*failed)\s*\(?', runtime_status.get('error')).groups()[0]
225                 more_failures = re.match(
226                     r'.*\(.*(\d+) more\)', runtime_status.get('error'))
227                 if more_failures:
228                     failure_qty = int(more_failures.groups()[0])
229                     runtime_status.update({
230                         'error': "%s (and %d more)" % (error_msg, failure_qty+1)
231                     })
232                 else:
233                     runtime_status.update({
234                         'error': "%s (and 1 more)" % error_msg
235                     })
236             try:
237                 self.api.containers().update(uuid=current['uuid'],
238                                             body={
239                                                 'runtime_status': runtime_status,
240                                             }).execute(num_retries=self.num_retries)
241             except Exception as e:
242                 logger.error("Updating runtime_status: %s", e)
243
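    # cwltool may invoke output callbacks from task-queue threads; wrap them
    # so they run while holding workflow_eval_lock and wake up the main loop
    # waiting in arv_executor().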
244     def wrapped_callback(self, cb, obj, st):
245         with self.workflow_eval_lock:
246             cb(obj, st)
247             self.workflow_eval_lock.notifyAll()
248
249     def get_wrapped_callback(self, cb):
250         return partial(self.wrapped_callback, cb)
251
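    # Handle a state-change event for a tracked process: flag it Running on
    # the first "Running" update, or finalize it when it reaches a terminal
    # state.  poll_states() feeds synthesized events through here in the same
    # shape a websocket event would have.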
252     def on_message(self, event):
253         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
254             uuid = event["object_uuid"]
255             if event["properties"]["new_attributes"]["state"] == "Running":
256                 with self.workflow_eval_lock:
257                     j = self.processes[uuid]
258                     if j.running is False:
259                         j.running = True
260                         j.update_pipeline_component(event["properties"]["new_attributes"])
261                         logger.info("%s %s is Running", self.label(j), uuid)
262             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
263                 self.process_done(uuid, event["properties"]["new_attributes"])
264
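    # work_api is plural ("jobs"/"containers"); trim the trailing "s" so log
    # messages read like "[container stepname]".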
265     def label(self, obj):
266         return "[%s %s]" % (self.work_api[0:-1], obj.name)
267
268     def poll_states(self):
269         """Poll status of jobs or containers listed in the processes dict.
270
271         Runs in a separate thread.
272         """
273
274         try:
275             remain_wait = self.poll_interval
276             while True:
277                 if remain_wait > 0:
278                     self.stop_polling.wait(remain_wait)
279                 if self.stop_polling.is_set():
280                     break
281                 with self.workflow_eval_lock:
282                     keys = list(self.processes.keys())
283                 if not keys:
284                     remain_wait = self.poll_interval
285                     continue
286
287                 begin_poll = time.time()
288                 if self.work_api == "containers":
289                     table = self.poll_api.container_requests()
290                 elif self.work_api == "jobs":
291                     table = self.poll_api.jobs()
292
293                 try:
294                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
295                 except Exception as e:
296                     logger.warn("Error checking states on API server: %s", e)
297                     remain_wait = self.poll_interval
298                     continue
299
300                 for p in proc_states["items"]:
301                     self.on_message({
302                         "object_uuid": p["uuid"],
303                         "event_type": "update",
304                         "properties": {
305                             "new_attributes": p
306                         }
307                     })
308                 finish_poll = time.time()
309                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
310         except:
311             logger.exception("Fatal error in state polling thread.")
312             with self.workflow_eval_lock:
313                 self.processes.clear()
314                 self.workflow_eval_lock.notifyAll()
315         finally:
316             self.stop_polling.set()
317
318     def add_intermediate_output(self, uuid):
319         if uuid:
320             self.intermediate_output_collections.append(uuid)
321
322     def trash_intermediate_output(self):
323         logger.info("Cleaning up intermediate output collections")
324         for i in self.intermediate_output_collections:
325             try:
326                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
327             except:
328                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
329             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
330                 break
331
332     def check_features(self, obj):
333         if isinstance(obj, dict):
334             if obj.get("writable") and self.work_api != "containers":
335                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
336             if obj.get("class") == "DockerRequirement":
337                 if obj.get("dockerOutputDirectory"):
338                     if self.work_api != "containers":
339                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
340                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
341                     if not obj.get("dockerOutputDirectory").startswith('/'):
342                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
343                             "Option 'dockerOutputDirectory' must be an absolute path.")
344             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
345                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
346             for v in obj.itervalues():
347                 self.check_features(v)
348         elif isinstance(obj, list):
349             for i,v in enumerate(obj):
350                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
351                     self.check_features(v)
352
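    # Assemble the final output: copy every file/directory referenced by the
    # CWL output object out of its source collection into a single new
    # collection, write a cwl.output.json manifest into it, save the
    # collection in the target project, and apply any requested tags.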
353     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
354         outputObj = copy.deepcopy(outputObj)
355
356         files = []
357         def capture(fileobj):
358             files.append(fileobj)
359
360         adjustDirObjs(outputObj, capture)
361         adjustFileObjs(outputObj, capture)
362
363         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
364
365         final = arvados.collection.Collection(api_client=self.api,
366                                               keep_client=self.keep_client,
367                                               num_retries=self.num_retries)
368
369         for k,v in generatemapper.items():
370             if k.startswith("_:"):
371                 if v.type == "Directory":
372                     continue
373                 if v.type == "CreateFile":
374                     with final.open(v.target, "wb") as f:
375                         f.write(v.resolved.encode("utf-8"))
376                     continue
377
378             if not k.startswith("keep:"):
379                 raise Exception("Output source is not in keep or a literal")
380             sp = k.split("/")
381             srccollection = sp[0][5:]
382             try:
383                 reader = self.collection_cache.get(srccollection)
384                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
385                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
386             except arvados.errors.ArgumentError as e:
387                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
388                 raise
389             except IOError as e:
390                 logger.warn("While preparing output collection: %s", e)
391
392         def rewrite(fileobj):
393             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
394             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
395                 if k in fileobj:
396                     del fileobj[k]
397
398         adjustDirObjs(outputObj, rewrite)
399         adjustFileObjs(outputObj, rewrite)
400
401         with final.open("cwl.output.json", "w") as f:
402             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
403
404         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
405
406         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
407                     final.api_response()["name"],
408                     final.manifest_locator())
409
410         final_uuid = final.manifest_locator()
411         tags = tagsString.split(',')
412         for tag in tags:
413              self.api.links().create(body={
414                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
415                 }).execute(num_retries=self.num_retries)
416
417         def finalcollection(fileobj):
418             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
419
420         adjustDirObjs(outputObj, finalcollection)
421         adjustFileObjs(outputObj, finalcollection)
422
423         return (outputObj, final)
424
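    # When this runner is itself executing inside an Arvados container (or a
    # Crunch job task), record the final output's portable data hash on that
    # container/job record so the workflow output is visible there.  The
    # separately saved output collection is then marked trashed, presumably
    # because the container's own output record supersedes it.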
425     def set_crunch_output(self):
426         if self.work_api == "containers":
427             try:
428                 current = self.api.containers().current().execute(num_retries=self.num_retries)
429             except ApiError as e:
430                 # Status code 404 just means we're not running in a container.
431                 if e.resp.status != 404:
432                     logger.info("Getting current container: %s", e)
433                 return
434             try:
435                 self.api.containers().update(uuid=current['uuid'],
436                                              body={
437                                                  'output': self.final_output_collection.portable_data_hash(),
438                                              }).execute(num_retries=self.num_retries)
439                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
440                                               body={
441                                                   'is_trashed': True
442                                               }).execute(num_retries=self.num_retries)
443             except Exception as e:
444                 logger.info("Setting container output: %s", e)
445         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
446             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
447                                    body={
448                                        'output': self.final_output_collection.portable_data_hash(),
449                                        'success': self.final_status == "success",
450                                        'progress':1.0
451                                    }).execute(num_retries=self.num_retries)
452
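    # Main entry point called by cwltool.main: validates Arvados-specific
    # feature support, uploads workflow dependencies and the job order,
    # optionally registers a workflow/pipeline template and exits, optionally
    # submits a runner job/container, and otherwise evaluates the workflow
    # here while the polling thread and task queue drive child processes.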
453     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
454         self.debug = runtimeContext.debug
455
456         tool.visit(self.check_features)
457
458         self.project_uuid = runtimeContext.project_uuid
459         self.pipeline = None
460         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
461         self.secret_store = runtimeContext.secret_store
462
463         self.trash_intermediate = runtimeContext.trash_intermediate
464         if self.trash_intermediate and self.work_api != "containers":
465             raise Exception("--trash-intermediate is only supported with --api=containers.")
466
467         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
468         if self.intermediate_output_ttl and self.work_api != "containers":
469             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
470         if self.intermediate_output_ttl < 0:
471             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
472
473         if runtimeContext.submit_request_uuid and self.work_api != "containers":
474             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
475
476         if not runtimeContext.name:
477             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
478
479         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
480         # Also uploads docker images.
481         merged_map = upload_workflow_deps(self, tool)
482
483         # Reload tool object which may have been updated by
484         # upload_workflow_deps
485         # Don't validate this time because it will just print redundant errors.
486         loadingContext = self.loadingContext.copy()
487         loadingContext.loader = tool.doc_loader
488         loadingContext.avsc_names = tool.doc_schema
489         loadingContext.metadata = tool.metadata
490         loadingContext.do_validate = False
491
492         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
493                                   loadingContext)
494
495         # Upload local file references in the job order.
496         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
497                                      tool, job_order)
498
499         existing_uuid = runtimeContext.update_workflow
500         if existing_uuid or runtimeContext.create_workflow:
501             # Create a pipeline template or workflow record and exit.
502             if self.work_api == "jobs":
503                 tmpl = RunnerTemplate(self, tool, job_order,
504                                       runtimeContext.enable_reuse,
505                                       uuid=existing_uuid,
506                                       submit_runner_ram=runtimeContext.submit_runner_ram,
507                                       name=runtimeContext.name,
508                                       merged_map=merged_map)
509                 tmpl.save()
510                 # cwltool.main will write our return value to stdout.
511                 return (tmpl.uuid, "success")
512             elif self.work_api == "containers":
513                 return (upload_workflow(self, tool, job_order,
514                                         self.project_uuid,
515                                         uuid=existing_uuid,
516                                         submit_runner_ram=runtimeContext.submit_runner_ram,
517                                         name=runtimeContext.name,
518                                         merged_map=merged_map),
519                         "success")
520
521         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
522         self.eval_timeout = runtimeContext.eval_timeout
523
524         runtimeContext = runtimeContext.copy()
525         runtimeContext.use_container = True
526         runtimeContext.tmpdir_prefix = "tmp"
527         runtimeContext.work_api = self.work_api
528
529         if self.work_api == "containers":
530             if self.ignore_docker_for_reuse:
531                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
532             runtimeContext.outdir = "/var/spool/cwl"
533             runtimeContext.docker_outdir = "/var/spool/cwl"
534             runtimeContext.tmpdir = "/tmp"
535             runtimeContext.docker_tmpdir = "/tmp"
536         elif self.work_api == "jobs":
537             if runtimeContext.priority != DEFAULT_PRIORITY:
538                 raise Exception("--priority not implemented for jobs API.")
539             runtimeContext.outdir = "$(task.outdir)"
540             runtimeContext.docker_outdir = "$(task.outdir)"
541             runtimeContext.tmpdir = "$(task.tmpdir)"
542
543         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
544             raise Exception("--priority must be in the range 1..1000.")
545
546         runnerjob = None
547         if runtimeContext.submit:
548             # Submit a runner job to run the workflow for us.
549             if self.work_api == "containers":
550                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
551                     runtimeContext.runnerjob = tool.tool["id"]
552                     runnerjob = tool.job(job_order,
553                                          self.output_callback,
554                                          runtimeContext).next()
555                 else:
556                     runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
557                                                 self.output_name,
558                                                 self.output_tags,
559                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
560                                                 name=runtimeContext.name,
561                                                 on_error=runtimeContext.on_error,
562                                                 submit_runner_image=runtimeContext.submit_runner_image,
563                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
564                                                 merged_map=merged_map,
565                                                 priority=runtimeContext.priority,
566                                                 secret_store=self.secret_store)
567             elif self.work_api == "jobs":
568                 runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
569                                       self.output_name,
570                                       self.output_tags,
571                                       submit_runner_ram=runtimeContext.submit_runner_ram,
572                                       name=runtimeContext.name,
573                                       on_error=runtimeContext.on_error,
574                                       submit_runner_image=runtimeContext.submit_runner_image,
575                                       merged_map=merged_map)
576         elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
577             # Create pipeline for local run
578             self.pipeline = self.api.pipeline_instances().create(
579                 body={
580                     "owner_uuid": self.project_uuid,
581                     "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
582                     "components": {},
583                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
584             logger.info("Pipeline instance %s", self.pipeline["uuid"])
585
586         if runnerjob and not runtimeContext.wait:
587             submitargs = runtimeContext.copy()
588             submitargs.submit = False
589             runnerjob.run(submitargs)
590             return (runnerjob.uuid, "success")
591
592         self.poll_api = arvados.api('v1')
593         self.polling_thread = threading.Thread(target=self.poll_states)
594         self.polling_thread.start()
595
596         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
597
598         if runnerjob:
599             jobiter = iter((runnerjob,))
600         else:
601             if runtimeContext.cwl_runner_job is not None:
602                 self.uuid = runtimeContext.cwl_runner_job.get('uuid')
603             jobiter = tool.job(job_order,
604                                self.output_callback,
605                                runtimeContext)
606
607         try:
608             self.workflow_eval_lock.acquire()
609             # Holds the lock while this code runs and releases it when
610             # it is safe to do so in self.workflow_eval_lock.wait(),
611             # at which point on_message can update job state and
612             # process output callbacks.
613
614             loopperf = Perf(metrics, "jobiter")
615             loopperf.__enter__()
616             for runnable in jobiter:
617                 loopperf.__exit__()
618
619                 if self.stop_polling.is_set():
620                     break
621
622                 if self.task_queue.error is not None:
623                     raise self.task_queue.error
624
625                 if runnable:
626                     with Perf(metrics, "run"):
627                         self.start_run(runnable, runtimeContext)
628                 else:
629                     if (self.task_queue.in_flight + len(self.processes)) > 0:
630                         self.workflow_eval_lock.wait(3)
631                     else:
632                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
633                         break
634                 loopperf.__enter__()
635             loopperf.__exit__()
636
637             while (self.task_queue.in_flight + len(self.processes)) > 0:
638                 if self.task_queue.error is not None:
639                     raise self.task_queue.error
640                 self.workflow_eval_lock.wait(3)
641
642         except UnsupportedRequirement:
643             raise
644         except:
645             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
646                 logger.error("Interrupted, workflow will be cancelled")
647             else:
648                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
649             if self.pipeline:
650                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
651                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
652             if runnerjob and runnerjob.uuid and self.work_api == "containers":
653                 self.api.container_requests().update(uuid=runnerjob.uuid,
654                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
655         finally:
656             self.workflow_eval_lock.release()
657             self.task_queue.drain()
658             self.stop_polling.set()
659             self.polling_thread.join()
660             self.task_queue.join()
661
662         if self.final_status == "UnsupportedRequirement":
663             raise UnsupportedRequirement("Check log for details.")
664
665         if self.final_output is None:
666             raise WorkflowException("Workflow did not return a result.")
667
668         if runtimeContext.submit and isinstance(runnerjob, Runner):
669             logger.info("Final output collection %s", runnerjob.final_output)
670         else:
671             if self.output_name is None:
672                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
673             if self.output_tags is None:
674                 self.output_tags = ""
675
676             storage_classes = runtimeContext.storage_classes.strip().split(",")
677             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
678             self.set_crunch_output()
679
680         if runtimeContext.compute_checksum:
681             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
682             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
683
684         if self.trash_intermediate and self.final_status == "success":
685             self.trash_intermediate_output()
686
687         return (self.final_output, self.final_status)
688
689
690 def versionstring():
691     """Print version string of key packages for provenance and debugging."""
692
693     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
694     arvpkg = pkg_resources.require("arvados-python-client")
695     cwlpkg = pkg_resources.require("cwltool")
696
697     return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
698                                     "arvados-python-client", arvpkg[0].version,
699                                     "cwltool", cwlpkg[0].version)
700
701
702 def arg_parser():  # type: () -> argparse.ArgumentParser
703     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
704
705     parser.add_argument("--basedir", type=str,
706                         help="Base directory used to resolve relative references in the input. Defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
707     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
708                         help="Output directory, default current directory")
709
710     parser.add_argument("--eval-timeout",
711                         help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
712                         type=float,
713                         default=20)
714
715     exgroup = parser.add_mutually_exclusive_group()
716     exgroup.add_argument("--print-dot", action="store_true",
717                          help="Print workflow visualization in graphviz format and exit")
718     exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
719     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
720
721     exgroup = parser.add_mutually_exclusive_group()
722     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
723     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
724     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
725
726     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
727
728     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
729
730     exgroup = parser.add_mutually_exclusive_group()
731     exgroup.add_argument("--enable-reuse", action="store_true",
732                         default=True, dest="enable_reuse",
733                         help="Enable job or container reuse (default)")
734     exgroup.add_argument("--disable-reuse", action="store_false",
735                         default=True, dest="enable_reuse",
736                         help="Disable job or container reuse")
737
738     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
739     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
740     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
741     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
742                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
743                         default=False)
744
745     exgroup = parser.add_mutually_exclusive_group()
746     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
747                         default=True, dest="submit")
748     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
749                         default=True, dest="submit")
750     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
751                          dest="create_workflow")
752     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
753     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
754
755     exgroup = parser.add_mutually_exclusive_group()
756     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
757                         default=True, dest="wait")
758     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
759                         default=True, dest="wait")
760
761     exgroup = parser.add_mutually_exclusive_group()
762     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
763                         default=True, dest="log_timestamps")
764     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
765                         default=True, dest="log_timestamps")
766
767     parser.add_argument("--api", type=str,
768                         default=None, dest="work_api",
769                         choices=("jobs", "containers"),
770                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
771
772     parser.add_argument("--compute-checksum", action="store_true", default=False,
773                         help="Compute checksum of contents while collecting outputs",
774                         dest="compute_checksum")
775
776     parser.add_argument("--submit-runner-ram", type=int,
777                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
778                         default=None)
779
780     parser.add_argument("--submit-runner-image", type=str,
781                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
782                         default=None)
783
784     parser.add_argument("--submit-request-uuid", type=str,
785                         default=None,
786                         help="Update and commit supplied container request instead of creating a new one (containers API only).")
787
788     parser.add_argument("--name", type=str,
789                         help="Name to use for workflow execution instance.",
790                         default=None)
791
792     parser.add_argument("--on-error", type=str,
793                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
794                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
795
796     parser.add_argument("--enable-dev", action="store_true",
797                         help="Enable loading and running development versions "
798                              "of CWL spec.", default=False)
799     parser.add_argument('--storage-classes', default="default", type=str,
800                         help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
801
802     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
803                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
804                         default=0)
805
806     parser.add_argument("--priority", type=int,
807                         help="Workflow priority (range 1..1000, higher has precedence over lower, containers API only)",
808                         default=DEFAULT_PRIORITY)
809
810     parser.add_argument("--disable-validate", dest="do_validate",
811                         action="store_false", default=True,
812                         help=argparse.SUPPRESS)
813
814     parser.add_argument("--disable-js-validation",
815                         action="store_true", default=False,
816                         help=argparse.SUPPRESS)
817
818     parser.add_argument("--thread-count", type=int,
819                         default=4, help="Number of threads to use for job submit and output collection.")
820
821     exgroup = parser.add_mutually_exclusive_group()
822     exgroup.add_argument("--trash-intermediate", action="store_true",
823                         default=False, dest="trash_intermediate",
824                          help="Immediately trash intermediate outputs on workflow success.")
825     exgroup.add_argument("--no-trash-intermediate", action="store_false",
826                         default=False, dest="trash_intermediate",
827                         help="Do not trash intermediate outputs (default).")
828
829     parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
830     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
831
832     return parser
833
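# Register the Arvados CWL extension schema (arv-cwl-schema.yml), relax
# cwltool's filename acceptance regex, and declare the arvados.org /
# commonwl.org requirement URIs as supported so cwltool's validator accepts
# them.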
834 def add_arv_hints():
835     cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
836     cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
837     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
838     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
839     res.close()
840     cwltool.process.supportedProcessRequirements.extend([
841         "http://arvados.org/cwl#RunInSingleContainer",
842         "http://arvados.org/cwl#OutputDirType",
843         "http://arvados.org/cwl#RuntimeConstraints",
844         "http://arvados.org/cwl#PartitionRequirement",
845         "http://arvados.org/cwl#APIRequirement",
846         "http://commonwl.org/cwltool#LoadListingRequirement",
847         "http://arvados.org/cwl#IntermediateOutput",
848         "http://arvados.org/cwl#ReuseRequirement"
849     ])
850
851 def exit_signal_handler(sigcode, frame):
852     logger.error("Caught signal {}, exiting.".format(sigcode))
853     sys.exit(-sigcode)
854
855 def main(args, stdout, stderr, api_client=None, keep_client=None,
856          install_sig_handlers=True):
857     parser = arg_parser()
858
859     job_order_object = None
860     arvargs = parser.parse_args(args)
861
862     if len(arvargs.storage_classes.strip().split(',')) > 1:
863         logger.error("Multiple storage classes are not supported currently.")
864         return 1
865
866     arvargs.use_container = True
867     arvargs.relax_path_checks = True
868     arvargs.print_supported_versions = False
869
870     if install_sig_handlers:
871         arv_cmd.install_signal_handlers()
872
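    # Arvados UUIDs embed a five-character object-type infix starting at
    # offset 5 (e.g. zzzzz-7fd4e-... for workflow records, zzzzz-p5p6p-...
    # for pipeline templates), so the UUID being updated implies which work
    # API must be used.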
873     if arvargs.update_workflow:
874         if arvargs.update_workflow.find('-7fd4e-') == 5:
875             want_api = 'containers'
876         elif arvargs.update_workflow.find('-p5p6p-') == 5:
877             want_api = 'jobs'
878         else:
879             want_api = None
880         if want_api and arvargs.work_api and want_api != arvargs.work_api:
881             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
882                 arvargs.update_workflow, want_api, arvargs.work_api))
883             return 1
884         arvargs.work_api = want_api
885
886     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
887         job_order_object = ({}, "")
888
889     add_arv_hints()
890
891     try:
892         if api_client is None:
893             api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
894             keep_client = api_client.keep
895             # Make an API object now so errors are reported early.
896             api_client.users().current().execute()
897         if keep_client is None:
898             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
899         runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
900     except Exception as e:
901         logger.error(e)
902         return 1
903
904     if arvargs.debug:
905         logger.setLevel(logging.DEBUG)
906         logging.getLogger('arvados').setLevel(logging.DEBUG)
907
908     if arvargs.quiet:
909         logger.setLevel(logging.WARN)
910         logging.getLogger('arvados').setLevel(logging.WARN)
911         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
912
913     if arvargs.metrics:
914         metrics.setLevel(logging.DEBUG)
915         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
916
917     if arvargs.log_timestamps:
918         arvados.log_handler.setFormatter(logging.Formatter(
919             '%(asctime)s %(name)s %(levelname)s: %(message)s',
920             '%Y-%m-%d %H:%M:%S'))
921     else:
922         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
923
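    # Backfill any cwltool command-line defaults that arv-cwl-runner's own
    # argparser doesn't define, so the ArvRuntimeContext built below sees
    # every attribute cwltool expects.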
924     for key, val in cwltool.argparser.get_default_args().items():
925         if not hasattr(arvargs, key):
926             setattr(arvargs, key, val)
927
928     runtimeContext = ArvRuntimeContext(vars(arvargs))
929     runtimeContext.make_fs_access = partial(CollectionFsAccess,
930                              collection_cache=runner.collection_cache)
931
932     return cwltool.main.main(args=arvargs,
933                              stdout=stdout,
934                              stderr=stderr,
935                              executor=runner.arv_executor,
936                              versionfunc=versionstring,
937                              job_order_object=job_order_object,
938                              logger_handler=arvados.log_handler,
939                              custom_schema_callback=add_arv_hints,
940                              loadingContext=runner.loadingContext,
941                              runtimeContext=runtimeContext)