19699: Add --prefer-cached-downloads, add tests
[arvados.git] / sdk/cwl/arvados_cwl/executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from builtins import next
7 from builtins import object
8 from builtins import str
9 from future.utils import viewvalues, viewitems
10
11 import argparse
12 import logging
13 import os
14 import sys
15 import threading
16 import copy
17 import json
18 import re
19 from functools import partial
20 import subprocess
21 import time
22 import urllib
23
24 from cwltool.errors import WorkflowException
25 import cwltool.workflow
26 from schema_salad.sourceline import SourceLine
27 import schema_salad.validate as validate
28 from schema_salad.ref_resolver import file_uri, uri_file_path
29
30 import arvados
31 import arvados.config
32 from arvados.keep import KeepClient
33 from arvados.errors import ApiError
34
35 import arvados_cwl.util
36 from .arvcontainer import RunnerContainer, cleanup_name_for_collection
37 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder
38 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
39 from .arvworkflow import ArvadosWorkflow, upload_workflow
40 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
41 from .perf import Perf
42 from .pathmapper import NoFollowPathMapper
43 from cwltool.task_queue import TaskQueue
44 from .context import ArvLoadingContext, ArvRuntimeContext
45 from ._version import __version__
46
47 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
48 from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
49 from cwltool.command_line_tool import compute_checksums
50 from cwltool.load_tool import load_tool
51
52 logger = logging.getLogger('arvados.cwl-runner')
53 metrics = logging.getLogger('arvados.cwl-runner.metrics')
54
55 DEFAULT_PRIORITY = 500
56
57 class RuntimeStatusLoggingHandler(logging.Handler):
58     """
59     Intercepts logging calls and report them as runtime statuses on runner
60     containers.
61     """
62     def __init__(self, runtime_status_update_func):
63         super(RuntimeStatusLoggingHandler, self).__init__()
64         self.runtime_status_update = runtime_status_update_func
65         self.updatingRuntimeStatus = False
66
67     def emit(self, record):
68         kind = None
69         if record.levelno >= logging.ERROR:
70             kind = 'error'
71         elif record.levelno >= logging.WARNING:
72             kind = 'warning'
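        # updatingRuntimeStatus guards against re-entrancy: runtime_status_update()
        # may itself log a warning or error, which would otherwise loop back into
        # this handler.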
73         if kind is not None and self.updatingRuntimeStatus is not True:
74             self.updatingRuntimeStatus = True
75             try:
76                 log_msg = record.getMessage()
77                 if '\n' in log_msg:
78                     # If the logged message is multi-line, use its first line as status
79                     # and the rest as detail.
80                     status, detail = log_msg.split('\n', 1)
81                     self.runtime_status_update(
82                         kind,
83                         "%s: %s" % (record.name, status),
84                         detail
85                     )
86                 else:
87                     self.runtime_status_update(
88                         kind,
89                         "%s: %s" % (record.name, record.getMessage())
90                     )
91             finally:
92                 self.updatingRuntimeStatus = False
93
94
95 class ArvCwlExecutor(object):
96     """Execute a CWL tool or workflow, submit work (using containers API),
97     wait for them to complete, and report output.
98
99     """
100
101     def __init__(self, api_client,
102                  arvargs=None,
103                  keep_client=None,
104                  num_retries=4,
105                  thread_count=4,
106                  stdout=sys.stdout):
107
108         if arvargs is None:
109             arvargs = argparse.Namespace()
110             arvargs.work_api = None
111             arvargs.output_name = None
112             arvargs.output_tags = None
113             arvargs.thread_count = 1
114             arvargs.collection_cache_size = None
115             arvargs.git_info = True
116             arvargs.submit = False
117             arvargs.defer_downloads = False
118
119         self.api = api_client
120         self.processes = {}
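        # Re-entrant condition variable that protects shared scheduling state
        # (self.processes, final output/status) and is used to wake the main
        # loop when callbacks or the polling thread make progress.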
121         self.workflow_eval_lock = threading.Condition(threading.RLock())
122         self.final_output = None
123         self.final_status = None
124         self.num_retries = num_retries
125         self.uuid = None
126         self.stop_polling = threading.Event()
127         self.poll_api = None
128         self.pipeline = None
129         self.final_output_collection = None
130         self.output_name = arvargs.output_name
131         self.output_tags = arvargs.output_tags
132         self.project_uuid = None
133         self.intermediate_output_ttl = 0
134         self.intermediate_output_collections = []
135         self.trash_intermediate = False
136         self.thread_count = arvargs.thread_count
137         self.poll_interval = 12
138         self.loadingContext = None
139         self.should_estimate_cache_size = True
140         self.fs_access = None
141         self.secret_store = None
142         self.stdout = stdout
143         self.fast_submit = False
144         self.git_info = arvargs.git_info
145
146         if keep_client is not None:
147             self.keep_client = keep_client
148         else:
149             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
150
151         if arvargs.collection_cache_size:
152             collection_cache_size = arvargs.collection_cache_size*1024*1024
153             self.should_estimate_cache_size = False
154         else:
155             collection_cache_size = 256*1024*1024
156
157         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
158                                                 cap=collection_cache_size)
159
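        # Document fetcher handed to cwltool's loader so that keep: and arvwf:
        # references are resolved through the Arvados API and the shared
        # collection cache.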
160         self.fetcher_constructor = partial(CollectionFetcher,
161                                            api_client=self.api,
162                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
163                                            num_retries=self.num_retries)
164
165         self.work_api = None
166         expected_api = ["containers"]
167         for api in expected_api:
168             try:
169                 methods = self.api._rootDesc.get('resources')[api]['methods']
170                 if ('httpMethod' in methods['create'] and
171                     (arvargs.work_api == api or arvargs.work_api is None)):
172                     self.work_api = api
173                     break
174             except KeyError:
175                 pass
176
177         if not self.work_api:
178             if arvargs.work_api is None:
179                 raise Exception("No supported APIs")
180             else:
181                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
182
183         if self.work_api == "jobs":
184             logger.error("""
185 *******************************
186 The 'jobs' API is no longer supported.
187 *******************************""")
188             exit(1)
189
190         self.loadingContext = ArvLoadingContext(vars(arvargs))
191         self.loadingContext.fetcher_constructor = self.fetcher_constructor
192         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
193         self.loadingContext.construct_tool_object = self.arv_make_tool
194
195         # Add a custom logging handler to the root logger for runtime status reporting
196         # if running inside a container
197         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
198             root_logger = logging.getLogger('')
199
200             # Remove existing RuntimeStatusLoggingHandlers if they exist
201             handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
202             root_logger.handlers = handlers
203
204             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
205             root_logger.addHandler(handler)
206
207         self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
208         self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
209                                                      collection_cache=self.collection_cache)
210
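        # When submitting, --defer-downloads leaves remote http(s) inputs to be
        # fetched by the runner container instead of downloading them locally first.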
211         self.defer_downloads = arvargs.submit and arvargs.defer_downloads
212
213         validate_cluster_target(self, self.toplevel_runtimeContext)
214
215
216     def arv_make_tool(self, toolpath_object, loadingContext):
217         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
218             return ArvadosCommandTool(self, toolpath_object, loadingContext)
219         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
220             return ArvadosWorkflow(self, toolpath_object, loadingContext)
221         elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
222             return ArvadosExpressionTool(self, toolpath_object, loadingContext)
223         else:
224             raise Exception("Unknown tool %s" % toolpath_object.get("class"))
225
226     def output_callback(self, out, processStatus):
227         with self.workflow_eval_lock:
228             if processStatus == "success":
229                 logger.info("Overall process status is %s", processStatus)
230                 state = "Complete"
231             else:
232                 logger.error("Overall process status is %s", processStatus)
233                 state = "Failed"
234             if self.pipeline:
235                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
236                                                         body={"state": state}).execute(num_retries=self.num_retries)
237             self.final_status = processStatus
238             self.final_output = out
239             self.workflow_eval_lock.notifyAll()
240
241
242     def start_run(self, runnable, runtimeContext):
243         self.task_queue.add(partial(runnable.run, runtimeContext),
244                             self.workflow_eval_lock, self.stop_polling)
245
246     def process_submitted(self, container):
247         with self.workflow_eval_lock:
248             self.processes[container.uuid] = container
249
250     def process_done(self, uuid, record):
251         with self.workflow_eval_lock:
252             j = self.processes[uuid]
253             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
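            # j.done() makes API calls to record the result, so run it on the
            # task queue instead of blocking the thread delivering this event.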
254             self.task_queue.add(partial(j.done, record),
255                                 self.workflow_eval_lock, self.stop_polling)
256             del self.processes[uuid]
257
258     def runtime_status_update(self, kind, message, detail=None):
259         """
260         Updates the runtime_status field on the runner container.
261         Called when there's a need to report errors, warnings or just
262         activity statuses, for example in the RuntimeStatusLoggingHandler.
263         """
264
265         if kind not in ('error', 'warning'):
266             # Ignore any other status kind
267             return
268
269         with self.workflow_eval_lock:
270             current = None
271             try:
272                 current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
273             except Exception as e:
274                 logger.info("Couldn't get current container: %s", e)
275             if current is None:
276                 return
277             runtime_status = current.get('runtime_status', {})
278
279             original_updatemessage = updatemessage = runtime_status.get(kind, "")
280             if not updatemessage:
281                 updatemessage = message
282
283             # Subsequent messages are appended to the detail field
284             original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
285             maxlines = 40
286             if updatedetail.count("\n") < maxlines:
287                 if updatedetail:
288                     updatedetail += "\n"
289                 updatedetail += message + "\n"
290
291                 if detail:
292                     updatedetail += detail + "\n"
293
294                 if updatedetail.count("\n") >= maxlines:
295                     updatedetail += "\nSome messages may have been omitted.  Check the full log."
296
297             if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
298                 # don't waste time doing an update if nothing changed
299                 # (usually because we exceeded the max lines)
300                 return
301
302             runtime_status.update({
303                 kind: updatemessage,
304                 kind+'Detail': updatedetail,
305             })
306
307             try:
308                 self.api.containers().update(uuid=current['uuid'],
309                                             body={
310                                                 'runtime_status': runtime_status,
311                                             }).execute(num_retries=self.num_retries)
312             except Exception as e:
313                 logger.info("Couldn't update runtime_status: %s", e)
314
315     def wrapped_callback(self, cb, obj, st):
316         with self.workflow_eval_lock:
317             cb(obj, st)
318             self.workflow_eval_lock.notifyAll()
319
320     def get_wrapped_callback(self, cb):
321         return partial(self.wrapped_callback, cb)
322
323     def on_message(self, event):
324         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
325             uuid = event["object_uuid"]
326             if event["properties"]["new_attributes"]["state"] == "Running":
327                 with self.workflow_eval_lock:
328                     j = self.processes[uuid]
329                     if j.running is False:
330                         j.running = True
331                         j.update_pipeline_component(event["properties"]["new_attributes"])
332                         logger.info("%s %s is Running", self.label(j), uuid)
333             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
334                 self.process_done(uuid, event["properties"]["new_attributes"])
335
336     def label(self, obj):
337         return "[%s %s]" % (self.work_api[0:-1], obj.name)
338
339     def poll_states(self):
340         """Poll status of containers listed in the processes dict.
341
342         Runs in a separate thread.
343         """
344
345         try:
346             remain_wait = self.poll_interval
347             while True:
348                 if remain_wait > 0:
349                     self.stop_polling.wait(remain_wait)
350                 if self.stop_polling.is_set():
351                     break
352                 with self.workflow_eval_lock:
353                     keys = list(self.processes)
354                 if not keys:
355                     remain_wait = self.poll_interval
356                     continue
357
358                 begin_poll = time.time()
359                 if self.work_api == "containers":
360                     table = self.poll_api.container_requests()
361
362                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
363
364                 while keys:
365                     page = keys[:pageSize]
366                     try:
367                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
368                     except Exception as e:
369                         logger.exception("Error checking states on API server: %s", e)
370                         remain_wait = self.poll_interval
371                         continue
372
373                     for p in proc_states["items"]:
374                         self.on_message({
375                             "object_uuid": p["uuid"],
376                             "event_type": "update",
377                             "properties": {
378                                 "new_attributes": p
379                             }
380                         })
381                     keys = keys[pageSize:]
382
383                 finish_poll = time.time()
384                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
385         except:
386             logger.exception("Fatal error in state polling thread.")
387             with self.workflow_eval_lock:
388                 self.processes.clear()
389                 self.workflow_eval_lock.notifyAll()
390         finally:
391             self.stop_polling.set()
392
393     def add_intermediate_output(self, uuid):
394         if uuid:
395             self.intermediate_output_collections.append(uuid)
396
397     def trash_intermediate_output(self):
398         logger.info("Cleaning up intermediate output collections")
399         for i in self.intermediate_output_collections:
400             try:
401                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
402             except Exception:
403                 logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
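            # KeyboardInterrupt and SystemExit do not inherit from Exception, so
            # this clause is still reachable and stops the cleanup loop early.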
404             except (KeyboardInterrupt, SystemExit):
405                 break
406
407     def check_features(self, obj, parentfield=""):
408         if isinstance(obj, dict):
409             if obj.get("class") == "DockerRequirement":
410                 if obj.get("dockerOutputDirectory"):
411                     if not obj.get("dockerOutputDirectory").startswith('/'):
412                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
413                             "Option 'dockerOutputDirectory' must be an absolute path.")
414             if obj.get("class") == "InplaceUpdateRequirement":
415                 if obj["inplaceUpdate"] and parentfield == "requirements":
416                     raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
417             for k,v in viewitems(obj):
418                 self.check_features(v, parentfield=k)
419         elif isinstance(obj, list):
420             for i,v in enumerate(obj):
421                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
422                     self.check_features(v, parentfield=parentfield)
423
424     def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
425         outputObj = copy.deepcopy(outputObj)
426
427         files = []
428         def capture(fileobj):
429             files.append(fileobj)
430
431         adjustDirObjs(outputObj, capture)
432         adjustFileObjs(outputObj, capture)
433
434         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
435
436         final = arvados.collection.Collection(api_client=self.api,
437                                               keep_client=self.keep_client,
438                                               num_retries=self.num_retries)
439
440         for k,v in generatemapper.items():
441             if v.type == "Directory" and v.resolved.startswith("_:"):
442                 continue
443             if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
444                 with final.open(v.target, "wb") as f:
445                     f.write(v.resolved.encode("utf-8"))
446                     continue
447
448             if not v.resolved.startswith("keep:"):
449                 raise Exception("Output source is not in keep or a literal")
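            # v.resolved has the form "keep:<pdh>/<path>"; strip the scheme to get
            # the source collection identifier.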
450             sp = v.resolved.split("/")
451             srccollection = sp[0][5:]
452             try:
453                 reader = self.collection_cache.get(srccollection)
454                 srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
455                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
456             except arvados.errors.ArgumentError as e:
457                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
458                 raise
459             except IOError as e:
460                 logger.error("While preparing output collection: %s", e)
461                 raise
462
463         def rewrite(fileobj):
464             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
465             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
466                 if k in fileobj:
467                     del fileobj[k]
468
469         adjustDirObjs(outputObj, rewrite)
470         adjustFileObjs(outputObj, rewrite)
471
472         with final.open("cwl.output.json", "w") as f:
473             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
474             f.write(res)
475
476
477         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
478                        ensure_unique_name=True, properties=output_properties)
479
480         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
481                     final.api_response()["name"],
482                     final.manifest_locator())
483
484         final_uuid = final.manifest_locator()
485         tags = tagsString.split(',')
486         for tag in tags:
487             self.api.links().create(body={
488                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
489                 }).execute(num_retries=self.num_retries)
490
491         def finalcollection(fileobj):
492             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
493
494         adjustDirObjs(outputObj, finalcollection)
495         adjustFileObjs(outputObj, finalcollection)
496
497         return (outputObj, final)
498
499     def set_crunch_output(self):
500         if self.work_api == "containers":
501             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
502             if current is None:
503                 return
504             try:
505                 self.api.containers().update(uuid=current['uuid'],
506                                              body={
507                                                  'output': self.final_output_collection.portable_data_hash(),
508                                                  'output_properties': self.final_output_collection.get_properties(),
509                                              }).execute(num_retries=self.num_retries)
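                # The container's output field (set above by portable data hash) is
                # what gets surfaced to the user; the collection record saved by
                # make_output_collection is trashed here, presumably to avoid
                # leaving a duplicate copy behind.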
510                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
511                                               body={
512                                                   'is_trashed': True
513                                               }).execute(num_retries=self.num_retries)
514             except Exception:
515                 logger.exception("Setting container output")
516                 raise
517
518     def apply_reqs(self, job_order_object, tool):
519         if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
520             if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
521                 raise WorkflowException(
522                     "`cwl:requirements` in the input object is not part of CWL "
523                     "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
524                     "can set the cwlVersion to v1.1 or greater and re-run with "
525                     "--enable-dev.")
526             job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
527             for req in job_reqs:
528                 tool.requirements.append(req)
529
530     @staticmethod
531     def get_git_info(tool):
532         in_a_git_repo = False
533         cwd = None
534         filepath = None
535
536         if tool.tool["id"].startswith("file://"):
537             # Check whether git is installed and the tool file is inside a git work tree
538             try:
539                 filepath = uri_file_path(tool.tool["id"])
540                 cwd = os.path.dirname(filepath)
541                 subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, check=True, capture_output=True, text=True)
542                 in_a_git_repo = True
543             except Exception as e:
544                 pass
545
546         gitproperties = {}
547
548         if in_a_git_repo:
549             git_commit = subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
550             git_date = subprocess.run(["git", "log", "--format=%cD", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
551             git_committer = subprocess.run(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
552             git_branch = subprocess.run(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
553             git_origin = subprocess.run(["git", "remote", "get-url", "origin"], cwd=cwd, capture_output=True, text=True).stdout
554             git_status = subprocess.run(["git", "status", "--untracked-files=no", "--porcelain"], cwd=cwd, capture_output=True, text=True).stdout
555             git_describe = subprocess.run(["git", "describe", "--always", "--tags"], cwd=cwd, capture_output=True, text=True).stdout
556             git_toplevel = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=cwd, capture_output=True, text=True).stdout
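            # git_toplevel still carries its trailing newline, so slicing by its
            # length also drops the path separator between repo root and file.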
557             git_path = filepath[len(git_toplevel):]
558
559             gitproperties = {
560                 "http://arvados.org/cwl#gitCommit": git_commit.strip(),
561                 "http://arvados.org/cwl#gitDate": git_date.strip(),
562                 "http://arvados.org/cwl#gitCommitter": git_committer.strip(),
563                 "http://arvados.org/cwl#gitBranch": git_branch.strip(),
564                 "http://arvados.org/cwl#gitOrigin": git_origin.strip(),
565                 "http://arvados.org/cwl#gitStatus": git_status.strip(),
566                 "http://arvados.org/cwl#gitDescribe": git_describe.strip(),
567                 "http://arvados.org/cwl#gitPath": git_path.strip(),
568             }
569         else:
570             for g in ("http://arvados.org/cwl#gitCommit",
571                       "http://arvados.org/cwl#gitDate",
572                       "http://arvados.org/cwl#gitCommitter",
573                       "http://arvados.org/cwl#gitBranch",
574                       "http://arvados.org/cwl#gitOrigin",
575                       "http://arvados.org/cwl#gitStatus",
576                       "http://arvados.org/cwl#gitDescribe",
577                       "http://arvados.org/cwl#gitPath"):
578                 if g in tool.metadata:
579                     gitproperties[g] = tool.metadata[g]
580
581         return gitproperties
582
583     def set_container_request_properties(self, container, properties):
584         resp = self.api.container_requests().list(filters=[["container_uuid", "=", container["uuid"]]], select=["uuid", "properties"]).execute(num_retries=self.num_retries)
585         for cr in resp["items"]:
586             cr["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in properties.items()})
587             self.api.container_requests().update(uuid=cr["uuid"], body={"container_request": {"properties": cr["properties"]}}).execute(num_retries=self.num_retries)
588
589     def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
590         self.debug = runtimeContext.debug
591
592         git_info = self.get_git_info(updated_tool) if self.git_info else {}
593         if git_info:
594             logger.info("Git provenance")
595             for g in git_info:
596                 if git_info[g]:
597                     logger.info("  %s: %s", g.split("#", 1)[1], git_info[g])
598
599         workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
600         workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
601         controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
602         logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)
603
604         if not self.fast_submit:
605             updated_tool.visit(self.check_features)
606
607         self.pipeline = None
608         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
609         self.secret_store = runtimeContext.secret_store
610
611         self.trash_intermediate = runtimeContext.trash_intermediate
612         if self.trash_intermediate and self.work_api != "containers":
613             raise Exception("--trash-intermediate is only supported with --api=containers.")
614
615         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
616         if self.intermediate_output_ttl and self.work_api != "containers":
617             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
618         if self.intermediate_output_ttl < 0:
619             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
620
621         if runtimeContext.submit_request_uuid and self.work_api != "containers":
622             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
623
624         runtimeContext = runtimeContext.copy()
625
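        # "default" expands to whichever storage classes the cluster config marks
        # with Default: true.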
626         default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
627         if runtimeContext.storage_classes == "default":
628             runtimeContext.storage_classes = default_storage_classes
629         if runtimeContext.intermediate_storage_classes == "default":
630             runtimeContext.intermediate_storage_classes = default_storage_classes
631
632         if not runtimeContext.name:
633             self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
634             if git_info.get("http://arvados.org/cwl#gitDescribe"):
635                 self.name = "%s (%s)" % (self.name, git_info.get("http://arvados.org/cwl#gitDescribe"))
636             runtimeContext.name = self.name
637
638         if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
639             # When creating or updating a workflow record, by default
640             # always copy dependencies and ensure Docker images are up
641             # to date.
642             runtimeContext.copy_deps = True
643             runtimeContext.match_local_docker = True
644
645         if runtimeContext.update_workflow and self.project_uuid is None:
646             # If we are updating a workflow, make sure anything that
647             # gets uploaded goes into the same parent project, unless
648             # an alternate --project-uuid was provided.
649             existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
650             runtimeContext.project_uuid = existing_wf["owner_uuid"]
651
652         self.project_uuid = runtimeContext.project_uuid
653
654         # Upload local file references in the job order.
655         with Perf(metrics, "upload_job_order"):
656             job_order = upload_job_order(self, "%s input" % runtimeContext.name,
657                                          updated_tool, job_order, runtimeContext)
658
659         # the last clause means: if it is a command line tool, and we
660         # are going to wait for the result, and always_submit_runner
661         # is false, then we don't submit a runner process.
662
663         submitting = (runtimeContext.update_workflow or
664                       runtimeContext.create_workflow or
665                       (runtimeContext.submit and not
666                        (updated_tool.tool["class"] == "CommandLineTool" and
667                         runtimeContext.wait and
668                         not runtimeContext.always_submit_runner)))
669
670         loadingContext = self.loadingContext.copy()
671         loadingContext.do_validate = False
672         loadingContext.disable_js_validation = True
673         if submitting and not self.fast_submit:
674             loadingContext.do_update = False
675             # Document may have been auto-updated. Reload the original
676             # document with updating disabled because we want to
677             # submit the document with its original CWL version, not
678             # the auto-updated one.
679             with Perf(metrics, "load_tool original"):
680                 tool = load_tool(updated_tool.tool["id"], loadingContext)
681         else:
682             tool = updated_tool
683
684         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
685         # Also uploads docker images.
686         if not self.fast_submit:
687             logger.info("Uploading workflow dependencies")
688             with Perf(metrics, "upload_workflow_deps"):
689                 merged_map = upload_workflow_deps(self, tool, runtimeContext)
690         else:
691             merged_map = {}
692
693         # Recreate process object (ArvadosWorkflow or
694         # ArvadosCommandTool) because tool document may have been
695         # updated by upload_workflow_deps in ways that modify
696         # inheritance of hints or requirements.
697         loadingContext.loader = tool.doc_loader
698         loadingContext.avsc_names = tool.doc_schema
699         loadingContext.metadata = tool.metadata
700         with Perf(metrics, "load_tool"):
701             tool = load_tool(tool.tool, loadingContext)
702
703         if runtimeContext.update_workflow or runtimeContext.create_workflow:
704             # Create a workflow record and exit.
705             if self.work_api == "containers":
706                 uuid = upload_workflow(self, tool, job_order,
707                                        runtimeContext.project_uuid,
708                                        runtimeContext,
709                                        uuid=runtimeContext.update_workflow,
710                                        submit_runner_ram=runtimeContext.submit_runner_ram,
711                                        name=runtimeContext.name,
712                                        merged_map=merged_map,
713                                        submit_runner_image=runtimeContext.submit_runner_image,
714                                        git_info=git_info)
715                 self.stdout.write(uuid + "\n")
716                 return (None, "success")
717
718         self.apply_reqs(job_order, tool)
719
720         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
721         self.eval_timeout = runtimeContext.eval_timeout
722
723         runtimeContext.use_container = True
724         runtimeContext.tmpdir_prefix = "tmp"
725         runtimeContext.work_api = self.work_api
726
727         if not self.output_name:
728             self.output_name = "Output from workflow %s" % runtimeContext.name
729 
730         self.output_name = cleanup_name_for_collection(self.output_name)
731
732         if self.work_api == "containers":
733             if self.ignore_docker_for_reuse:
734                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
735             runtimeContext.outdir = "/var/spool/cwl"
736             runtimeContext.docker_outdir = "/var/spool/cwl"
737             runtimeContext.tmpdir = "/tmp"
738             runtimeContext.docker_tmpdir = "/tmp"
739
740         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
741             raise Exception("--priority must be in the range 1..1000.")
742
743         if self.should_estimate_cache_size:
744             visited = set()
745             estimated_size = [0]
746             def estimate_collection_cache(obj):
747                 if obj.get("location", "").startswith("keep:"):
748                     m = pdh_size.match(obj["location"][5:])
749                     if m and m.group(1) not in visited:
750                         visited.add(m.group(1))
751                         estimated_size[0] += int(m.group(2))
752             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
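            # Heuristic: the size suffix in a portable data hash is the manifest
            # length, and each manifest byte is assumed to cost roughly 192 bytes
            # once parsed in memory; the cache is never set below 256 MiB.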
753             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
754             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
755
756         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
757
758         runnerjob = None
759         if runtimeContext.submit:
760             # Submit a runner job to run the workflow for us.
761             if self.work_api == "containers":
762                 if submitting:
763                     tool = RunnerContainer(self, updated_tool,
764                                            tool, loadingContext, runtimeContext.enable_reuse,
765                                            self.output_name,
766                                            self.output_tags,
767                                            submit_runner_ram=runtimeContext.submit_runner_ram,
768                                            name=runtimeContext.name,
769                                            on_error=runtimeContext.on_error,
770                                            submit_runner_image=runtimeContext.submit_runner_image,
771                                            intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
772                                            merged_map=merged_map,
773                                            priority=runtimeContext.priority,
774                                            secret_store=self.secret_store,
775                                            collection_cache_size=runtimeContext.collection_cache_size,
776                                            collection_cache_is_default=self.should_estimate_cache_size,
777                                            git_info=git_info)
778                 else:
779                     runtimeContext.runnerjob = tool.tool["id"]
780
781         if runtimeContext.cwl_runner_job is not None:
782             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
783
784         jobiter = tool.job(job_order,
785                            self.output_callback,
786                            runtimeContext)
787
788         if runtimeContext.submit and not runtimeContext.wait:
789             runnerjob = next(jobiter)
790             runnerjob.run(runtimeContext)
791             self.stdout.write(runnerjob.uuid+"\n")
792             return (None, "success")
793
794         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
795         if current_container:
796             logger.info("Running inside container %s", current_container.get("uuid"))
797             self.set_container_request_properties(current_container, git_info)
798
799         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
800         self.polling_thread = threading.Thread(target=self.poll_states)
801         self.polling_thread.start()
802
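        # Worker pool (sized by --thread-count) that runs container submissions and
        # completion callbacks off the main scheduling loop.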
803         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
804
805         try:
806             self.workflow_eval_lock.acquire()
807
808             # Holds the lock while this code runs and releases it when
809             # it is safe to do so in self.workflow_eval_lock.wait(),
810             # at which point on_message can update job state and
811             # process output callbacks.
812
813             loopperf = Perf(metrics, "jobiter")
814             loopperf.__enter__()
815             for runnable in jobiter:
816                 loopperf.__exit__()
817
818                 if self.stop_polling.is_set():
819                     break
820
821                 if self.task_queue.error is not None:
822                     raise self.task_queue.error
823
824                 if runnable:
825                     with Perf(metrics, "run"):
826                         self.start_run(runnable, runtimeContext)
827                 else:
828                     if (self.task_queue.in_flight + len(self.processes)) > 0:
829                         self.workflow_eval_lock.wait(3)
830                     else:
831                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
832                         break
833
834                 if self.stop_polling.is_set():
835                     break
836
837                 loopperf.__enter__()
838             loopperf.__exit__()
839
840             while (self.task_queue.in_flight + len(self.processes)) > 0:
841                 if self.task_queue.error is not None:
842                     raise self.task_queue.error
843                 self.workflow_eval_lock.wait(3)
844
845         except UnsupportedRequirement:
846             raise
847         except:
848             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
849                 logger.error("Interrupted, workflow will be cancelled")
850             elif isinstance(sys.exc_info()[1], WorkflowException):
851                 logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
852             else:
853                 logger.exception("Workflow execution failed")
854
855             if self.pipeline:
856                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
857                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
858
859             if self.work_api == "containers" and not current_container:
860                 # Not running in a crunch container, so cancel any outstanding processes.
861                 for p in self.processes:
862                     try:
863                         self.api.container_requests().update(uuid=p,
864                                                              body={"priority": "0"}
865                         ).execute(num_retries=self.num_retries)
866                     except Exception:
867                         pass
868         finally:
869             self.workflow_eval_lock.release()
870             self.task_queue.drain()
871             self.stop_polling.set()
872             self.polling_thread.join()
873             self.task_queue.join()
874
875         if self.final_status == "UnsupportedRequirement":
876             raise UnsupportedRequirement("Check log for details.")
877
878         if self.final_output is None:
879             raise WorkflowException("Workflow did not return a result.")
880
881         if runtimeContext.submit and isinstance(tool, Runner):
882             logger.info("Final output collection %s", tool.final_output)
883             if workbench2 or workbench1:
884                 logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
885         else:
886             if self.output_tags is None:
887                 self.output_tags = ""
888
889             storage_classes = ""
890             storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
891             if storage_class_req and storage_class_req.get("finalStorageClass"):
892                 storage_classes = aslist(storage_class_req["finalStorageClass"])
893             else:
894                 storage_classes = runtimeContext.storage_classes.strip().split(",")
895
896             output_properties = {}
897             output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
898             if output_properties_req:
899                 builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
900                 for pr in output_properties_req["outputProperties"]:
901                     output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
902
903             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
904                                                                                           self.output_tags, output_properties,
905                                                                                           self.final_output)
906             self.set_crunch_output()
907
908         if runtimeContext.compute_checksum:
909             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
910             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
911
912         if self.trash_intermediate and self.final_status == "success":
913             self.trash_intermediate_output()
914
915         return (self.final_output, self.final_status)