[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import argparse
6 import logging
7 import os
8 import sys
9 import threading
10 import copy
11 import json
12 import re
13 from functools import partial
14 import time
15
16 from cwltool.errors import WorkflowException
17 import cwltool.workflow
18 from schema_salad.sourceline import SourceLine
19 import schema_salad.validate as validate
20
21 import arvados
22 import arvados.config
23 from arvados.keep import KeepClient
24 from arvados.errors import ApiError
25
26 import arvados_cwl.util
27 from .arvcontainer import RunnerContainer
28 from .arvjob import RunnerJob, RunnerTemplate
29 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
30 from .arvtool import ArvadosCommandTool, validate_cluster_target
31 from .arvworkflow import ArvadosWorkflow, upload_workflow
32 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
33 from .perf import Perf
34 from .pathmapper import NoFollowPathMapper
35 from .task_queue import TaskQueue
36 from .context import ArvLoadingContext, ArvRuntimeContext
37 from ._version import __version__
38
39 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
40 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
41 from cwltool.command_line_tool import compute_checksums
42
43 logger = logging.getLogger('arvados.cwl-runner')
44 metrics = logging.getLogger('arvados.cwl-runner.metrics')
45
46 DEFAULT_PRIORITY = 500
47
48 class RuntimeStatusLoggingHandler(logging.Handler):
49     """
50     Intercepts logging calls and report them as runtime statuses on runner
51     containers.
52     """
53     def __init__(self, runtime_status_update_func):
54         super(RuntimeStatusLoggingHandler, self).__init__()
55         self.runtime_status_update = runtime_status_update_func
56
57     def emit(self, record):
58         kind = None
59         if record.levelno >= logging.ERROR:
60             kind = 'error'
61         elif record.levelno >= logging.WARNING:
62             kind = 'warning'
63         if kind is not None:
64             log_msg = record.getMessage()
65             if '\n' in log_msg:
66                 # If the logged message is multi-line, use its first line as status
67                 # and the rest as detail.
68                 status, detail = log_msg.split('\n', 1)
69                 self.runtime_status_update(
70                     kind,
71                     "%s: %s" % (record.name, status),
72                     detail
73                 )
74             else:
75                 self.runtime_status_update(
76                     kind,
77                     "%s: %s" % (record.name, record.getMessage())
78                 )
79
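# Illustrative sketch (not part of the original module) of how the handler
# above is meant to be used: fake_update is a hypothetical stand-in for
# ArvCwlExecutor.runtime_status_update, and _example_runtime_status_logging
# exists only for demonstration.
def _example_runtime_status_logging():
    collected = []

    def fake_update(kind, message, detail=None):
        # For the multi-line record logged below this receives
        # ('error', 'demo: first line', 'second line').
        collected.append((kind, message, detail))

    demo_logger = logging.getLogger("demo")
    demo_logger.addHandler(RuntimeStatusLoggingHandler(fake_update))
    demo_logger.error("first line\nsecond line")
    return collected
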
80 class ArvCwlExecutor(object):
81     """Execute a CWL tool or workflow, submit work (using either jobs or
82     containers API), wait for them to complete, and report output.
83
84     """
85
86     def __init__(self, api_client,
87                  arvargs=None,
88                  keep_client=None,
89                  num_retries=4,
90                  thread_count=4):
91
92         if arvargs is None:
93             arvargs = argparse.Namespace()
94             arvargs.work_api = None
95             arvargs.output_name = None
96             arvargs.output_tags = None
97             arvargs.thread_count = 1
98             arvargs.collection_cache_size = None
99
100         self.api = api_client
101         self.processes = {}
102         self.workflow_eval_lock = threading.Condition(threading.RLock())
103         self.final_output = None
104         self.final_status = None
105         self.num_retries = num_retries
106         self.uuid = None
107         self.stop_polling = threading.Event()
108         self.poll_api = None
109         self.pipeline = None
110         self.final_output_collection = None
111         self.output_name = arvargs.output_name
112         self.output_tags = arvargs.output_tags
113         self.project_uuid = None
114         self.intermediate_output_ttl = 0
115         self.intermediate_output_collections = []
116         self.trash_intermediate = False
117         self.thread_count = arvargs.thread_count
118         self.poll_interval = 12
119         self.loadingContext = None
120         self.should_estimate_cache_size = True
121
122         if keep_client is not None:
123             self.keep_client = keep_client
124         else:
125             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
126
127         if arvargs.collection_cache_size:
128             collection_cache_size = arvargs.collection_cache_size*1024*1024
129             self.should_estimate_cache_size = False
130         else:
131             collection_cache_size = 256*1024*1024
132
133         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
134                                                 cap=collection_cache_size)
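        # Worked example of the sizing above (illustrative numbers):
        # --collection-cache-size is given in MiB, so a value of 512 becomes
        # 512*1024*1024 = 536870912 bytes for the cache cap; with no value
        # given, the cap starts at 256 MiB and may be re-estimated from the
        # job order later (see should_estimate_cache_size in arv_executor).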
135
136         self.fetcher_constructor = partial(CollectionFetcher,
137                                            api_client=self.api,
138                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
139                                            num_retries=self.num_retries)
140
141         self.work_api = None
142         expected_api = ["jobs", "containers"]
143         for api in expected_api:
144             try:
145                 methods = self.api._rootDesc.get('resources')[api]['methods']
146                 if ('httpMethod' in methods['create'] and
147                     (arvargs.work_api == api or arvargs.work_api is None)):
148                     self.work_api = api
149                     break
150             except KeyError:
151                 pass
152
153         if not self.work_api:
154             if arvargs.work_api is None:
155                 raise Exception("No supported APIs")
156             else:
157                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
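        # Sketch of the discovery document consulted by the loop above
        # (assumed shape, keys abbreviated):
        #
        #   self.api._rootDesc['resources']['containers']['methods']['create']
        #       == {'httpMethod': 'POST', ...}
        #
        # The first entry of expected_api whose 'create' method advertises an
        # 'httpMethod' (and matches --api, if one was given) is selected.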
158
159         if self.work_api == "jobs":
160             logger.warn("""
161 *******************************
162 Using the deprecated 'jobs' API.
163
164 To get rid of this warning:
165
166 Users: read about migrating at
167 http://doc.arvados.org/user/cwl/cwl-style.html#migrate
168 and use the option --api=containers
169
170 Admins: configure the cluster to disable the 'jobs' API as described at:
171 http://doc.arvados.org/install/install-api-server.html#disable_api_methods
172 *******************************""")
173
174         self.loadingContext = ArvLoadingContext(vars(arvargs))
175         self.loadingContext.fetcher_constructor = self.fetcher_constructor
176         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
177         self.loadingContext.construct_tool_object = self.arv_make_tool
178
179         # Add a custom logging handler to the root logger for runtime status reporting
180         # if running inside a container
181         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
182             root_logger = logging.getLogger('')
183             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
184             root_logger.addHandler(handler)
185
186         self.runtimeContext = ArvRuntimeContext(vars(arvargs))
187         self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
188                                                      collection_cache=self.collection_cache)
189
190         validate_cluster_target(self, self.runtimeContext)
191
192
193     def arv_make_tool(self, toolpath_object, loadingContext):
194         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
195             return ArvadosCommandTool(self, toolpath_object, loadingContext)
196         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
197             return ArvadosWorkflow(self, toolpath_object, loadingContext)
198         else:
199             return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
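    # For example (hypothetical inputs): {"class": "CommandLineTool", ...}
    # yields an ArvadosCommandTool, {"class": "Workflow", ...} yields an
    # ArvadosWorkflow, and anything else (such as an ExpressionTool) falls
    # through to cwltool's default factory.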
200
201     def output_callback(self, out, processStatus):
202         with self.workflow_eval_lock:
203             if processStatus == "success":
204                 logger.info("Overall process status is %s", processStatus)
205                 state = "Complete"
206             else:
207                 logger.error("Overall process status is %s", processStatus)
208                 state = "Failed"
209             if self.pipeline:
210                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
211                                                         body={"state": state}).execute(num_retries=self.num_retries)
212             self.final_status = processStatus
213             self.final_output = out
214             self.workflow_eval_lock.notifyAll()
215
216
217     def start_run(self, runnable, runtimeContext):
218         self.task_queue.add(partial(runnable.run, runtimeContext),
219                             self.workflow_eval_lock, self.stop_polling)
220
221     def process_submitted(self, container):
222         with self.workflow_eval_lock:
223             self.processes[container.uuid] = container
224
225     def process_done(self, uuid, record):
226         with self.workflow_eval_lock:
227             j = self.processes[uuid]
228             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
229             self.task_queue.add(partial(j.done, record),
230                                 self.workflow_eval_lock, self.stop_polling)
231             del self.processes[uuid]
232
233     def runtime_status_update(self, kind, message, detail=None):
234         """
235         Updates the runtime_status field on the runner container.
236         Called when there is a need to report errors, warnings, or
237         activity statuses, for example from the RuntimeStatusLoggingHandler.
238         """
239         with self.workflow_eval_lock:
240             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
241             if current is None:
242                 return
243             runtime_status = current.get('runtime_status', {})
244             # If the status kind is an error, only report the first one in full.
245             if kind == 'error':
246                 if not runtime_status.get('error'):
247                     runtime_status.update({
248                         'error': message
249                     })
250                     if detail is not None:
251                         runtime_status.update({
252                             'errorDetail': detail
253                         })
254                 # Further errors are only mentioned as a count.
255                 else:
256                     # Get anything before an optional 'and N more' string.
257                     try:
258                         error_msg = re.match(
259                             r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
260                         more_failures = re.match(
261                             r'.*\(and (\d+) more\)', runtime_status.get('error'))
262                     except TypeError:
263                         # Ignore errors stubbed out by tests
264                         return
265                     if more_failures:
266                         failure_qty = int(more_failures.groups()[0])
267                         runtime_status.update({
268                             'error': "%s (and %d more)" % (error_msg, failure_qty+1)
269                         })
270                     else:
271                         runtime_status.update({
272                             'error': "%s (and 1 more)" % error_msg
273                         })
274             elif kind in ['warning', 'activity']:
275                 # Record the latest warning/activity status, regardless of
276                 # previous occurrences.
277                 runtime_status.update({
278                     kind: message
279                 })
280                 if detail is not None:
281                     runtime_status.update({
282                         kind+"Detail": detail
283                     })
284             else:
285                 # Ignore any other status kind
286                 return
287             try:
288                 self.api.containers().update(uuid=current['uuid'],
289                                             body={
290                                                 'runtime_status': runtime_status,
291                                             }).execute(num_retries=self.num_retries)
292             except Exception as e:
293                 logger.info("Couldn't update runtime_status: %s", e)
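    # Worked example of the error folding above (illustrative values): the
    # first error stores runtime_status['error'] = "step1: failed"; a second
    # error rewrites it to "step1: failed (and 1 more)"; a third matches the
    # '(and N more)' suffix and bumps it to "step1: failed (and 2 more)".
    # Warnings and activity messages simply overwrite the previous value.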
294
295     def wrapped_callback(self, cb, obj, st):
296         with self.workflow_eval_lock:
297             cb(obj, st)
298             self.workflow_eval_lock.notifyAll()
299
300     def get_wrapped_callback(self, cb):
301         return partial(self.wrapped_callback, cb)
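    # Hypothetical call site for the wrapper above: an output callback is
    # wrapped so that, when invoked, it re-acquires the workflow lock and
    # wakes the main loop, e.g.
    #
    #   callback = self.get_wrapped_callback(self.output_callback)
    #   callback(output_object, "success")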
302
303     def on_message(self, event):
304         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
305             uuid = event["object_uuid"]
306             if event["properties"]["new_attributes"]["state"] == "Running":
307                 with self.workflow_eval_lock:
308                     j = self.processes[uuid]
309                     if j.running is False:
310                         j.running = True
311                         j.update_pipeline_component(event["properties"]["new_attributes"])
312                         logger.info("%s %s is Running", self.label(j), uuid)
313             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
314                 self.process_done(uuid, event["properties"]["new_attributes"])
315
316     def label(self, obj):
317         return "[%s %s]" % (self.work_api[0:-1], obj.name)
318
319     def poll_states(self):
320         """Poll status of jobs or containers listed in the processes dict.
321
322         Runs in a separate thread.
323         """
324
325         try:
326             remain_wait = self.poll_interval
327             while True:
328                 if remain_wait > 0:
329                     self.stop_polling.wait(remain_wait)
330                 if self.stop_polling.is_set():
331                     break
332                 with self.workflow_eval_lock:
333                     keys = list(self.processes.keys())
334                 if not keys:
335                     remain_wait = self.poll_interval
336                     continue
337
338                 begin_poll = time.time()
339                 if self.work_api == "containers":
340                     table = self.poll_api.container_requests()
341                 elif self.work_api == "jobs":
342                     table = self.poll_api.jobs()
343
344                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
345
346                 while keys:
347                     page = keys[:pageSize]
348                     keys = keys[pageSize:]
349                     try:
350                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
351                     except Exception as e:
352                         logger.warn("Error checking states on API server: %s", e)
353                         remain_wait = self.poll_interval
354                         continue
355
356                     for p in proc_states["items"]:
357                         self.on_message({
358                             "object_uuid": p["uuid"],
359                             "event_type": "update",
360                             "properties": {
361                                 "new_attributes": p
362                             }
363                         })
364                 finish_poll = time.time()
365                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
366         except:
367             logger.exception("Fatal error in state polling thread.")
368             with self.workflow_eval_lock:
369                 self.processes.clear()
370                 self.workflow_eval_lock.notifyAll()
371         finally:
372             self.stop_polling.set()
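    # Sketch of the synthetic event poll_states builds for each polled record
    # (shape taken from the loop above; field values are illustrative):
    #
    #   {"object_uuid": "zzzzz-xvhdp-0123456789abcde",
    #    "event_type": "update",
    #    "properties": {"new_attributes": {"uuid": "...", "state": "Running"}}}
    #
    # on_message then updates the matching entry in self.processes.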
373
374     def add_intermediate_output(self, uuid):
375         if uuid:
376             self.intermediate_output_collections.append(uuid)
377
378     def trash_intermediate_output(self):
379         logger.info("Cleaning up intermediate output collections")
380         for i in self.intermediate_output_collections:
381             try:
382                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
383             except:
384                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
385             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
386                 break
387
388     def check_features(self, obj):
389         if isinstance(obj, dict):
390             if obj.get("writable") and self.work_api != "containers":
391                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
392             if obj.get("class") == "DockerRequirement":
393                 if obj.get("dockerOutputDirectory"):
394                     if self.work_api != "containers":
395                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
396                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
397                     if not obj.get("dockerOutputDirectory").startswith('/'):
398                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
399                             "Option 'dockerOutputDirectory' must be an absolute path.")
400             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
401                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
402             for v in obj.itervalues():
403                 self.check_features(v)
404         elif isinstance(obj, list):
405             for i,v in enumerate(obj):
406                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
407                     self.check_features(v)
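    # Example of a requirement the walker above rejects (hypothetical CWL
    # fragment, shown as the dict cwltool hands in):
    #
    #   {"class": "DockerRequirement",
    #    "dockerPull": "ubuntu:18.04",
    #    "dockerOutputDirectory": "relative/path"}
    #
    # With --api=containers only the absolute-path check fails; with
    # --api=jobs dockerOutputDirectory is rejected outright.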
408
409     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
410         outputObj = copy.deepcopy(outputObj)
411
412         files = []
413         def capture(fileobj):
414             files.append(fileobj)
415
416         adjustDirObjs(outputObj, capture)
417         adjustFileObjs(outputObj, capture)
418
419         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
420
421         final = arvados.collection.Collection(api_client=self.api,
422                                               keep_client=self.keep_client,
423                                               num_retries=self.num_retries)
424
425         for k,v in generatemapper.items():
426             if k.startswith("_:"):
427                 if v.type == "Directory":
428                     continue
429                 if v.type == "CreateFile":
430                     with final.open(v.target, "wb") as f:
431                         f.write(v.resolved.encode("utf-8"))
432                     continue
433
434             if not k.startswith("keep:"):
435                 raise Exception("Output source is not in keep or a literal")
436             sp = k.split("/")
437             srccollection = sp[0][5:]
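            # e.g. (illustrative) k = "keep:99999999999999999999999999999999+118/dir/file.txt"
            # gives srccollection = "99999999999999999999999999999999+118" and,
            # below, srcpath = "dir/file.txt".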
438             try:
439                 reader = self.collection_cache.get(srccollection)
440                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
441                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
442             except arvados.errors.ArgumentError as e:
443                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
444                 raise
445             except IOError as e:
446                 logger.warn("While preparing output collection: %s", e)
447
448         def rewrite(fileobj):
449             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
450             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
451                 if k in fileobj:
452                     del fileobj[k]
453
454         adjustDirObjs(outputObj, rewrite)
455         adjustFileObjs(outputObj, rewrite)
456
457         with final.open("cwl.output.json", "w") as f:
458             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
459
460         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
461
462         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
463                     final.api_response()["name"],
464                     final.manifest_locator())
465
466         final_uuid = final.manifest_locator()
467         tags = tagsString.split(',')
468         for tag in tags:
469              self.api.links().create(body={
470                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
471                 }).execute(num_retries=self.num_retries)
472
473         def finalcollection(fileobj):
474             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
475
476         adjustDirObjs(outputObj, finalcollection)
477         adjustFileObjs(outputObj, finalcollection)
478
479         return (outputObj, final)
480
481     def set_crunch_output(self):
482         if self.work_api == "containers":
483             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
484             if current is None:
485                 return
486             try:
487                 self.api.containers().update(uuid=current['uuid'],
488                                              body={
489                                                  'output': self.final_output_collection.portable_data_hash(),
490                                              }).execute(num_retries=self.num_retries)
491                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
492                                               body={
493                                                   'is_trashed': True
494                                               }).execute(num_retries=self.num_retries)
495             except Exception as e:
496                 logger.info("Setting container output: %s", e)
497         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
498             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
499                                    body={
500                                        'output': self.final_output_collection.portable_data_hash(),
501                                        'success': self.final_status == "success",
502                                        'progress':1.0
503                                    }).execute(num_retries=self.num_retries)
504
505     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
506         self.debug = runtimeContext.debug
507
508         tool.visit(self.check_features)
509
510         self.project_uuid = runtimeContext.project_uuid
511         self.pipeline = None
512         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
513         self.secret_store = runtimeContext.secret_store
514
515         self.trash_intermediate = runtimeContext.trash_intermediate
516         if self.trash_intermediate and self.work_api != "containers":
517             raise Exception("--trash-intermediate is only supported with --api=containers.")
518
519         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
520         if self.intermediate_output_ttl and self.work_api != "containers":
521             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
522         if self.intermediate_output_ttl < 0:
523             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
524
525         if runtimeContext.submit_request_uuid and self.work_api != "containers":
526             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
527
528         if not runtimeContext.name:
529             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
530
531         # Upload the direct dependencies of workflow steps and get back a
532         # mapping of local files to Keep references.  Also uploads Docker images.
533         merged_map = upload_workflow_deps(self, tool)
534
535         # Reload the tool object, which may have been updated by
536         # upload_workflow_deps.
537         # Don't validate this time because it would just print redundant errors.
538         loadingContext = self.loadingContext.copy()
539         loadingContext.loader = tool.doc_loader
540         loadingContext.avsc_names = tool.doc_schema
541         loadingContext.metadata = tool.metadata
542         loadingContext.do_validate = False
543
544         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
545                                   loadingContext)
546
547         # Upload local file references in the job order.
548         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
549                                      tool, job_order)
550
551         existing_uuid = runtimeContext.update_workflow
552         if existing_uuid or runtimeContext.create_workflow:
553             # Create a pipeline template or workflow record and exit.
554             if self.work_api == "jobs":
555                 tmpl = RunnerTemplate(self, tool, job_order,
556                                       runtimeContext.enable_reuse,
557                                       uuid=existing_uuid,
558                                       submit_runner_ram=runtimeContext.submit_runner_ram,
559                                       name=runtimeContext.name,
560                                       merged_map=merged_map)
561                 tmpl.save()
562                 # cwltool.main will write our return value to stdout.
563                 return (tmpl.uuid, "success")
564             elif self.work_api == "containers":
565                 return (upload_workflow(self, tool, job_order,
566                                         self.project_uuid,
567                                         uuid=existing_uuid,
568                                         submit_runner_ram=runtimeContext.submit_runner_ram,
569                                         name=runtimeContext.name,
570                                         merged_map=merged_map),
571                         "success")
572
573         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
574         self.eval_timeout = runtimeContext.eval_timeout
575
576         runtimeContext = runtimeContext.copy()
577         runtimeContext.use_container = True
578         runtimeContext.tmpdir_prefix = "tmp"
579         runtimeContext.work_api = self.work_api
580
581         if self.work_api == "containers":
582             if self.ignore_docker_for_reuse:
583                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
584             runtimeContext.outdir = "/var/spool/cwl"
585             runtimeContext.docker_outdir = "/var/spool/cwl"
586             runtimeContext.tmpdir = "/tmp"
587             runtimeContext.docker_tmpdir = "/tmp"
588         elif self.work_api == "jobs":
589             if runtimeContext.priority != DEFAULT_PRIORITY:
590                 raise Exception("--priority not implemented for jobs API.")
591             runtimeContext.outdir = "$(task.outdir)"
592             runtimeContext.docker_outdir = "$(task.outdir)"
593             runtimeContext.tmpdir = "$(task.tmpdir)"
594
595         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
596             raise Exception("--priority must be in the range 1..1000.")
597
598         if self.should_estimate_cache_size:
599             visited = set()
600             estimated_size = [0]
601             def estimate_collection_cache(obj):
602                 if obj.get("location", "").startswith("keep:"):
603                     m = pdh_size.match(obj["location"][5:])
604                     if m and m.group(1) not in visited:
605                         visited.add(m.group(1))
606                         estimated_size[0] += int(m.group(2))
607             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
608             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) / (1024*1024))+1, 256)
609             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
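            # Worked example (illustrative): if the job order references
            # collections whose portable data hashes advertise a combined
            # manifest size of 2 MiB, the cap becomes
            # max(2*1024*1024*192/(1024*1024) + 1, 256) = 385 MiB.  The 192x
            # factor appears to be a rough allowance for the in-memory
            # expansion of manifest text.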
610
611         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
612
613         runnerjob = None
614         if runtimeContext.submit:
615             # Submit a runner job to run the workflow for us.
616             if self.work_api == "containers":
617                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
618                     runtimeContext.runnerjob = tool.tool["id"]
619                     runnerjob = tool.job(job_order,
620                                          self.output_callback,
621                                          runtimeContext).next()
622                 else:
623                     runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
624                                                 self.output_name,
625                                                 self.output_tags,
626                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
627                                                 name=runtimeContext.name,
628                                                 on_error=runtimeContext.on_error,
629                                                 submit_runner_image=runtimeContext.submit_runner_image,
630                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
631                                                 merged_map=merged_map,
632                                                 priority=runtimeContext.priority,
633                                                 secret_store=self.secret_store,
634                                                 collection_cache_size=runtimeContext.collection_cache_size,
635                                                 collection_cache_is_default=self.should_estimate_cache_size)
636             elif self.work_api == "jobs":
637                 runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
638                                       self.output_name,
639                                       self.output_tags,
640                                       submit_runner_ram=runtimeContext.submit_runner_ram,
641                                       name=runtimeContext.name,
642                                       on_error=runtimeContext.on_error,
643                                       submit_runner_image=runtimeContext.submit_runner_image,
644                                       merged_map=merged_map)
645         elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
646             # Create pipeline for local run
647             self.pipeline = self.api.pipeline_instances().create(
648                 body={
649                     "owner_uuid": self.project_uuid,
650                     "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
651                     "components": {},
652                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
653             logger.info("Pipeline instance %s", self.pipeline["uuid"])
654
655         if runnerjob and not runtimeContext.wait:
656             submitargs = runtimeContext.copy()
657             submitargs.submit = False
658             runnerjob.run(submitargs)
659             return (runnerjob.uuid, "success")
660
661         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
662         if current_container:
663             logger.info("Running inside container %s", current_container.get("uuid"))
664
665         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
666         self.polling_thread = threading.Thread(target=self.poll_states)
667         self.polling_thread.start()
668
669         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
670
671         try:
672             self.workflow_eval_lock.acquire()
673             if runnerjob:
674                 jobiter = iter((runnerjob,))
675             else:
676                 if runtimeContext.cwl_runner_job is not None:
677                     self.uuid = runtimeContext.cwl_runner_job.get('uuid')
678                 jobiter = tool.job(job_order,
679                                    self.output_callback,
680                                    runtimeContext)
681
682             # The lock is held while this code runs and released when it is
683             # safe to do so, inside self.workflow_eval_lock.wait(), at which
684             # point on_message can update job state and process output
685             # callbacks.
686
687             loopperf = Perf(metrics, "jobiter")
688             loopperf.__enter__()
689             for runnable in jobiter:
690                 loopperf.__exit__()
691
692                 if self.stop_polling.is_set():
693                     break
694
695                 if self.task_queue.error is not None:
696                     raise self.task_queue.error
697
698                 if runnable:
699                     with Perf(metrics, "run"):
700                         self.start_run(runnable, runtimeContext)
701                 else:
702                     if (self.task_queue.in_flight + len(self.processes)) > 0:
703                         self.workflow_eval_lock.wait(3)
704                     else:
705                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
706                         break
707
708                 if self.stop_polling.is_set():
709                     break
710
711                 loopperf.__enter__()
712             loopperf.__exit__()
713
714             while (self.task_queue.in_flight + len(self.processes)) > 0:
715                 if self.task_queue.error is not None:
716                     raise self.task_queue.error
717                 self.workflow_eval_lock.wait(3)
718
719         except UnsupportedRequirement:
720             raise
721         except:
722             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
723                 logger.error("Interrupted, workflow will be cancelled")
724             else:
725                 logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
726             if self.pipeline:
727                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
728                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
729             if runnerjob and runnerjob.uuid and self.work_api == "containers":
730                 self.api.container_requests().update(uuid=runnerjob.uuid,
731                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
732         finally:
733             self.workflow_eval_lock.release()
734             self.task_queue.drain()
735             self.stop_polling.set()
736             self.polling_thread.join()
737             self.task_queue.join()
738
739         if self.final_status == "UnsupportedRequirement":
740             raise UnsupportedRequirement("Check log for details.")
741
742         if self.final_output is None:
743             raise WorkflowException("Workflow did not return a result.")
744
745         if runtimeContext.submit and isinstance(runnerjob, Runner):
746             logger.info("Final output collection %s", runnerjob.final_output)
747         else:
748             if self.output_name is None:
749                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
750             if self.output_tags is None:
751                 self.output_tags = ""
752
753             storage_classes = runtimeContext.storage_classes.strip().split(",")
754             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
755             self.set_crunch_output()
756
757         if runtimeContext.compute_checksum:
758             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
759             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
760
761         if self.trash_intermediate and self.final_status == "success":
762             self.trash_intermediate_output()
763
764         return (self.final_output, self.final_status)
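

# Hedged driver sketch (not part of the original module): roughly how a
# command-line front end is assumed to wire this class up for the containers
# API.  The Namespace below carries only the attributes __init__ reads when
# arvargs is omitted; a real invocation would pass the full argparse result,
# and _example_build_executor is a hypothetical name.
def _example_build_executor():
    api = arvados.api('v1')
    args = argparse.Namespace(work_api="containers",
                              output_name=None,
                              output_tags=None,
                              thread_count=4,
                              collection_cache_size=None)
    return ArvCwlExecutor(api, arvargs=args)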