# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import time

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import RunnerContainer
from .arvjob import RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from .util import get_current_container
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

# Default priority for submitted container requests, referenced in
# arv_executor() below.  Assumed value matching the arvados-cwl-runner
# default of 500; it is not imported from elsewhere in this module.
DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on the
    runner container.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is not None:
            log_msg = record.getMessage()
            if '\n' in log_msg:
                # If the logged message is multi-line, use its first line as status
                # and the rest as detail.
                status, detail = log_msg.split('\n', 1)
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, status),
                    detail
                )
            else:
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, record.getMessage())
                )

class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

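        # Select the work API to use for submissions: honor an explicit
        # --api choice, otherwise take the first API (jobs or containers)
        # whose 'create' method is advertised in the server's discovery
        # document.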
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.warn("""
*******************************
Using the deprecated 'jobs' API.

To get rid of this warning:

Users: read about migrating at
http://doc.arvados.org/user/cwl/cwl-style.html#migrate
and use the option --api=containers

Admins: configure the cluster to disable the 'jobs' API as described at:
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
*******************************""")

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')
            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                     collection_cache=self.collection_cache)


    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        else:
            return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()


    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext))

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record))
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called to report errors, warnings, or activity statuses, for
        example from the RuntimeStatusLoggingHandler.
        """
        with self.workflow_eval_lock:
            current = get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})
            # If the new status is an error, only report the first one.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message
                    })
                    if detail is not None:
                        runtime_status.update({
                            'errorDetail': detail
                        })
                # Further errors are only mentioned as a count.
                else:
                    # Get anything before an optional 'and N more' string.
                    try:
                        error_msg = re.match(
                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                        more_failures = re.match(
                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    except TypeError:
                        # Ignore tests stubbing errors
                        return
                    if more_failures:
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the last warning/activity status without regard to
                # previous occurrences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

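                # Page the state check: a single "uuid in [...]" filter over
                # hundreds of processes could exceed the server's response
                # limit, so query at most maxItemsPerResponse UUIDs per
                # request.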
                while keys:
                    page = keys[:pageSize]
                    keys = keys[pageSize:]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception as e:
                        logger.warn("Error checking states on API server: %s", e)
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                break

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

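        # Assemble the output collection: write file literals ("_:" sources
        # of type CreateFile) directly into the collection, and copy
        # everything else from its source keep: collection.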
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            current = get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        if not runtimeContext.name:
            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        # Don't validate this time because it will just print redundant errors.
        loadingContext = self.loadingContext.copy()
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.do_validate = False

        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  loadingContext)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                     tool, job_order)

        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      runtimeContext.enable_reuse,
                                      uuid=existing_uuid,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
                                        name=runtimeContext.name,
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"
        elif self.work_api == "jobs":
            if runtimeContext.priority != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            runtimeContext.outdir = "$(task.outdir)"
            runtimeContext.docker_outdir = "$(task.outdir)"
            runtimeContext.tmpdir = "$(task.tmpdir)"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

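        # When --submit is in effect: a single CommandLineTool that we are
        # going to wait on is submitted directly as a container request;
        # anything else is wrapped in a RunnerContainer (or RunnerJob on the
        # jobs API) that runs the workflow on the cluster on our behalf.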
        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
                    runtimeContext.runnerjob = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         runtimeContext).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=runtimeContext.submit_runner_ram,
                                                name=runtimeContext.name,
                                                on_error=runtimeContext.on_error,
                                                submit_runner_image=runtimeContext.submit_runner_image,
                                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                                merged_map=merged_map,
                                                priority=runtimeContext.priority,
                                                secret_store=self.secret_store)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      on_error=runtimeContext.on_error,
                                      submit_runner_image=runtimeContext.submit_runner_image,
                                      merged_map=merged_map)
        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not runtimeContext.wait:
            submitargs = runtimeContext.copy()
            submitargs.submit = False
            runnerjob.run(submitargs)
            return (runnerjob.uuid, "success")

        current_container = get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

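        # Worker threads in the task queue handle submitting runnable
        # processes and running their completion callbacks.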
        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()
            if runnerjob:
                jobiter = iter((runnerjob,))
            else:
                if runtimeContext.cwl_runner_job is not None:
                    self.uuid = runtimeContext.cwl_runner_job.get('uuid')
                jobiter = tool.job(job_order,
                                   self.output_callback,
                                   runtimeContext)

            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            else:
                logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = runtimeContext.storage_classes.strip().split(",")
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)