13306: Updates ArvCwlExecutor to properly convert objects to JSON in unicode
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from builtins import next
from builtins import object
from past.utils import old_div

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import time

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer
from .arvjob import RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
from cwltool.command_line_tool import compute_checksums

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

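# Default priority for submitted work; arv_executor() compares
# runtimeContext.priority against this to detect whether the user asked
# for a different value.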
DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on runner
    containers.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is not None:
            log_msg = record.getMessage()
            if '\n' in log_msg:
                # If the logged message is multi-line, use its first line as the
                # status and the rest as detail.
                status, detail = log_msg.split('\n', 1)
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, status),
                    detail
                )
            else:
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, record.getMessage())
                )

class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1
            arvargs.collection_cache_size = None

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None
        self.should_estimate_cache_size = True

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

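        # Collection cache size: use the value from the command line (in MiB) if
        # given; otherwise start at 256 MiB and let arv_executor() estimate a
        # better size from the job order later.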
        if arvargs.collection_cache_size:
            collection_cache_size = arvargs.collection_cache_size*1024*1024
            self.should_estimate_cache_size = False
        else:
            collection_cache_size = 256*1024*1024

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=collection_cache_size)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

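        # Work out which execution API (jobs or containers) this cluster
        # supports and which one we are going to use, honoring an explicit
        # --api choice.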
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.warning("""
*******************************
Using the deprecated 'jobs' API.

To get rid of this warning:

Users: read about migrating at
http://doc.arvados.org/user/cwl/cwl-style.html#migrate
and use the option --api=containers

Admins: configure the cluster to disable the 'jobs' API as described at:
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
*******************************""")

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status
        # reporting if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')
            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                     collection_cache=self.collection_cache)

        validate_cluster_target(self, self.runtimeContext)

    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
        else:
            raise Exception("Unknown tool %s" % toolpath_object.get("class"))

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()

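    # Work is handed off to the TaskQueue so that run() and done() calls do not
    # block the main workflow evaluation loop.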
    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.
        """
        with self.workflow_eval_lock:
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})
            # If the status is an error, only report the first one in full.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message
                    })
                    if detail is not None:
                        runtime_status.update({
                            'errorDetail': detail
                        })
                # Further errors are only mentioned as a count.
                else:
                    # Get anything before an optional 'and N more' string.
                    try:
                        error_msg = re.match(
                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                        more_failures = re.match(
                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    except TypeError:
                        # Ignore tests stubbing errors
                        return
                    if more_failures:
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the latest warning/activity status, regardless of
                # previous occurrences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

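    # Called (via poll_states) whenever a submitted container or job changes
    # state on the API server.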
    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

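                # Query the API server for the current state of each process,
                # in pages of at most maxItemsPerResponse uuids per request.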
                while keys:
                    page = keys[:pageSize]
                    keys = keys[pageSize:]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception as e:
                        logger.warning("Error checking states on API server: %s", e)
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
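            # The bare except above also swallows KeyboardInterrupt/SystemExit;
            # if that is what happened, stop cleaning up.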
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                break

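    # Walk the tool document and reject features that are unsupported or
    # invalid for the selected work API.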
    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.values():
                self.check_features(v)
        elif isinstance(obj, list):
            for i, v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

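        # Copy every referenced file or directory out of its source collection
        # in Keep into the new output collection; literal "_:" CreateFile
        # entries are written out directly.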
        for k, v in list(generatemapper.items()):
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warning("While preparing output collection: %s", e)

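        # Rewrite locations in the output object to point at paths inside the
        # new collection, dropping derived fields such as listing and contents.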
        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            res = json.dumps(outputObj, sort_keys=True, indent=4, separators=(',', ': '), ensure_ascii=False).encode('utf-8').decode()
            f.write(res)

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

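    # Record the final output on the runner's own container record (containers
    # API) or job task (jobs API).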
    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        if not runtimeContext.name:
            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload the tool object, which may have been updated by
        # upload_workflow_deps.
        # Don't validate this time because it will just print redundant errors.
        loadingContext = self.loadingContext.copy()
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.do_validate = False

        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  loadingContext)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                     tool, job_order)

        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      runtimeContext.enable_reuse,
                                      uuid=existing_uuid,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      merged_map=merged_map,
                                      loadingContext=loadingContext)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
                                        name=runtimeContext.name,
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"
        elif self.work_api == "jobs":
            if runtimeContext.priority != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            runtimeContext.outdir = "$(task.outdir)"
            runtimeContext.docker_outdir = "$(task.outdir)"
            runtimeContext.tmpdir = "$(task.tmpdir)"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

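        # If no explicit collection cache size was given, estimate one from the
        # size hints embedded in the keep: references of the job order, with a
        # floor of 256 MiB.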
        if self.should_estimate_cache_size:
            visited = set()
            estimated_size = [0]
            def estimate_collection_cache(obj):
                if obj.get("location", "").startswith("keep:"):
                    m = pdh_size.match(obj["location"][5:])
                    if m and m.group(1) not in visited:
                        visited.add(m.group(1))
                        estimated_size[0] += int(m.group(2))
            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
            runtimeContext.collection_cache_size = max((old_div((estimated_size[0]*192), (1024*1024)))+1, 256)
            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)

        logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)

        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
                    runtimeContext.runnerjob = tool.tool["id"]
                else:
                    tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
                                           self.output_name,
                                           self.output_tags,
                                           submit_runner_ram=runtimeContext.submit_runner_ram,
                                           name=runtimeContext.name,
                                           on_error=runtimeContext.on_error,
                                           submit_runner_image=runtimeContext.submit_runner_image,
                                           intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                           merged_map=merged_map,
                                           priority=runtimeContext.priority,
                                           secret_store=self.secret_store,
                                           collection_cache_size=runtimeContext.collection_cache_size,
                                           collection_cache_is_default=self.should_estimate_cache_size)
            elif self.work_api == "jobs":
                tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
                                 self.output_name,
                                 self.output_tags,
                                 submit_runner_ram=runtimeContext.submit_runner_ram,
                                 name=runtimeContext.name,
                                 on_error=runtimeContext.on_error,
                                 submit_runner_image=runtimeContext.submit_runner_image,
                                 merged_map=merged_map)
        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runtimeContext.cwl_runner_job is not None:
            self.uuid = runtimeContext.cwl_runner_job.get('uuid')

        jobiter = tool.job(job_order,
                           self.output_callback,
                           runtimeContext)

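        # When asked to submit but not wait, start the runner process and
        # return its UUID immediately instead of monitoring the workflow.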
        if runtimeContext.submit and not runtimeContext.wait:
            runnerjob = next(jobiter)
            runnerjob.run(runtimeContext)
            return (runnerjob.uuid, "success")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

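        # The task queue runs process submission and completion handling on
        # self.thread_count worker threads so the evaluation loop below is not
        # blocked.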
        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()

            # We hold the lock while this code runs; it is released when it is
            # safe to do so inside self.workflow_eval_lock.wait(), at which
            # point on_message can update job state and process output
            # callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            else:
                logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runtimeContext.submit and isinstance(tool, Runner):
                runnerjob = tool
                if runnerjob.uuid and self.work_api == "containers":
                    self.api.container_requests().update(uuid=runnerjob.uuid,
                                                         body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(tool, Runner):
            logger.info("Final output collection %s", tool.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = runtimeContext.storage_classes.strip().split(",")
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)