self.output_callback,
**kwargs).next()
else:
- runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
- self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name"), on_error=kwargs.get("on_error"))
- else:
- runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
- self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name"), on_error=kwargs.get("on_error"))
+ runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
+ self.output_name,
+ self.output_tags,
+ submit_runner_ram=kwargs.get("submit_runner_ram"),
+ name=kwargs["name"],
+ on_error=kwargs.get("on_error"),
+ submit_runner_image=kwargs.get("submit_runner_image"))
+ elif self.work_api == "jobs":
+ runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
+ self.output_name,
+ self.output_tags,
+ submit_runner_ram=kwargs.get("submit_runner_ram"),
+ name=kwargs["name"],
+ on_error=kwargs.get("on_error"),
+ submit_runner_image=kwargs.get("submit_runner_image"))
if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
# Create pipeline for local run
help="RAM (in MiB) required for the workflow runner job (default 1024)",
default=1024)
+ parser.add_argument("--submit-runner-image", type=str,
+ help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
+ default=None)
+
parser.add_argument("--name", type=str,
help="Name to use for workflow execution instance.",
default=None)
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
if not docker_req:
- docker_req = {"dockerImageId": arvados_jobs_image(self.arvrunner)}
+ docker_req = {"dockerImageId": "arvados/jobs"}
container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
docker_req,
"cwd": "/var/spool/cwl",
"priority": 1,
"state": "Committed",
- "container_image": arvados_jobs_image(self.arvrunner),
+ "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
"mounts": {
"/var/lib/cwl/cwl.input.json": {
"kind": "json",
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
else:
- runtime_constraints["docker_image"] = arvados_jobs_image(self.arvrunner)
+ runtime_constraints["docker_image"] = "arvados/jobs"
resources = self.builder.resources
if resources is not None:
"repository": "arvados",
"script_parameters": self.job_order,
"runtime_constraints": {
- "docker_image": arvados_jobs_image(self.arvrunner),
+ "docker_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
"min_ram_mb_per_node": self.submit_runner_ram
}
}
return workflowmapper
-def arvados_jobs_image(arvrunner):
- img = "arvados/jobs:"+__version__
+def arvados_jobs_image(arvrunner, img):
+ """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
+
try:
arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
except Exception as e:
class Runner(object):
def __init__(self, runner, tool, job_order, enable_reuse,
output_name, output_tags, submit_runner_ram=0,
- name=None, on_error=None):
+ name=None, on_error=None, submit_runner_image=None):
self.arvrunner = runner
self.tool = tool
self.job_order = job_order
self.output_tags = output_tags
self.name = name
self.on_error = on_error
+ self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
if submit_runner_ram:
self.submit_runner_ram = submit_runner_ram
'repository': 'arvados',
'script': 'crunchrunner',
'runtime_constraints': {
- 'docker_image': 'arvados/jobs:'+arvados_cwl.__version__,
+ 'docker_image': 'arvados/jobs',
'min_cores_per_node': 1,
'min_ram_mb_per_node': 1024,
'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
filters=[['repository', '=', 'arvados'],
['script', '=', 'crunchrunner'],
['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
- ['docker_image_locator', 'in docker', 'arvados/jobs:'+arvados_cwl.__version__]]
+ ['docker_image_locator', 'in docker', 'arvados/jobs']]
)
# The test passes some fields in builder.resources
'repository': 'arvados',
'script': 'crunchrunner',
'runtime_constraints': {
- 'docker_image': 'arvados/jobs:'+arvados_cwl.__version__,
+ 'docker_image': 'arvados/jobs',
'min_cores_per_node': 3,
'min_ram_mb_per_node': 3000,
'min_scratch_mb_per_node': 5024, # tmpdirSize + outdirSize
filters=[['repository', '=', 'arvados'],
['script', '=', 'crunchrunner'],
['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
- ['docker_image_locator', 'in docker', 'arvados/jobs:'+arvados_cwl.__version__]])
+ ['docker_image_locator', 'in docker', 'arvados/jobs']])
@mock.patch("arvados.collection.CollectionReader")
def test_done(self, reader):
'runtime_constraints': {
'min_scratch_mb_per_node': 2048,
'min_cores_per_node': 1,
- 'docker_image': 'arvados/jobs:'+arvados_cwl.__version__,
+ 'docker_image': 'arvados/jobs',
'min_ram_mb_per_node': 1024
},
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
filters=[['repository', '=', 'arvados'],
['script', '=', 'crunchrunner'],
['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
- ['docker_image_locator', 'in docker', 'arvados/jobs:'+arvados_cwl.__version__]],
+ ['docker_image_locator', 'in docker', 'arvados/jobs']],
find_or_create=True)
mockcollection().open().__enter__().write.assert_has_calls([mock.call(subwf)])
}], "items_available": 1, "offset": 0},
{"items": [{"uuid": ""}], "items_available": 1, "offset": 0})
arvrunner.api.collections().create().execute.return_value = {"uuid": ""}
- self.assertEqual("arvados/jobs:"+arvados_cwl.__version__, arvados_cwl.runner.arvados_jobs_image(arvrunner))
+ self.assertEqual("arvados/jobs:"+arvados_cwl.__version__, arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__))
class TestCreateTemplate(unittest.TestCase):
existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"