]> git.arvados.org - arvados.git/blob - sdk/cwl/arvados_cwl/executor.py
22320: Fix maxBlockSize restored to wrong value after tests.
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import argparse
6 import logging
7 import os
8 import sys
9 import threading
10 import copy
11 import json
12 import re
13 from functools import partial
14 import subprocess
15 import time
16 import urllib
17
18 from cwltool.errors import WorkflowException
19 import cwltool.workflow
20 from schema_salad.sourceline import SourceLine, cmap
21 import schema_salad.validate as validate
22 from schema_salad.ref_resolver import file_uri, uri_file_path
23
24 import arvados
25 import arvados.config
26 import arvados.util
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29
30 import arvados_cwl.util
31 from .arvcontainer import RunnerContainer, cleanup_name_for_collection
32 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder, update_from_merged_map, print_keep_deps, ArvSecretStore
33 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
34 from .arvworkflow import ArvadosWorkflow, upload_workflow, make_workflow_record
35 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
36 from .perf import Perf
37 from .pathmapper import NoFollowPathMapper
38 from cwltool.task_queue import TaskQueue
39 from .context import ArvLoadingContext, ArvRuntimeContext
40 from ._version import __version__
41
42 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
43 from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
44 from cwltool.command_line_tool import compute_checksums
45 from cwltool.load_tool import load_tool
46
# Main logger for this module.
logger = logging.getLogger('arvados.cwl-runner')
# Separate child logger handed to Perf() blocks for timing output.
metrics = logging.getLogger('arvados.cwl-runner.metrics')

# NOTE(review): presumably the default priority for submitted work; it is not
# referenced in this part of the file -- confirm against callers.
DEFAULT_PRIORITY = 500
51
class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Logging handler that forwards warnings and errors to a callback so
    they can be reported as runtime statuses on runner containers.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
        # Guard flag: True while a status update is being delivered, so
        # that records emitted during the update are not forwarded again.
        self.updatingRuntimeStatus = False

    def emit(self, record):
        """Forward `record` to the status callback if it is a warning or error."""
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        else:
            # Anything below WARNING is never reported.
            return

        if kind == 'warning' and record.name in ("salad", "crunchstat_summary"):
            # Don't send validation warnings to runtime status,
            # they're noisy and unhelpful.
            return

        if self.updatingRuntimeStatus is True:
            return

        self.updatingRuntimeStatus = True
        try:
            # A multi-line message is split into its first line (the
            # status) and everything after it (the detail).
            headline, newline, remainder = record.getMessage().partition('\n')
            summary = "%s: %s" % (record.name, headline)
            if newline:
                self.runtime_status_update(kind, summary, remainder)
            else:
                self.runtime_status_update(kind, summary)
        finally:
            self.updatingRuntimeStatus = False
92
93
94 class ArvCwlExecutor(object):
95     """Execute a CWL tool or workflow, submit work (using containers API),
96     wait for them to complete, and report output.
97
98     """
99
100     def __init__(self, api_client,
101                  arvargs=None,
102                  keep_client=None,
103                  num_retries=4,
104                  thread_count=4,
105                  stdout=sys.stdout):
106
107         if arvargs is None:
108             arvargs = argparse.Namespace()
109             arvargs.work_api = None
110             arvargs.output_name = None
111             arvargs.output_tags = None
112             arvargs.thread_count = 1
113             arvargs.collection_cache_size = None
114             arvargs.git_info = True
115             arvargs.submit = False
116             arvargs.defer_downloads = False
117
118         self.api = api_client
119         self.processes = {}
120         self.workflow_eval_lock = threading.Condition(threading.RLock())
121         self.final_output = None
122         self.final_status = None
123         self.num_retries = num_retries
124         self.uuid = None
125         self.stop_polling = threading.Event()
126         self.poll_api = None
127         self.pipeline = None
128         self.final_output_collection = None
129         self.output_name = arvargs.output_name
130         self.output_tags = arvargs.output_tags
131         self.project_uuid = None
132         self.intermediate_output_ttl = 0
133         self.intermediate_output_collections = []
134         self.trash_intermediate = False
135         self.thread_count = arvargs.thread_count
136         self.poll_interval = 12
137         self.loadingContext = None
138         self.should_estimate_cache_size = True
139         self.fs_access = None
140         self.secret_store = None
141         self.stdout = stdout
142         self.fast_submit = False
143         self.git_info = arvargs.git_info
144         self.debug = False
145
146         if keep_client is not None:
147             self.keep_client = keep_client
148         else:
149             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
150
151         if arvargs.collection_cache_size:
152             collection_cache_size = arvargs.collection_cache_size*1024*1024
153             self.should_estimate_cache_size = False
154         else:
155             collection_cache_size = 256*1024*1024
156
157         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
158                                                 cap=collection_cache_size)
159
160         self.fetcher_constructor = partial(CollectionFetcher,
161                                            api_client=self.api,
162                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
163                                            num_retries=self.num_retries)
164
165         self.work_api = None
166         expected_api = ["containers"]
167         for api in expected_api:
168             try:
169                 methods = self.api._rootDesc.get('resources')[api]['methods']
170                 if ('httpMethod' in methods['create'] and
171                     (arvargs.work_api == api or arvargs.work_api is None)):
172                     self.work_api = api
173                     break
174             except KeyError:
175                 pass
176
177         if not self.work_api:
178             if arvargs.work_api is None:
179                 raise Exception("No supported APIs")
180             else:
181                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
182
183         if self.work_api == "jobs":
184             logger.error("""
185 *******************************
186 The 'jobs' API is no longer supported.
187 *******************************""")
188             exit(1)
189
190         self.loadingContext = ArvLoadingContext(vars(arvargs))
191         self.loadingContext.fetcher_constructor = self.fetcher_constructor
192         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
193         self.loadingContext.construct_tool_object = self.arv_make_tool
194
195         # Add a custom logging handler to the root logger for runtime status reporting
196         # if running inside a container
197         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
198             root_logger = logging.getLogger('')
199
200             # Remove existing RuntimeStatusLoggingHandlers if they exist
201             handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
202             root_logger.handlers = handlers
203
204             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
205             root_logger.addHandler(handler)
206
207         self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
208         self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
209                                                      collection_cache=self.collection_cache)
210         self.toplevel_runtimeContext.secret_store = ArvSecretStore()
211
212         self.defer_downloads = arvargs.submit and arvargs.defer_downloads
213
214         validate_cluster_target(self, self.toplevel_runtimeContext)
215
216
217     def arv_make_tool(self, toolpath_object, loadingContext):
218         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
219             return ArvadosCommandTool(self, toolpath_object, loadingContext)
220         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
221             return ArvadosWorkflow(self, toolpath_object, loadingContext)
222         elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
223             return ArvadosExpressionTool(self, toolpath_object, loadingContext)
224         else:
225             raise Exception("Unknown tool %s" % toolpath_object.get("class"))
226
227     def output_callback(self, out, processStatus):
228         with self.workflow_eval_lock:
229             if processStatus == "success":
230                 logger.info("Overall process status is %s", processStatus)
231                 state = "Complete"
232             else:
233                 logger.error("Overall process status is %s", processStatus)
234                 state = "Failed"
235             if self.pipeline:
236                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
237                                                         body={"state": state}).execute(num_retries=self.num_retries)
238             self.final_status = processStatus
239             self.final_output = out
240             self.workflow_eval_lock.notify_all()
241
242
243     def start_run(self, runnable, runtimeContext):
244         self.task_queue.add(partial(runnable.run, runtimeContext),
245                             self.workflow_eval_lock, self.stop_polling)
246
247     def process_submitted(self, container):
248         with self.workflow_eval_lock:
249             self.processes[container.uuid] = container
250
251     def process_done(self, uuid, record):
252         with self.workflow_eval_lock:
253             j = self.processes[uuid]
254             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
255             self.task_queue.add(partial(j.done, record),
256                                 self.workflow_eval_lock, self.stop_polling)
257             del self.processes[uuid]
258
259     def runtime_status_update(self, kind, message, detail=None):
260         """
261         Updates the runtime_status field on the runner container.
262         Called when there's a need to report errors, warnings or just
263         activity statuses, for example in the RuntimeStatusLoggingHandler.
264         """
265
266         if kind not in ('error', 'warning', 'activity'):
267             # Ignore any other status kind
268             return
269
270         with self.workflow_eval_lock:
271             current = None
272             try:
273                 current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
274             except Exception as e:
275                 logger.info("Couldn't get current container: %s", e)
276             if current is None:
277                 return
278             runtime_status = current.get('runtime_status', {})
279
280             original_updatemessage = updatemessage = runtime_status.get(kind, "")
281             if kind == "activity" or not updatemessage:
282                 updatemessage = message
283
284             # Subsequent messages tacked on in detail
285             original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
286             maxlines = 40
287             if updatedetail.count("\n") < maxlines:
288                 if updatedetail:
289                     updatedetail += "\n"
290                 updatedetail += message + "\n"
291
292                 if detail:
293                     updatedetail += detail + "\n"
294
295                 if updatedetail.count("\n") >= maxlines:
296                     updatedetail += "\nSome messages may have been omitted.  Check the full log."
297
298             if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
299                 # don't waste time doing an update if nothing changed
300                 # (usually because we exceeded the max lines)
301                 return
302
303             runtime_status.update({
304                 kind: updatemessage,
305                 kind+'Detail': updatedetail,
306             })
307
308             try:
309                 self.api.containers().update(uuid=current['uuid'],
310                                             body={
311                                                 'runtime_status': runtime_status,
312                                             }).execute(num_retries=self.num_retries)
313             except Exception as e:
314                 logger.info("Couldn't update runtime_status: %s", e)
315
316     def wrapped_callback(self, cb, obj, st):
317         with self.workflow_eval_lock:
318             cb(obj, st)
319             self.workflow_eval_lock.notifyAll()
320
321     def get_wrapped_callback(self, cb):
322         return partial(self.wrapped_callback, cb)
323
324     def on_message(self, event):
325         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
326             uuid = event["object_uuid"]
327             if event["properties"]["new_attributes"]["state"] == "Running":
328                 with self.workflow_eval_lock:
329                     j = self.processes[uuid]
330                     if j.running is False:
331                         j.running = True
332                         j.update_pipeline_component(event["properties"]["new_attributes"])
333                         logger.info("%s %s is Running", self.label(j), uuid)
334             elif event["properties"]["new_attributes"]["state"] == "Final":
335                 # underlying container is completed or cancelled
336                 self.process_done(uuid, event["properties"]["new_attributes"])
337             elif (event["properties"]["new_attributes"]["state"] == "Committed" and
338                   event["properties"]["new_attributes"]["priority"] == 0):
339                 # cancelled before it got a chance to run, remains in
340                 # comitted state but isn't going to run so treat it as
341                 # cancelled.
342                 self.process_done(uuid, event["properties"]["new_attributes"])
343
344
345     def label(self, obj):
346         return "[%s %s]" % (self.work_api[0:-1], obj.name)
347
348     def poll_states(self):
349         """Poll status of containers listed in the processes dict.
350
351         Runs in a separate thread.
352         """
353
354         try:
355             remain_wait = self.poll_interval
356             while True:
357                 if remain_wait > 0:
358                     self.stop_polling.wait(remain_wait)
359                 if self.stop_polling.is_set():
360                     break
361                 with self.workflow_eval_lock:
362                     keys = list(self.processes)
363                 if not keys:
364                     remain_wait = self.poll_interval
365                     continue
366
367                 begin_poll = time.time()
368                 if self.work_api == "containers":
369                     table = self.poll_api.container_requests()
370
371                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
372
373                 while keys:
374                     page = keys[:pageSize]
375                     try:
376                         proc_states = table.list(filters=[["uuid", "in", page]], select=["uuid", "container_uuid", "state", "log_uuid",
377                                                                                          "output_uuid", "modified_at", "properties",
378                                                                                          "runtime_constraints", "priority"]).execute(num_retries=self.num_retries)
379                     except Exception as e:
380                         logger.warning("Temporary error checking states on API server: %s", e)
381                         remain_wait = self.poll_interval
382                         continue
383
384                     for p in proc_states["items"]:
385                         self.on_message({
386                             "object_uuid": p["uuid"],
387                             "event_type": "update",
388                             "properties": {
389                                 "new_attributes": p
390                             }
391                         })
392                     keys = keys[pageSize:]
393
394                 finish_poll = time.time()
395                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
396         except:
397             logger.exception("Fatal error in state polling thread.")
398             with self.workflow_eval_lock:
399                 self.processes.clear()
400                 self.workflow_eval_lock.notifyAll()
401         finally:
402             self.stop_polling.set()
403
404     def add_intermediate_output(self, uuid):
405         if uuid:
406             self.intermediate_output_collections.append(uuid)
407
408     def trash_intermediate_output(self):
409         logger.info("Cleaning up intermediate output collections")
410         for i in self.intermediate_output_collections:
411             try:
412                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
413             except Exception:
414                 logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
415             except (KeyboardInterrupt, SystemExit):
416                 break
417
418     def check_features(self, obj, parentfield=""):
419         if isinstance(obj, dict):
420             if obj.get("class") == "DockerRequirement":
421                 if obj.get("dockerOutputDirectory"):
422                     if not obj.get("dockerOutputDirectory").startswith('/'):
423                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
424                             "Option 'dockerOutputDirectory' must be an absolute path.")
425             if obj.get("class") == "InplaceUpdateRequirement":
426                 if obj["inplaceUpdate"] and parentfield == "requirements":
427                     raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
428             for k,v in obj.items():
429                 self.check_features(v, parentfield=k)
430         elif isinstance(obj, list):
431             for i,v in enumerate(obj):
432                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
433                     self.check_features(v, parentfield=parentfield)
434
435     def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
436         outputObj = copy.deepcopy(outputObj)
437
438         files = []
439         def captureFile(fileobj):
440             files.append(fileobj)
441
442         def captureDir(dirobj):
443             if dirobj["location"].startswith("keep:") and 'listing' in dirobj:
444                 del dirobj['listing']
445             files.append(dirobj)
446
447         adjustDirObjs(outputObj, captureDir)
448         adjustFileObjs(outputObj, captureFile)
449
450         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
451
452         final = arvados.collection.Collection(api_client=self.api,
453                                               keep_client=self.keep_client,
454                                               num_retries=self.num_retries)
455
456         for k,v in generatemapper.items():
457             if v.type == "Directory" and v.resolved.startswith("_:"):
458                     continue
459             if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
460                 with final.open(v.target, "wb") as f:
461                     f.write(v.resolved.encode("utf-8"))
462                     continue
463
464             if not v.resolved.startswith("keep:"):
465                 raise Exception("Output source is not in keep or a literal")
466             sp = v.resolved.split("/")
467             srccollection = sp[0][5:]
468             try:
469                 reader = self.collection_cache.get(srccollection)
470                 srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
471                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
472             except arvados.errors.ArgumentError as e:
473                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
474                 raise
475             except IOError as e:
476                 logger.error("While preparing output collection: %s", e)
477                 raise
478
479         def rewrite(fileobj):
480             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
481             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
482                 if k in fileobj:
483                     del fileobj[k]
484
485         adjustDirObjs(outputObj, rewrite)
486         adjustFileObjs(outputObj, rewrite)
487
488         with final.open("cwl.output.json", "w") as f:
489             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
490             f.write(res)
491
492
493         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
494                        ensure_unique_name=True, properties=output_properties)
495
496         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
497                     final.api_response()["name"],
498                     final.manifest_locator())
499
500         final_uuid = final.manifest_locator()
501         tags = tagsString.split(',')
502         for tag in tags:
503              self.api.links().create(body={
504                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
505                 }).execute(num_retries=self.num_retries)
506
507         def finalcollection(fileobj):
508             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
509
510         adjustDirObjs(outputObj, finalcollection)
511         adjustFileObjs(outputObj, finalcollection)
512
513         return (outputObj, final)
514
515     def set_crunch_output(self):
516         if self.work_api == "containers":
517             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
518             if current is None:
519                 return
520             try:
521                 self.api.containers().update(uuid=current['uuid'],
522                                              body={
523                                                  'output': self.final_output_collection.portable_data_hash(),
524                                                  'output_properties': self.final_output_collection.get_properties(),
525                                              }).execute(num_retries=self.num_retries)
526                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
527                                               body={
528                                                   'is_trashed': True
529                                               }).execute(num_retries=self.num_retries)
530             except Exception:
531                 logger.exception("Setting container output")
532                 raise
533
534     def apply_reqs(self, job_order_object, tool):
535         if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
536             if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
537                 raise WorkflowException(
538                     "`cwl:requirements` in the input object is not part of CWL "
539                     "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
540                     "can set the cwlVersion to v1.1 or greater and re-run with "
541                     "--enable-dev.")
542             job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
543             for req in job_reqs:
544                 tool.requirements.append(req)
545
546     @staticmethod
547     def get_git_info(tool):
548         in_a_git_repo = False
549         cwd = None
550         filepath = None
551
552         if tool.tool["id"].startswith("file://"):
553             # check if git is installed
554             try:
555                 filepath = uri_file_path(tool.tool["id"])
556                 cwd = os.path.dirname(filepath)
557                 subprocess.run(
558                     ["git", "log", "--format=%H", "-n1", "HEAD"],
559                     cwd=cwd,
560                     check=True,
561                     stdout=subprocess.DEVNULL,
562                 )
563                 in_a_git_repo = True
564             except Exception as e:
565                 pass
566
567         gitproperties = {}
568
569         if in_a_git_repo:
570             def git_output(cmd):
571                 return subprocess.run(
572                     cmd,
573                     cwd=cwd,
574                     stdout=subprocess.PIPE,
575                     universal_newlines=True,
576                 ).stdout.strip()
577             git_commit = git_output(["git", "log", "--format=%H", "-n1", "HEAD"])
578             git_date = git_output(["git", "log", "--format=%cD", "-n1", "HEAD"])
579             git_committer = git_output(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"])
580             git_branch = git_output(["git", "rev-parse", "--abbrev-ref", "HEAD"])
581             git_origin = git_output(["git", "remote", "get-url", "origin"])
582             git_status = git_output(["git", "status", "--untracked-files=no", "--porcelain"])
583             git_describe = git_output(["git", "describe", "--always", "--tags"])
584             git_toplevel = git_output(["git", "rev-parse", "--show-toplevel"])
585             git_path = filepath[len(git_toplevel):]
586
587             gitproperties = {
588                 "http://arvados.org/cwl#gitCommit": git_commit,
589                 "http://arvados.org/cwl#gitDate": git_date,
590                 "http://arvados.org/cwl#gitCommitter": git_committer,
591                 "http://arvados.org/cwl#gitBranch": git_branch,
592                 "http://arvados.org/cwl#gitOrigin": git_origin,
593                 "http://arvados.org/cwl#gitStatus": git_status,
594                 "http://arvados.org/cwl#gitDescribe": git_describe,
595                 "http://arvados.org/cwl#gitPath": git_path,
596             }
597         else:
598             for g in ("http://arvados.org/cwl#gitCommit",
599                       "http://arvados.org/cwl#gitDate",
600                       "http://arvados.org/cwl#gitCommitter",
601                       "http://arvados.org/cwl#gitBranch",
602                       "http://arvados.org/cwl#gitOrigin",
603                       "http://arvados.org/cwl#gitStatus",
604                       "http://arvados.org/cwl#gitDescribe",
605                       "http://arvados.org/cwl#gitPath"):
606                 if g in tool.metadata:
607                     gitproperties[g] = tool.metadata[g]
608
609         return gitproperties
610
611     def set_container_request_properties(self, container, properties):
612         resp = self.api.container_requests().list(filters=[["container_uuid", "=", container["uuid"]]], select=["uuid", "properties"]).execute(num_retries=self.num_retries)
613         for cr in resp["items"]:
614             cr["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in properties.items()})
615             self.api.container_requests().update(uuid=cr["uuid"], body={"container_request": {"properties": cr["properties"]}}).execute(num_retries=self.num_retries)
616
617     def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
618         self.debug = runtimeContext.debug
619
620         self.runtime_status_update("activity", "initialization")
621
622         git_info = self.get_git_info(updated_tool) if self.git_info else {}
623         if git_info:
624             logger.info("Git provenance")
625             for g in git_info:
626                 if git_info[g]:
627                     logger.info("  %s: %s", g.split("#", 1)[1], git_info[g])
628
629         runtimeContext.git_info = git_info
630
631         workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
632         workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
633         controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
634         logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)
635
636         if not self.fast_submit:
637             updated_tool.visit(self.check_features)
638
639         self.pipeline = None
640         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
641         self.secret_store = runtimeContext.secret_store
642
643         self.trash_intermediate = runtimeContext.trash_intermediate
644         if self.trash_intermediate and self.work_api != "containers":
645             raise Exception("--trash-intermediate is only supported with --api=containers.")
646
647         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
648         if self.intermediate_output_ttl and self.work_api != "containers":
649             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
650         if self.intermediate_output_ttl < 0:
651             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
652
653         if runtimeContext.submit_request_uuid and self.work_api != "containers":
654             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
655
656         runtimeContext = runtimeContext.copy()
657
658         if not runtimeContext.name:
659             self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
660             if git_info.get("http://arvados.org/cwl#gitDescribe"):
661                 self.name = "%s (%s)" % (self.name, git_info.get("http://arvados.org/cwl#gitDescribe"))
662             runtimeContext.name = self.name
663
664         if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
665             # When creating or updating workflow record, by default
666             # always copy dependencies and ensure Docker images are up
667             # to date.
668             runtimeContext.copy_deps = True
669             runtimeContext.match_local_docker = True
670
671         if runtimeContext.print_keep_deps:
672             runtimeContext.copy_deps = False
673             runtimeContext.match_local_docker = False
674
675         if runtimeContext.update_workflow and self.project_uuid is None:
676             # If we are updating a workflow, make sure anything that
677             # gets uploaded goes into the same parent project, unless
678             # an alternate --project-uuid was provided.
679             existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
680             runtimeContext.project_uuid = existing_wf["owner_uuid"]
681
682         self.project_uuid = runtimeContext.project_uuid
683
684         self.runtime_status_update("activity", "data transfer")
685
686         # Upload local file references in the job order.
687         with Perf(metrics, "upload_job_order"):
688             job_order, jobmapper = upload_job_order(self, "%s input" % runtimeContext.name,
689                                          updated_tool, job_order, runtimeContext)
690
691         # determine if we are submitting or directly executing the workflow.
692         #
693         # the last clause means: if it is a command line tool, and we
694         # are going to wait for the result, and always_submit_runner
695         # is false, then we don't submit a runner process.
696
697         submitting = (runtimeContext.submit and not
698                        (updated_tool.tool["class"] == "CommandLineTool" and
699                         runtimeContext.wait and
700                         not runtimeContext.always_submit_runner))
701
702         loadingContext = self.loadingContext.copy()
703         loadingContext.do_validate = False
704         loadingContext.disable_js_validation = True
705         tool = updated_tool
706
707         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
708         # Also uploads docker images.
709         if not self.fast_submit:
710             logger.info("Uploading workflow dependencies")
711             with Perf(metrics, "upload_workflow_deps"):
712                 merged_map = upload_workflow_deps(self, tool, runtimeContext)
713         else:
714             # in the fast submit case, we are running a workflow that
715             # has already been uploaded to Arvados, so we assume all
716             # the dependencies have been pinned to keep references and
717             # there is nothing to do.
718             merged_map = {}
719
720         loadingContext.loader = tool.doc_loader
721         loadingContext.avsc_names = tool.doc_schema
722         loadingContext.metadata = tool.metadata
723         loadingContext.skip_resolve_all = True
724
725         workflow_wrapper = None
726         if (submitting and not self.fast_submit) or runtimeContext.update_workflow or runtimeContext.create_workflow or runtimeContext.print_keep_deps:
727             # upload workflow and get back the workflow wrapper
728
729             workflow_wrapper = upload_workflow(self, tool, job_order,
730                                                runtimeContext.project_uuid,
731                                                runtimeContext,
732                                                uuid=runtimeContext.update_workflow,
733                                                submit_runner_ram=runtimeContext.submit_runner_ram,
734                                                name=runtimeContext.name,
735                                                merged_map=merged_map,
736                                                submit_runner_image=runtimeContext.submit_runner_image,
737                                                git_info=git_info,
738                                                set_defaults=(runtimeContext.update_workflow or runtimeContext.create_workflow),
739                                                jobmapper=jobmapper)
740
741             if runtimeContext.update_workflow or runtimeContext.create_workflow:
742                 # We're registering the workflow, so create or update
743                 # the workflow record and then exit.
744                 uuid = make_workflow_record(self, workflow_wrapper, runtimeContext.name, tool,
745                                             runtimeContext.project_uuid, runtimeContext.update_workflow)
746                 self.stdout.write(uuid + "\n")
747                 return (None, "success")
748
749             if runtimeContext.print_keep_deps:
750                 # Just find and print out all the collection dependencies and exit
751                 print_keep_deps(self, runtimeContext, merged_map, tool)
752                 return (None, "success")
753
754             # Did not register a workflow, we're going to submit
755             # it instead.
756             loadingContext.loader.idx.clear()
757             loadingContext.loader.idx["_:main"] = workflow_wrapper
758             workflow_wrapper["id"] = "_:main"
759
760             # Reload the minimal wrapper workflow.
761             self.fast_submit = True
762             tool = load_tool(workflow_wrapper, loadingContext)
763             loadingContext.loader.idx["_:main"] = workflow_wrapper
764
765         if not submitting:
766             # If we are going to run the workflow now (rather than
767             # submit it), we need to update the workflow document
768             # replacing file references with keep references.  If we
769             # are just going to construct a run submission, we don't
770             # need to do this.
771             update_from_merged_map(tool, merged_map)
772
773         self.apply_reqs(job_order, tool)
774
775         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
776         self.eval_timeout = runtimeContext.eval_timeout
777
778         runtimeContext.use_container = True
779         runtimeContext.tmpdir_prefix = "tmp"
780         runtimeContext.work_api = self.work_api
781
782         if not self.output_name:
783              self.output_name = "Output from workflow %s" % runtimeContext.name
784
785         self.output_name  = cleanup_name_for_collection(self.output_name)
786
787         if self.work_api == "containers":
788             if self.ignore_docker_for_reuse:
789                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
790             runtimeContext.outdir = "/var/spool/cwl"
791             runtimeContext.docker_outdir = "/var/spool/cwl"
792             runtimeContext.tmpdir = "/tmp"
793             runtimeContext.docker_tmpdir = "/tmp"
794
795         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
796             raise Exception("--priority must be in the range 1..1000.")
797
798         if self.should_estimate_cache_size:
799             visited = set()
800             estimated_size = [0]
801             def estimate_collection_cache(obj):
802                 if obj.get("location", "").startswith("keep:"):
803                     m = pdh_size.match(obj["location"][5:])
804                     if m and m.group(1) not in visited:
805                         visited.add(m.group(1))
806                         estimated_size[0] += int(m.group(2))
807             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
808             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
809             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
810
811         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
812
813         runnerjob = None
814         if runtimeContext.submit:
815             # We are submitting instead of running immediately.
816             #
817             # Create a "Runner job" that when run() is invoked,
818             # creates the container request to run the workflow.
819             if self.work_api == "containers":
820                 if submitting:
821                     loadingContext.metadata = updated_tool.metadata.copy()
822                     tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
823                                            self.output_name,
824                                            self.output_tags,
825                                            submit_runner_ram=runtimeContext.submit_runner_ram,
826                                            name=runtimeContext.name,
827                                            on_error=runtimeContext.on_error,
828                                            submit_runner_image=runtimeContext.submit_runner_image,
829                                            intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
830                                            merged_map=merged_map,
831                                            priority=runtimeContext.priority,
832                                            secret_store=self.secret_store,
833                                            collection_cache_size=runtimeContext.collection_cache_size,
834                                            collection_cache_is_default=self.should_estimate_cache_size,
835                                            git_info=git_info)
836                 else:
837                     runtimeContext.runnerjob = tool.tool["id"]
838
839         if runtimeContext.cwl_runner_job is not None:
840             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
841
842         jobiter = tool.job(job_order,
843                            self.output_callback,
844                            runtimeContext)
845
846         if runtimeContext.submit and not runtimeContext.wait:
847             # User provided --no-wait so submit the container request,
848             # get the container request uuid, print it out, and exit.
849             runnerjob = next(jobiter)
850             runnerjob.run(runtimeContext)
851             self.stdout.write(runnerjob.uuid+"\n")
852             return (None, "success")
853
854         # We are either running the workflow directly, or submitting it
855         # and waiting for a final result.
856
857         self.runtime_status_update("activity", "workflow execution")
858
859         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
860         if current_container:
861             logger.info("Running inside container %s", current_container.get("uuid"))
862             self.set_container_request_properties(current_container, git_info)
863
864         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
865         self.polling_thread = threading.Thread(target=self.poll_states)
866         self.polling_thread.start()
867
868         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
869
870         try:
871             self.workflow_eval_lock.acquire()
872
873             # Holds the lock while this code runs and releases it when
874             # it is safe to do so in self.workflow_eval_lock.wait(),
875             # at which point on_message can update job state and
876             # process output callbacks.
877
878             loopperf = Perf(metrics, "jobiter")
879             loopperf.__enter__()
880             for runnable in jobiter:
881                 loopperf.__exit__()
882
883                 if self.stop_polling.is_set():
884                     break
885
886                 if self.task_queue.error is not None:
887                     raise self.task_queue.error
888
889                 if runnable:
890                     with Perf(metrics, "run"):
891                         self.start_run(runnable, runtimeContext)
892                 else:
893                     if (self.task_queue.in_flight + len(self.processes)) > 0:
894                         self.workflow_eval_lock.wait(3)
895                     else:
896                         if self.final_status is None:
897                             logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
898                         break
899
900                 if self.stop_polling.is_set():
901                     break
902
903                 loopperf.__enter__()
904             loopperf.__exit__()
905
906             while (self.task_queue.in_flight + len(self.processes)) > 0:
907                 if self.task_queue.error is not None:
908                     raise self.task_queue.error
909                 self.workflow_eval_lock.wait(3)
910
911         except UnsupportedRequirement:
912             raise
913         except:
914             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
915                 logger.error("Interrupted, workflow will be cancelled", exc_info=self.debug)
916             elif isinstance(sys.exc_info()[1], WorkflowException):
917                 logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=self.debug)
918             else:
919                 logger.exception("Workflow execution failed")
920
921             if self.pipeline:
922                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
923                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
924
925             if self.work_api == "containers" and not current_container:
926                 # Not running in a crunch container, so cancel any outstanding processes.
927                 for p in self.processes:
928                     try:
929                         self.api.container_requests().update(uuid=p,
930                                                              body={"priority": "0"}
931                         ).execute(num_retries=self.num_retries)
932                     except Exception:
933                         pass
934         finally:
935             self.workflow_eval_lock.release()
936             self.task_queue.drain()
937             self.stop_polling.set()
938             self.polling_thread.join()
939             self.task_queue.join()
940
941         if self.final_status == "UnsupportedRequirement":
942             raise UnsupportedRequirement("Check log for details.")
943
944         if self.final_output is None:
945             raise WorkflowException("Workflow did not return a result.")
946
947         if runtimeContext.usage_report_notes:
948             logger.info("Steps with low resource utilization (possible optimization opportunities):")
949             for x in runtimeContext.usage_report_notes:
950                 logger.info("  %s", x)
951
952         if runtimeContext.submit and isinstance(tool, Runner):
953             logger.info("Final output collection %s", tool.final_output)
954             if workbench2 or workbench1:
955                 logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
956         else:
957             if self.output_tags is None:
958                 self.output_tags = ""
959
960             storage_classes = ""
961             storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
962             if storage_class_req and storage_class_req.get("finalStorageClass"):
963                 storage_classes = aslist(storage_class_req["finalStorageClass"])
964             else:
965                 storage_classes = (
966                     runtimeContext.storage_classes
967                     or list(arvados.util.iter_storage_classes(self.api.config()))
968                 )
969
970             output_properties = {}
971             output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
972             if output_properties_req:
973                 builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
974                 for pr in output_properties_req["outputProperties"]:
975                     output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
976
977             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
978                                                                                           self.output_tags, output_properties,
979                                                                                           self.final_output)
980             self.set_crunch_output()
981
982         if runtimeContext.compute_checksum:
983             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
984             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
985
986         if self.trash_intermediate and self.final_status == "success":
987             self.trash_intermediate_output()
988
989         return (self.final_output, self.final_status)
990
def blank_secrets(job_order_object, process):
    """Placeholder hook for secret handling; currently changes nothing.

    Looks up the ``http://commonwl.org/cwltool#Secrets`` requirement on
    ``process`` and discards the result.  ``job_order_object`` is
    accepted but is neither read nor modified.

    Arguments:

    * job_order_object --- the job order; unused by the current body.

    * process --- an object providing ``get_requirement(uri)`` that
      returns a two-item sequence.

    Returns None.
    """
    # NOTE(review): the name suggests this was meant to blank out the
    # inputs listed by the Secrets requirement, but no such logic is
    # implemented here --- confirm the intent before relying on it.
    #
    # The lookup and two-value unpack are kept so that a process object
    # without a usable get_requirement() still fails exactly as before;
    # the leading underscore marks the result as intentionally unused.
    _secrets_req, _ = process.get_requirement("http://commonwl.org/cwltool#Secrets")