projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
14510: Estimate collection cache size wip
[arvados.git]
/
sdk
/
cwl
/
arvados_cwl
/
runner.py
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 9c422a42f76975d300952d004f5e2849d283c2a3..c1a98e745672b65829326afdf6c2fb6313db1bbc 100644
(file)
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -7,7 +7,7 @@
import urlparse
from functools import partial
import logging
import json
from functools import partial
import logging
import json
-import subprocess
+import subprocess32 as subprocess
from collections import namedtuple
from StringIO import StringIO
from collections import namedtuple
from StringIO import StringIO
@@ -26,7 +26,7 @@
from cwltool.pack import pack
import arvados.collection
import ruamel.yaml as yaml
import arvados.collection
import ruamel.yaml as yaml
-from .arvdocker import arv_docker_get_image
+import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done
@@ -129,7 +129,10 @@ def upload_dependencies(arvrunner, name, document_loader,
sc = []
def only_real(obj):
sc = []
def only_real(obj):
- if obj.get("location", "").startswith("file:"):
+ # Only interested in local files than need to be uploaded,
+ # don't include file literals, keep references, etc.
+ sp = obj.get("location", "").split(":")
+ if len(sp) > 1 and sp[0] in ("file", "http", "https"):
sc.append(obj)
visit_class(sc_result, ("File", "Directory"), only_real)
sc.append(obj)
visit_class(sc_result, ("File", "Directory"), only_real)
@@ -168,8 +171,13 @@ def upload_dependencies(arvrunner, name, document_loader,
visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
- for d in discovered:
- sc.extend(discovered[d])
+ for d in list(discovered.keys()):
+ # Only interested in discovered secondaryFiles which are local
+ # files that need to be uploaded.
+ if d.startswith("file:"):
+ sc.extend(discovered[d])
+ else:
+ del discovered[d]
mapper = ArvPathMapper(arvrunner, sc, "",
"keep:%s",
mapper = ArvPathMapper(arvrunner, sc, "",
"keep:%s",
@@ -207,9 +215,9 @@ def upload_docker(arvrunner, tool):
# TODO: can be supported by containers API, but not jobs API.
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
# TODO: can be supported by containers API, but not jobs API.
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
- arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+ arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
else:
else:
- arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
+ arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
upload_docker(arvrunner, s.embedded_tool)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
upload_docker(arvrunner, s.embedded_tool)
@@ -236,6 +244,8 @@ def packed_workflow(arvrunner, tool, merged_map):
v["location"] = merged_map[cur_id].resolved[v["location"]]
if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
v["location"] = merged_map[cur_id].resolved[v["location"]]
if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+ if v.get("class") == "DockerRequirement":
+ v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
for l in v:
visit(v[l], cur_id)
if isinstance(v, list):
for l in v:
visit(v[l], cur_id)
if isinstance(v, list):
@@ -316,10 +326,10 @@ def arvados_jobs_image(arvrunner, img):
"""Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
try:
"""Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
try:
- arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
+ return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
except Exception as e:
raise Exception("Docker image %s is not available\n%s" % (img, e) )
except Exception as e:
raise Exception("Docker image %s is not available\n%s" % (img, e) )
- return img
+
def upload_workflow_collection(arvrunner, name, packed):
collection = arvados.collection.Collection(api_client=arvrunner.api,
def upload_workflow_collection(arvrunner, name, packed):
collection = arvados.collection.Collection(api_client=arvrunner.api,
@@ -354,7 +364,7 @@ class Runner(object):
output_name, output_tags, submit_runner_ram=0,
name=None, on_error=None, submit_runner_image=None,
intermediate_output_ttl=0, merged_map=None,
output_name, output_tags, submit_runner_ram=0,
name=None, on_error=None, submit_runner_image=None,
intermediate_output_ttl=0, merged_map=None,
- priority=None, secret_store=None):
+ priority=None, secret_store=None, collection_cache_size=None):
self.arvrunner = runner
self.tool = tool
self.job_order = job_order
self.arvrunner = runner
self.tool = tool
self.job_order = job_order
@@ -379,6 +389,7 @@ class Runner(object):
self.submit_runner_cores = 1
self.submit_runner_ram = 1024 # defaut 1 GiB
self.submit_runner_cores = 1
self.submit_runner_ram = 1024 # defaut 1 GiB
+ self.collection_cache_size = 256
runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
if runner_resource_req:
runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
if runner_resource_req:
@@ -386,11 +397,17 @@ class Runner(object):
self.submit_runner_cores = runner_resource_req["coresMin"]
if runner_resource_req.get("ramMin"):
self.submit_runner_ram = runner_resource_req["ramMin"]
self.submit_runner_cores = runner_resource_req["coresMin"]
if runner_resource_req.get("ramMin"):
self.submit_runner_ram = runner_resource_req["ramMin"]
+ if runner_resource_req.get("keep_cache"):
+ self.collection_cache_size = runner_resource_req["keep_cache"]
if submit_runner_ram:
# Command line / initializer overrides default and/or spec from workflow
self.submit_runner_ram = submit_runner_ram
if submit_runner_ram:
# Command line / initializer overrides default and/or spec from workflow
self.submit_runner_ram = submit_runner_ram
+ if collection_cache_size:
+ # Command line / initializer overrides default and/or spec from workflow
+ self.collection_cache_size = collection_cache_size
+
if self.submit_runner_ram <= 0:
raise Exception("Value of submit-runner-ram must be greater than zero")
if self.submit_runner_ram <= 0:
raise Exception("Value of submit-runner-ram must be greater than zero")