Merge branch '19464-capture-git-info' refs #19464
[arvados.git] / sdk/cwl/arvados_cwl/executor.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from builtins import next
from builtins import object
from builtins import str
from future.utils import viewvalues, viewitems

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import subprocess
import time
import urllib

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate
from schema_salad.ref_resolver import file_uri, uri_file_path

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer, cleanup_name_for_collection
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from cwltool.task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
from cwltool.command_line_tool import compute_checksums
from cwltool.load_tool import load_tool

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on the
    runner container.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
        self.updatingRuntimeStatus = False

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is not None and self.updatingRuntimeStatus is not True:
            self.updatingRuntimeStatus = True
            try:
                log_msg = record.getMessage()
                if '\n' in log_msg:
                    # If the logged message is multi-line, use its first line as status
                    # and the rest as detail.
                    status, detail = log_msg.split('\n', 1)
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, status),
                        detail
                    )
                else:
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, record.getMessage())
                    )
            finally:
                self.updatingRuntimeStatus = False

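# A minimal sketch (illustrative, not part of this module) of how the handler
# behaves once attached: any WARNING or ERROR logged anywhere in the process is
# forwarded to the update function, which in the real runner is
# ArvCwlExecutor.runtime_status_update.
#
#   handler = RuntimeStatusLoggingHandler(
#       lambda kind, msg, detail=None: print(kind, msg, detail))
#   logging.getLogger('').addHandler(handler)
#   logging.getLogger('arvados.cwl-runner').error("step failed\ntraceback...")
#   # -> error  "arvados.cwl-runner: step failed"  "traceback..."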

class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using the containers API),
    wait for it to complete, and report output.

    """
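    # Typical construction (sketch; in practice `arvargs` comes from
    # arv-cwl-runner's own argparse setup, and `tool`, `job_order`, and
    # `runtime_ctx` come from cwltool's loading machinery):
    #   api = arvados.api('v1')
    #   executor = ArvCwlExecutor(api, arvargs=args)
    #   (out, status) = executor.arv_executor(tool, job_order, runtime_ctx, logger=logger)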

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4,
                 stdout=sys.stdout):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1
            arvargs.collection_cache_size = None

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None
        self.should_estimate_cache_size = True
        self.fs_access = None
        self.secret_store = None
        self.stdout = stdout

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        if arvargs.collection_cache_size:
            collection_cache_size = arvargs.collection_cache_size*1024*1024
            self.should_estimate_cache_size = False
        else:
            collection_cache_size = 256*1024*1024

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=collection_cache_size)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

        self.work_api = None
        expected_api = ["containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.error("""
*******************************
The 'jobs' API is no longer supported.
*******************************""")
            sys.exit(1)

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')

            # Remove existing RuntimeStatusLoggingHandlers if they exist
            handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
            root_logger.handlers = handlers

            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                              collection_cache=self.collection_cache)

        validate_cluster_target(self, self.toplevel_runtimeContext)

    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
        else:
            raise Exception("Unknown tool %s" % toolpath_object.get("class"))

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()

    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings, or other
        activity statuses, for example from the RuntimeStatusLoggingHandler.
        """
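        # Illustrative shape of the resulting runtime_status field (values
        # are examples): the first message of each kind becomes the headline;
        # later messages accumulate under the corresponding *Detail key.
        #   {"error": "arvados.cwl-runner: step xyz failed",
        #    "errorDetail": "arvados.cwl-runner: step xyz failed\n<traceback>\n..."}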

        if kind not in ('error', 'warning'):
            # Ignore any other status kind
            return

        with self.workflow_eval_lock:
            current = None
            try:
                current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            except Exception as e:
                logger.info("Couldn't get current container: %s", e)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})

            original_updatemessage = updatemessage = runtime_status.get(kind, "")
            if not updatemessage:
                updatemessage = message

            # Subsequent messages tacked on in detail
            original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
            maxlines = 40
            if updatedetail.count("\n") < maxlines:
                if updatedetail:
                    updatedetail += "\n"
                updatedetail += message + "\n"

                if detail:
                    updatedetail += detail + "\n"

                if updatedetail.count("\n") >= maxlines:
                    updatedetail += "\nSome messages may have been omitted.  Check the full log."

            if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
                # don't waste time doing an update if nothing changed
                # (usually because we exceeded the max lines)
                return

            runtime_status.update({
                kind: updatemessage,
                kind+'Detail': updatedetail,
            })

            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes)
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()

                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

                while keys:
                    page = keys[:pageSize]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception:
                        logger.exception("Error checking states on API server")
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                    keys = keys[pageSize:]

                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except (KeyboardInterrupt, SystemExit):
                break
            except Exception:
                logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))

    def check_features(self, obj, parentfield=""):
        if isinstance(obj, dict):
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "InplaceUpdateRequirement":
                if obj["inplaceUpdate"] and parentfield == "requirements":
                    raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
            for k,v in viewitems(obj):
                self.check_features(v, parentfield=k)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v, parentfield=parentfield)

    def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if v.type == "Directory" and v.resolved.startswith("_:"):
                continue
            if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
                with final.open(v.target, "wb") as f:
                    f.write(v.resolved.encode("utf-8"))
                continue

            if not v.resolved.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = v.resolved.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.error("While preparing output collection: %s", e)
                raise

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
            f.write(res)

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
                       ensure_unique_name=True, properties=output_properties)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        # Guard against an empty tagsString so we don't create a tag link
        # with an empty name.
        tags = tagsString.split(',') if tagsString else []
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                                 'output_properties': self.final_output_collection.get_properties(),
                                             }).execute(num_retries=self.num_retries)
                # The temporary collection created by make_output_collection
                # is trashed here: once the container record's 'output' is
                # set above, that recorded output supersedes this copy.
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception:
                logger.exception("Setting container output")
                raise

    def apply_reqs(self, job_order_object, tool):
        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
            if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1 or greater and re-run with "
                    "--enable-dev.")
            job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
            for req in job_reqs:
                tool.requirements.append(req)

    @staticmethod
    def get_git_info(tool):
        in_a_git_repo = False
        cwd = None
        filepath = None

        if tool.tool["id"].startswith("file://"):
            # Check that git is installed and the tool file is inside a git
            # work tree; any failure just means no git metadata is captured.
            try:
                filepath = uri_file_path(tool.tool["id"])
                cwd = os.path.dirname(filepath)
                subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, check=True, capture_output=True, text=True)
                in_a_git_repo = True
            except Exception:
                pass

        gitproperties = {}

        if in_a_git_repo:
            git_commit = subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_date = subprocess.run(["git", "log", "--format=%cD", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_committer = subprocess.run(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_branch = subprocess.run(["git", "branch", "--show-current"], cwd=cwd, capture_output=True, text=True).stdout
            git_origin = subprocess.run(["git", "remote", "get-url", "origin"], cwd=cwd, capture_output=True, text=True).stdout
            git_status = subprocess.run(["git", "status", "--untracked-files=no", "--porcelain"], cwd=cwd, capture_output=True, text=True).stdout
            git_describe = subprocess.run(["git", "describe", "--always"], cwd=cwd, capture_output=True, text=True).stdout
            git_toplevel = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=cwd, capture_output=True, text=True).stdout
            # git_toplevel still has its trailing newline, so slicing at
            # len(git_toplevel) also skips the '/' separating the top level
            # directory from the repo-relative path.
            git_path = filepath[len(git_toplevel):]

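            # Illustrative example of the provenance captured here (values
            # are made up):
            #   {"http://arvados.org/cwl#gitCommit": "4ae6b9...",
            #    "http://arvados.org/cwl#gitBranch": "main",
            #    "http://arvados.org/cwl#gitOrigin": "git@github.com:example/repo.git",
            #    "http://arvados.org/cwl#gitPath": "workflows/analysis.cwl", ...}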
            gitproperties = {
                "http://arvados.org/cwl#gitCommit": git_commit.strip(),
                "http://arvados.org/cwl#gitDate": git_date.strip(),
                "http://arvados.org/cwl#gitCommitter": git_committer.strip(),
                "http://arvados.org/cwl#gitBranch": git_branch.strip(),
                "http://arvados.org/cwl#gitOrigin": git_origin.strip(),
                "http://arvados.org/cwl#gitStatus": git_status.strip(),
                "http://arvados.org/cwl#gitDescribe": git_describe.strip(),
                "http://arvados.org/cwl#gitPath": git_path.strip(),
            }
        else:
            for g in ("http://arvados.org/cwl#gitCommit",
                      "http://arvados.org/cwl#gitDate",
                      "http://arvados.org/cwl#gitCommitter",
                      "http://arvados.org/cwl#gitBranch",
                      "http://arvados.org/cwl#gitOrigin",
                      "http://arvados.org/cwl#gitStatus",
                      "http://arvados.org/cwl#gitDescribe",
                      "http://arvados.org/cwl#gitPath"):
                if g in tool.metadata:
                    gitproperties[g] = tool.metadata[g]

        return gitproperties

    def set_container_request_properties(self, container, properties):
        resp = self.api.container_requests().list(filters=[["container_uuid", "=", container["uuid"]]],
                                                  select=["uuid", "properties"]).execute(num_retries=self.num_retries)
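        # Property keys are stored with the short "arv:" prefix, e.g.
        # "http://arvados.org/cwl#gitCommit" becomes "arv:gitCommit".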
        for cr in resp["items"]:
            cr["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in properties.items()})
            self.api.container_requests().update(uuid=cr["uuid"],
                                                 body={"container_request": {"properties": cr["properties"]}}
                                                 ).execute(num_retries=self.num_retries)

    def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        git_info = self.get_git_info(updated_tool)
        if git_info:
            logger.info("Git provenance")
            for g in git_info:
                if git_info[g]:
                    logger.info("  %s: %s", g.split("#", 1)[1], git_info[g])

        workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
        workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
        controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
        logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)

        updated_tool.visit(self.check_features)

        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        runtimeContext = runtimeContext.copy()

        default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
        if runtimeContext.storage_classes == "default":
            runtimeContext.storage_classes = default_storage_classes
        if runtimeContext.intermediate_storage_classes == "default":
            runtimeContext.intermediate_storage_classes = default_storage_classes

        if not runtimeContext.name:
            self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
            if git_info.get("http://arvados.org/cwl#gitDescribe"):
                self.name = "%s (%s)" % (self.name, git_info.get("http://arvados.org/cwl#gitDescribe"))
            runtimeContext.name = self.name

        if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
            # When creating or updating a workflow record, by default
            # always copy dependencies and ensure Docker images are up
            # to date.
            runtimeContext.copy_deps = True
            runtimeContext.match_local_docker = True

        if runtimeContext.update_workflow and self.project_uuid is None:
            # If we are updating a workflow, make sure anything that
            # gets uploaded goes into the same parent project, unless
            # an alternate --project-uuid was provided.
            existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
            runtimeContext.project_uuid = existing_wf["owner_uuid"]

        self.project_uuid = runtimeContext.project_uuid

        # Upload local file references in the job order.
        with Perf(metrics, "upload_job_order"):
            job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                         updated_tool, job_order, runtimeContext)

        # The last clause means: if it is a command line tool, and we
        # are going to wait for the result, and always_submit_runner
        # is false, then we don't submit a runner process.
        submitting = (runtimeContext.update_workflow or
                      runtimeContext.create_workflow or
                      (runtimeContext.submit and not
                       (updated_tool.tool["class"] == "CommandLineTool" and
                        runtimeContext.wait and
                        not runtimeContext.always_submit_runner)))
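        # For example (illustrative): running `arvados-cwl-runner tool.cwl`
        # on a plain CommandLineTool with --wait in effect skips the
        # intermediate runner process and submits the tool itself as a
        # single container request.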

        loadingContext = self.loadingContext.copy()
        loadingContext.do_validate = False
        loadingContext.disable_js_validation = True
        if submitting:
            loadingContext.do_update = False
            # Document may have been auto-updated. Reload the original
            # document with updating disabled because we want to
            # submit the document with its original CWL version, not
            # the auto-updated one.
            with Perf(metrics, "load_tool original"):
                tool = load_tool(updated_tool.tool["id"], loadingContext)
        else:
            tool = updated_tool

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        logger.info("Uploading workflow dependencies")
        with Perf(metrics, "upload_workflow_deps"):
            merged_map = upload_workflow_deps(self, tool, runtimeContext)

        # Recreate process object (ArvadosWorkflow or
        # ArvadosCommandTool) because the tool document may have been
        # updated by upload_workflow_deps in ways that modify
        # inheritance of hints or requirements.
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        with Perf(metrics, "load_tool"):
            tool = load_tool(tool.tool, loadingContext)

        if runtimeContext.update_workflow or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "containers":
                uuid = upload_workflow(self, tool, job_order,
                                       runtimeContext.project_uuid,
                                       runtimeContext,
                                       uuid=runtimeContext.update_workflow,
                                       submit_runner_ram=runtimeContext.submit_runner_ram,
                                       name=runtimeContext.name,
                                       merged_map=merged_map,
                                       submit_runner_image=runtimeContext.submit_runner_image,
                                       git_info=git_info)
                self.stdout.write(uuid + "\n")
                return (None, "success")

        self.apply_reqs(job_order, tool)

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if not self.output_name:
            self.output_name = "Output from workflow %s" % runtimeContext.name

        self.output_name = cleanup_name_for_collection(self.output_name)

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        if self.should_estimate_cache_size:
            visited = set()
            estimated_size = [0]
            def estimate_collection_cache(obj):
                if obj.get("location", "").startswith("keep:"):
                    m = pdh_size.match(obj["location"][5:])
                    if m and m.group(1) not in visited:
                        visited.add(m.group(1))
                        estimated_size[0] += int(m.group(2))
            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
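            # Heuristic: the cache cap is ~192 bytes per byte of manifest
            # text referenced by the inputs (the +<size> component of each
            # portable data hash), with a 256 MiB floor. E.g. manifests
            # totaling 10 MiB of text give max(10*192+1, 256) -> 1921 MiB.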
            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)

        logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)

        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if submitting:
                    tool = RunnerContainer(self, updated_tool,
                                           tool, loadingContext, runtimeContext.enable_reuse,
                                           self.output_name,
                                           self.output_tags,
                                           submit_runner_ram=runtimeContext.submit_runner_ram,
                                           name=runtimeContext.name,
                                           on_error=runtimeContext.on_error,
                                           submit_runner_image=runtimeContext.submit_runner_image,
                                           intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                           merged_map=merged_map,
                                           priority=runtimeContext.priority,
                                           secret_store=self.secret_store,
                                           collection_cache_size=runtimeContext.collection_cache_size,
                                           collection_cache_is_default=self.should_estimate_cache_size,
                                           git_info=git_info)
                else:
                    runtimeContext.runnerjob = tool.tool["id"]

        if runtimeContext.cwl_runner_job is not None:
            self.uuid = runtimeContext.cwl_runner_job.get('uuid')

        jobiter = tool.job(job_order,
                           self.output_callback,
                           runtimeContext)

        if runtimeContext.submit and not runtimeContext.wait:
            runnerjob = next(jobiter)
            runnerjob.run(runtimeContext)
            self.stdout.write(runnerjob.uuid + "\n")
            return (None, "success")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))
            self.set_container_request_properties(current_container, git_info)

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()

            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            elif isinstance(sys.exc_info()[1], WorkflowException):
                logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            else:
                logger.exception("Workflow execution failed")

            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)

            if self.work_api == "containers" and not current_container:
                # Not running in a crunch container, so cancel any outstanding processes.
                for p in self.processes:
                    try:
                        self.api.container_requests().update(uuid=p,
                                                             body={"priority": "0"}
                                                             ).execute(num_retries=self.num_retries)
                    except Exception:
                        pass
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(tool, Runner):
            logger.info("Final output collection %s", tool.final_output)
            if workbench2 or workbench1:
                logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
        else:
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = ""
            storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("finalStorageClass"):
                storage_classes = aslist(storage_class_req["finalStorageClass"])
            else:
                storage_classes = runtimeContext.storage_classes.strip().split(",")

            output_properties = {}
            output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
            if output_properties_req:
                builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
                for pr in output_properties_req["outputProperties"]:
                    output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])

            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
                                                                                          self.output_tags, output_properties,
                                                                                          self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)