# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import time

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer
from .arvjob import RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

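# Default priority for submitted work.  The --priority option must fall in the
# range 1..1000 (validated in arv_executor below); this constant presumably
# provides its default where the command line arguments are defined.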
DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime status updates on the
    runner container.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is not None:
            log_msg = record.getMessage()
            if '\n' in log_msg:
                # If the logged message is multi-line, use its first line as status
                # and the rest as detail.
                status, detail = log_msg.split('\n', 1)
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, status),
                    detail
                )
            else:
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, record.getMessage())
                )

class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report the output.

    """
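    # Rough usage sketch, for orientation only.  "parsed_args" stands in for the
    # argparse Namespace built by the arvados-cwl-runner command line parser, and
    # "tool", "job_order" and "runtime_context" come from cwltool's loading
    # machinery (see arv_executor below for the expected types):
    #
    #     api = arvados.api('v1')
    #     executor = ArvCwlExecutor(api, arvargs=parsed_args)
    #     (output, status) = executor.arv_executor(tool, job_order, runtime_context)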

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

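        # --collection-cache-size is given in MiB; without it, fall back to a
        # 256 MiB cap for the shared CollectionCache created below.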
        if arvargs.collection_cache_size:
            collection_cache_size = arvargs.collection_cache_size*1024*1024
        else:
            collection_cache_size = 256*1024*1024

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=collection_cache_size)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

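        # Pick the work API: use the API requested with --api if this cluster
        # supports it; when --api was not given, fall back to the first API the
        # API server advertises, checking "jobs" before "containers".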
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.warning("""
*******************************
Using the deprecated 'jobs' API.

To get rid of this warning:

Users: read about migrating at
http://doc.arvados.org/user/cwl/cwl-style.html#migrate
and use the option --api=containers

Admins: configure the cluster to disable the 'jobs' API as described at:
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
*******************************""")

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')
            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                     collection_cache=self.collection_cache)

        validate_cluster_target(self, self.runtimeContext)


    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        else:
            return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()


    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.
        """
        with self.workflow_eval_lock:
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})
            # If the status is an error, only report the first one in full.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message
                    })
                    if detail is not None:
                        runtime_status.update({
                            'errorDetail': detail
                        })
                # Further errors are only mentioned as a count.
                else:
                    # Get anything before an optional 'and N more' string,
                    # e.g. 'error' may already hold "Something failed (and 2 more)".
                    try:
                        error_msg = re.match(
                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                        more_failures = re.match(
                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    except TypeError:
                        # runtime_status['error'] was not a string (e.g. stubbed
                        # out by tests); nothing to update.
                        return
                    if more_failures:
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the last warning/activity status without regard to
                # previous occurrences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        # self.work_api is "jobs" or "containers"; drop the trailing "s" to get
        # a label like "[container foo]" or "[job foo]".
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

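        # Wake up every poll_interval seconds (or as soon as stop_polling is
        # set), fetch the state of all tracked processes in pages of at most
        # maxItemsPerResponse uuids, and feed each record to on_message() as a
        # synthetic "update" event.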
        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

                while keys:
                    page = keys[:pageSize]
                    keys = keys[pageSize:]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception as e:
                        logger.warning("Error checking states on API server: %s", e)
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except (KeyboardInterrupt, SystemExit):
                # Stop cleaning up if we are interrupted or shutting down.
                break
            except Exception:
                logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.values():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

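    # Build the final output collection.  Output locations are expected to be
    # either literals ("_:..." entries generated at runtime) or keep references
    # of the form "keep:<portable data hash>/<path>", which are copied into a
    # freshly created collection.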
    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warning("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

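    # Record the workflow output on the underlying Arvados records: with the
    # containers API, set the current container's output to the new collection's
    # portable data hash and mark the separately saved output collection as
    # trashed; with the jobs API, update the output and success fields of the
    # current job task instead.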
    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                   body={
                                       'output': self.final_output_collection.portable_data_hash(),
                                       'success': self.final_status == "success",
                                       'progress':1.0
                                   }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
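        """Run the given tool or workflow and return an (output, status) tuple.

        Depending on the options this either registers a workflow or pipeline
        template record, submits a runner job/container to execute the workflow
        on the cluster, or runs the workflow steps directly, using a polling
        thread to track process state until completion and saving the final
        output collection.  The output element is the final output object, or a
        UUID when only a record was created or a runner was submitted without
        waiting.
        """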
        self.debug = runtimeContext.debug

        tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        if not runtimeContext.name:
            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        # Don't validate this time because it will just print redundant errors.
        loadingContext = self.loadingContext.copy()
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.do_validate = False

        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  loadingContext)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                     tool, job_order)

        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      runtimeContext.enable_reuse,
                                      uuid=existing_uuid,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
                                        name=runtimeContext.name,
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"
        elif self.work_api == "jobs":
            if runtimeContext.priority != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            runtimeContext.outdir = "$(task.outdir)"
            runtimeContext.docker_outdir = "$(task.outdir)"
            runtimeContext.tmpdir = "$(task.tmpdir)"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
                    runtimeContext.runnerjob = tool.tool["id"]
                    runnerjob = next(tool.job(job_order,
                                              self.output_callback,
                                              runtimeContext))
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=runtimeContext.submit_runner_ram,
                                                name=runtimeContext.name,
                                                on_error=runtimeContext.on_error,
                                                submit_runner_image=runtimeContext.submit_runner_image,
                                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                                merged_map=merged_map,
                                                priority=runtimeContext.priority,
                                                secret_store=self.secret_store,
                                                collection_cache_size=runtimeContext.collection_cache_size)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      on_error=runtimeContext.on_error,
                                      submit_runner_image=runtimeContext.submit_runner_image,
                                      merged_map=merged_map)
        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not runtimeContext.wait:
            submitargs = runtimeContext.copy()
            submitargs.submit = False
            runnerjob.run(submitargs)
            return (runnerjob.uuid, "success")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()
            if runnerjob:
                jobiter = iter((runnerjob,))
            else:
                if runtimeContext.cwl_runner_job is not None:
                    self.uuid = runtimeContext.cwl_runner_job.get('uuid')
                jobiter = tool.job(job_order,
                                   self.output_callback,
                                   runtimeContext)

            # This block holds the lock while it runs; the lock is released
            # inside self.workflow_eval_lock.wait(), at which point on_message
            # can update job state and output callbacks can be processed.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            else:
                logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = runtimeContext.storage_classes.strip().split(",")
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)