14476: When creating a Runner, check input is valid before submitting
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import argparse
6 import logging
7 import os
8 import sys
9 import threading
10 import copy
11 import json
12 import re
13 from functools import partial
14 import time
15
16 from cwltool.errors import WorkflowException
17 import cwltool.workflow
18 from schema_salad.sourceline import SourceLine
19 import schema_salad.validate as validate
20
21 import arvados
22 import arvados.config
23 from arvados.keep import KeepClient
24 from arvados.errors import ApiError
25
26 import arvados_cwl.util
27 from .arvcontainer import RunnerContainer
28 from .arvjob import RunnerJob, RunnerTemplate
29 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
30 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
31 from .arvworkflow import ArvadosWorkflow, upload_workflow
32 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
33 from .perf import Perf
34 from .pathmapper import NoFollowPathMapper
35 from .task_queue import TaskQueue
36 from .context import ArvLoadingContext, ArvRuntimeContext
37 from ._version import __version__
38
39 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
40 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
41 from cwltool.command_line_tool import compute_checksums
42
# Logger used throughout this module for runner progress, warnings and errors.
logger = logging.getLogger('arvados.cwl-runner')
# Separate child logger registered under the '.metrics' suffix.
metrics = logging.getLogger('arvados.cwl-runner.metrics')

# Priority value that arv_executor treats as "not explicitly set": with the
# jobs API any other value is rejected as unsupported.
DEFAULT_PRIORITY = 500
47
class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Logging handler that forwards WARNING-and-above records to a runtime
    status callback, so they show up as statuses on runner containers.
    """
    def __init__(self, runtime_status_update_func):
        """Remember the callback, invoked as func(kind, message[, detail])."""
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func

    def emit(self, record):
        """Report `record` through the callback if it is a warning or worse."""
        # Map the numeric level onto a status kind; anything below WARNING
        # is not reported at all.
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        else:
            return

        # For a multi-line message the first line becomes the status and the
        # remaining lines are passed along as the detail.
        headline, newline, remainder = record.getMessage().partition('\n')
        summary = "%s: %s" % (record.name, headline)
        if newline:
            self.runtime_status_update(kind, summary, remainder)
        else:
            self.runtime_status_update(kind, summary)
79
80 class ArvCwlExecutor(object):
81     """Execute a CWL tool or workflow, submit work (using either jobs or
82     containers API), wait for them to complete, and report output.
83
84     """
85
86     def __init__(self, api_client,
87                  arvargs=None,
88                  keep_client=None,
89                  num_retries=4,
90                  thread_count=4):
91
92         if arvargs is None:
93             arvargs = argparse.Namespace()
94             arvargs.work_api = None
95             arvargs.output_name = None
96             arvargs.output_tags = None
97             arvargs.thread_count = 1
98             arvargs.collection_cache_size = None
99
100         self.api = api_client
101         self.processes = {}
102         self.workflow_eval_lock = threading.Condition(threading.RLock())
103         self.final_output = None
104         self.final_status = None
105         self.num_retries = num_retries
106         self.uuid = None
107         self.stop_polling = threading.Event()
108         self.poll_api = None
109         self.pipeline = None
110         self.final_output_collection = None
111         self.output_name = arvargs.output_name
112         self.output_tags = arvargs.output_tags
113         self.project_uuid = None
114         self.intermediate_output_ttl = 0
115         self.intermediate_output_collections = []
116         self.trash_intermediate = False
117         self.thread_count = arvargs.thread_count
118         self.poll_interval = 12
119         self.loadingContext = None
120         self.should_estimate_cache_size = True
121
122         if keep_client is not None:
123             self.keep_client = keep_client
124         else:
125             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
126
127         if arvargs.collection_cache_size:
128             collection_cache_size = arvargs.collection_cache_size*1024*1024
129             self.should_estimate_cache_size = False
130         else:
131             collection_cache_size = 256*1024*1024
132
133         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
134                                                 cap=collection_cache_size)
135
136         self.fetcher_constructor = partial(CollectionFetcher,
137                                            api_client=self.api,
138                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
139                                            num_retries=self.num_retries)
140
141         self.work_api = None
142         expected_api = ["jobs", "containers"]
143         for api in expected_api:
144             try:
145                 methods = self.api._rootDesc.get('resources')[api]['methods']
146                 if ('httpMethod' in methods['create'] and
147                     (arvargs.work_api == api or arvargs.work_api is None)):
148                     self.work_api = api
149                     break
150             except KeyError:
151                 pass
152
153         if not self.work_api:
154             if arvargs.work_api is None:
155                 raise Exception("No supported APIs")
156             else:
157                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
158
159         if self.work_api == "jobs":
160             logger.warn("""
161 *******************************
162 Using the deprecated 'jobs' API.
163
164 To get rid of this warning:
165
166 Users: read about migrating at
167 http://doc.arvados.org/user/cwl/cwl-style.html#migrate
168 and use the option --api=containers
169
170 Admins: configure the cluster to disable the 'jobs' API as described at:
171 http://doc.arvados.org/install/install-api-server.html#disable_api_methods
172 *******************************""")
173
174         self.loadingContext = ArvLoadingContext(vars(arvargs))
175         self.loadingContext.fetcher_constructor = self.fetcher_constructor
176         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
177         self.loadingContext.construct_tool_object = self.arv_make_tool
178
179         # Add a custom logging handler to the root logger for runtime status reporting
180         # if running inside a container
181         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
182             root_logger = logging.getLogger('')
183             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
184             root_logger.addHandler(handler)
185
186         self.runtimeContext = ArvRuntimeContext(vars(arvargs))
187         self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
188                                                      collection_cache=self.collection_cache)
189
190         validate_cluster_target(self, self.runtimeContext)
191
192
193     def arv_make_tool(self, toolpath_object, loadingContext):
194         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
195             return ArvadosCommandTool(self, toolpath_object, loadingContext)
196         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
197             return ArvadosWorkflow(self, toolpath_object, loadingContext)
198         elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
199             return ArvadosExpressionTool(self, toolpath_object, loadingContext)
200         else:
201             raise Exception("Unknown tool %s" % toolpath_object.get("class"))
202
203     def output_callback(self, out, processStatus):
204         with self.workflow_eval_lock:
205             if processStatus == "success":
206                 logger.info("Overall process status is %s", processStatus)
207                 state = "Complete"
208             else:
209                 logger.error("Overall process status is %s", processStatus)
210                 state = "Failed"
211             if self.pipeline:
212                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
213                                                         body={"state": state}).execute(num_retries=self.num_retries)
214             self.final_status = processStatus
215             self.final_output = out
216             self.workflow_eval_lock.notifyAll()
217
218
219     def start_run(self, runnable, runtimeContext):
220         self.task_queue.add(partial(runnable.run, runtimeContext),
221                             self.workflow_eval_lock, self.stop_polling)
222
223     def process_submitted(self, container):
224         with self.workflow_eval_lock:
225             self.processes[container.uuid] = container
226
227     def process_done(self, uuid, record):
228         with self.workflow_eval_lock:
229             j = self.processes[uuid]
230             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
231             self.task_queue.add(partial(j.done, record),
232                                 self.workflow_eval_lock, self.stop_polling)
233             del self.processes[uuid]
234
    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.

        kind -- 'error', 'warning' or 'activity'; any other value is ignored.
        message -- status text stored under the `kind` key.
        detail -- optional text stored under kind + "Detail" ('errorDetail'
            for errors).  For errors it is only stored with the first error.

        Does nothing when get_current_container() returns None.  A failure of
        the final API update is logged at info level and not raised.
        """
        with self.workflow_eval_lock:
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})
            # In case of status being an error, only report the first one.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message
                    })
                    if detail is not None:
                        runtime_status.update({
                            'errorDetail': detail
                        })
                # Further errors are only mentioned as a count.
                else:
                    # Get anything before an optional 'and N more' string.
                    try:
                        # First pattern: the original message without the
                        # "(and N more)" suffix.  Second pattern: the current
                        # N, if such a suffix is already present.
                        error_msg = re.match(
                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                        more_failures = re.match(
                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    except TypeError:
                        # Ignore tests stubbing errors
                        return
                    if more_failures:
                        # Bump the existing counter by one.
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        # This is the second error: start the counter.
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the last warning/activity status without regard of
                # previous occurrences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                # Write the merged status back to the container record.
                self.api.containers().update(uuid=current['uuid'],
                                            body={
                                                'runtime_status': runtime_status,
                                            }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)
296
297     def wrapped_callback(self, cb, obj, st):
298         with self.workflow_eval_lock:
299             cb(obj, st)
300             self.workflow_eval_lock.notifyAll()
301
302     def get_wrapped_callback(self, cb):
303         return partial(self.wrapped_callback, cb)
304
305     def on_message(self, event):
306         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
307             uuid = event["object_uuid"]
308             if event["properties"]["new_attributes"]["state"] == "Running":
309                 with self.workflow_eval_lock:
310                     j = self.processes[uuid]
311                     if j.running is False:
312                         j.running = True
313                         j.update_pipeline_component(event["properties"]["new_attributes"])
314                         logger.info("%s %s is Running", self.label(j), uuid)
315             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
316                 self.process_done(uuid, event["properties"]["new_attributes"])
317
318     def label(self, obj):
319         return "[%s %s]" % (self.work_api[0:-1], obj.name)
320
321     def poll_states(self):
322         """Poll status of jobs or containers listed in the processes dict.
323
324         Runs in a separate thread.
325         """
326
327         try:
328             remain_wait = self.poll_interval
329             while True:
330                 if remain_wait > 0:
331                     self.stop_polling.wait(remain_wait)
332                 if self.stop_polling.is_set():
333                     break
334                 with self.workflow_eval_lock:
335                     keys = list(self.processes.keys())
336                 if not keys:
337                     remain_wait = self.poll_interval
338                     continue
339
340                 begin_poll = time.time()
341                 if self.work_api == "containers":
342                     table = self.poll_api.container_requests()
343                 elif self.work_api == "jobs":
344                     table = self.poll_api.jobs()
345
346                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
347
348                 while keys:
349                     page = keys[:pageSize]
350                     keys = keys[pageSize:]
351                     try:
352                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
353                     except Exception as e:
354                         logger.warn("Error checking states on API server: %s", e)
355                         remain_wait = self.poll_interval
356                         continue
357
358                     for p in proc_states["items"]:
359                         self.on_message({
360                             "object_uuid": p["uuid"],
361                             "event_type": "update",
362                             "properties": {
363                                 "new_attributes": p
364                             }
365                         })
366                 finish_poll = time.time()
367                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
368         except:
369             logger.exception("Fatal error in state polling thread.")
370             with self.workflow_eval_lock:
371                 self.processes.clear()
372                 self.workflow_eval_lock.notifyAll()
373         finally:
374             self.stop_polling.set()
375
376     def add_intermediate_output(self, uuid):
377         if uuid:
378             self.intermediate_output_collections.append(uuid)
379
380     def trash_intermediate_output(self):
381         logger.info("Cleaning up intermediate output collections")
382         for i in self.intermediate_output_collections:
383             try:
384                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
385             except:
386                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
387             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
388                 break
389
390     def check_features(self, obj):
391         if isinstance(obj, dict):
392             if obj.get("writable") and self.work_api != "containers":
393                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
394             if obj.get("class") == "DockerRequirement":
395                 if obj.get("dockerOutputDirectory"):
396                     if self.work_api != "containers":
397                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
398                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
399                     if not obj.get("dockerOutputDirectory").startswith('/'):
400                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
401                             "Option 'dockerOutputDirectory' must be an absolute path.")
402             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
403                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
404             for v in obj.itervalues():
405                 self.check_features(v)
406         elif isinstance(obj, list):
407             for i,v in enumerate(obj):
408                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
409                     self.check_features(v)
410
411     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
412         outputObj = copy.deepcopy(outputObj)
413
414         files = []
415         def capture(fileobj):
416             files.append(fileobj)
417
418         adjustDirObjs(outputObj, capture)
419         adjustFileObjs(outputObj, capture)
420
421         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
422
423         final = arvados.collection.Collection(api_client=self.api,
424                                               keep_client=self.keep_client,
425                                               num_retries=self.num_retries)
426
427         for k,v in generatemapper.items():
428             if k.startswith("_:"):
429                 if v.type == "Directory":
430                     continue
431                 if v.type == "CreateFile":
432                     with final.open(v.target, "wb") as f:
433                         f.write(v.resolved.encode("utf-8"))
434                     continue
435
436             if not k.startswith("keep:"):
437                 raise Exception("Output source is not in keep or a literal")
438             sp = k.split("/")
439             srccollection = sp[0][5:]
440             try:
441                 reader = self.collection_cache.get(srccollection)
442                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
443                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
444             except arvados.errors.ArgumentError as e:
445                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
446                 raise
447             except IOError as e:
448                 logger.warn("While preparing output collection: %s", e)
449
450         def rewrite(fileobj):
451             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
452             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
453                 if k in fileobj:
454                     del fileobj[k]
455
456         adjustDirObjs(outputObj, rewrite)
457         adjustFileObjs(outputObj, rewrite)
458
459         with final.open("cwl.output.json", "w") as f:
460             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
461
462         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
463
464         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
465                     final.api_response()["name"],
466                     final.manifest_locator())
467
468         final_uuid = final.manifest_locator()
469         tags = tagsString.split(',')
470         for tag in tags:
471              self.api.links().create(body={
472                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
473                 }).execute(num_retries=self.num_retries)
474
475         def finalcollection(fileobj):
476             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
477
478         adjustDirObjs(outputObj, finalcollection)
479         adjustFileObjs(outputObj, finalcollection)
480
481         return (outputObj, final)
482
483     def set_crunch_output(self):
484         if self.work_api == "containers":
485             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
486             if current is None:
487                 return
488             try:
489                 self.api.containers().update(uuid=current['uuid'],
490                                              body={
491                                                  'output': self.final_output_collection.portable_data_hash(),
492                                              }).execute(num_retries=self.num_retries)
493                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
494                                               body={
495                                                   'is_trashed': True
496                                               }).execute(num_retries=self.num_retries)
497             except Exception as e:
498                 logger.info("Setting container output: %s", e)
499         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
500             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
501                                    body={
502                                        'output': self.final_output_collection.portable_data_hash(),
503                                        'success': self.final_status == "success",
504                                        'progress':1.0
505                                    }).execute(num_retries=self.num_retries)
506
507     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
508         self.debug = runtimeContext.debug
509
510         tool.visit(self.check_features)
511
512         self.project_uuid = runtimeContext.project_uuid
513         self.pipeline = None
514         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
515         self.secret_store = runtimeContext.secret_store
516
517         self.trash_intermediate = runtimeContext.trash_intermediate
518         if self.trash_intermediate and self.work_api != "containers":
519             raise Exception("--trash-intermediate is only supported with --api=containers.")
520
521         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
522         if self.intermediate_output_ttl and self.work_api != "containers":
523             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
524         if self.intermediate_output_ttl < 0:
525             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
526
527         if runtimeContext.submit_request_uuid and self.work_api != "containers":
528             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
529
530         if not runtimeContext.name:
531             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
532
533         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
534         # Also uploads docker images.
535         merged_map = upload_workflow_deps(self, tool)
536
537         # Reload tool object which may have been updated by
538         # upload_workflow_deps
539         # Don't validate this time because it will just print redundant errors.
540         loadingContext = self.loadingContext.copy()
541         loadingContext.loader = tool.doc_loader
542         loadingContext.avsc_names = tool.doc_schema
543         loadingContext.metadata = tool.metadata
544         loadingContext.do_validate = False
545
546         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
547                                   loadingContext)
548
549         # Upload local file references in the job order.
550         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
551                                      tool, job_order)
552
553         existing_uuid = runtimeContext.update_workflow
554         if existing_uuid or runtimeContext.create_workflow:
555             # Create a pipeline template or workflow record and exit.
556             if self.work_api == "jobs":
557                 tmpl = RunnerTemplate(self, tool, job_order,
558                                       runtimeContext.enable_reuse,
559                                       uuid=existing_uuid,
560                                       submit_runner_ram=runtimeContext.submit_runner_ram,
561                                       name=runtimeContext.name,
562                                       merged_map=merged_map)
563                 tmpl.save()
564                 # cwltool.main will write our return value to stdout.
565                 return (tmpl.uuid, "success")
566             elif self.work_api == "containers":
567                 return (upload_workflow(self, tool, job_order,
568                                         self.project_uuid,
569                                         uuid=existing_uuid,
570                                         submit_runner_ram=runtimeContext.submit_runner_ram,
571                                         name=runtimeContext.name,
572                                         merged_map=merged_map),
573                         "success")
574
575         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
576         self.eval_timeout = runtimeContext.eval_timeout
577
578         runtimeContext = runtimeContext.copy()
579         runtimeContext.use_container = True
580         runtimeContext.tmpdir_prefix = "tmp"
581         runtimeContext.work_api = self.work_api
582
583         if self.work_api == "containers":
584             if self.ignore_docker_for_reuse:
585                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
586             runtimeContext.outdir = "/var/spool/cwl"
587             runtimeContext.docker_outdir = "/var/spool/cwl"
588             runtimeContext.tmpdir = "/tmp"
589             runtimeContext.docker_tmpdir = "/tmp"
590         elif self.work_api == "jobs":
591             if runtimeContext.priority != DEFAULT_PRIORITY:
592                 raise Exception("--priority not implemented for jobs API.")
593             runtimeContext.outdir = "$(task.outdir)"
594             runtimeContext.docker_outdir = "$(task.outdir)"
595             runtimeContext.tmpdir = "$(task.tmpdir)"
596
597         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
598             raise Exception("--priority must be in the range 1..1000.")
599
600         if self.should_estimate_cache_size:
601             visited = set()
602             estimated_size = [0]
603             def estimate_collection_cache(obj):
604                 if obj.get("location", "").startswith("keep:"):
605                     m = pdh_size.match(obj["location"][5:])
606                     if m and m.group(1) not in visited:
607                         visited.add(m.group(1))
608                         estimated_size[0] += int(m.group(2))
609             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
610             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) / (1024*1024))+1, 256)
611             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
612
613         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
614
615         runnerjob = None
616         if runtimeContext.submit:
617             # Submit a runner job to run the workflow for us.
618             if self.work_api == "containers":
619                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
620                     runtimeContext.runnerjob = tool.tool["id"]
621                 else:
622                     tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
623                                                 self.output_name,
624                                                 self.output_tags,
625                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
626                                                 name=runtimeContext.name,
627                                                 on_error=runtimeContext.on_error,
628                                                 submit_runner_image=runtimeContext.submit_runner_image,
629                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
630                                                 merged_map=merged_map,
631                                                 priority=runtimeContext.priority,
632                                                 secret_store=self.secret_store,
633                                                 collection_cache_size=runtimeContext.collection_cache_size,
634                                                 collection_cache_is_default=self.should_estimate_cache_size)
635             elif self.work_api == "jobs":
636                 tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
637                                       self.output_name,
638                                       self.output_tags,
639                                       submit_runner_ram=runtimeContext.submit_runner_ram,
640                                       name=runtimeContext.name,
641                                       on_error=runtimeContext.on_error,
642                                       submit_runner_image=runtimeContext.submit_runner_image,
643                                       merged_map=merged_map)
644         elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
645             # Create pipeline for local run
646             self.pipeline = self.api.pipeline_instances().create(
647                 body={
648                     "owner_uuid": self.project_uuid,
649                     "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
650                     "components": {},
651                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
652             logger.info("Pipeline instance %s", self.pipeline["uuid"])
653
654         if runtimeContext.cwl_runner_job is not None:
655             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
656
657         jobiter = tool.job(job_order,
658                            self.output_callback,
659                            runtimeContext)
660
661         if runtimeContext.submit and not runtimeContext.wait:
662             runnerjob = jobiter.next()
663             runnerjob.run(runtimeContext)
664             return (runnerjob.uuid, "success")
665
666         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
667         if current_container:
668             logger.info("Running inside container %s", current_container.get("uuid"))
669
670         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
671         self.polling_thread = threading.Thread(target=self.poll_states)
672         self.polling_thread.start()
673
674         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
675
676         try:
677             self.workflow_eval_lock.acquire()
678
679             # Holds the lock while this code runs and releases it when
680             # it is safe to do so in self.workflow_eval_lock.wait(),
681             # at which point on_message can update job state and
682             # process output callbacks.
683
684             loopperf = Perf(metrics, "jobiter")
685             loopperf.__enter__()
686             for runnable in jobiter:
687                 loopperf.__exit__()
688
689                 if self.stop_polling.is_set():
690                     break
691
692                 if self.task_queue.error is not None:
693                     raise self.task_queue.error
694
695                 if runnable:
696                     with Perf(metrics, "run"):
697                         self.start_run(runnable, runtimeContext)
698                 else:
699                     if (self.task_queue.in_flight + len(self.processes)) > 0:
700                         self.workflow_eval_lock.wait(3)
701                     else:
702                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
703                         break
704
705                 if self.stop_polling.is_set():
706                     break
707
708                 loopperf.__enter__()
709             loopperf.__exit__()
710
711             while (self.task_queue.in_flight + len(self.processes)) > 0:
712                 if self.task_queue.error is not None:
713                     raise self.task_queue.error
714                 self.workflow_eval_lock.wait(3)
715
716         except UnsupportedRequirement:
717             raise
718         except:
719             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
720                 logger.error("Interrupted, workflow will be cancelled")
721             else:
722                 logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
723             if self.pipeline:
724                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
725                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
726             if runtimeContext.submit and isinstance(tool, Runner):
727                 runnerjob = tool
728                 if runnerjob and runnerjob.uuid and self.work_api == "containers":
729                     self.api.container_requests().update(uuid=runnerjob.uuid,
730                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
731         finally:
732             self.workflow_eval_lock.release()
733             self.task_queue.drain()
734             self.stop_polling.set()
735             self.polling_thread.join()
736             self.task_queue.join()
737
738         if self.final_status == "UnsupportedRequirement":
739             raise UnsupportedRequirement("Check log for details.")
740
741         if self.final_output is None:
742             raise WorkflowException("Workflow did not return a result.")
743
744         if runtimeContext.submit and isinstance(tool, Runner):
745             logger.info("Final output collection %s", tool.final_output)
746         else:
747             if self.output_name is None:
748                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
749             if self.output_tags is None:
750                 self.output_tags = ""
751
752             storage_classes = runtimeContext.storage_classes.strip().split(",")
753             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
754             self.set_crunch_output()
755
756         if runtimeContext.compute_checksum:
757             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
758             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
759
760         if self.trash_intermediate and self.final_status == "success":
761             self.trash_intermediate_output()
762
763         return (self.final_output, self.final_status)