+from future import standard_library
+standard_library.install_aliases()
+from builtins import object
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
import copy
-import cStringIO
+import io
import functools
import hashlib
import json
@mock.patch("arvados.keep.KeepClient")
@mock.patch("arvados.events.subscribe")
def wrapped(self, events, keep_client1, keep_client2, keepdocker, *args, **kwargs):
- class Stubs:
+ class Stubs(object):
pass
stubs = Stubs()
stubs.events = events
return CollectionExecute(created_collections[uuid])
# Test stub for fetching a collection: scan the records held in
# ``created_collections`` and return the first one whose "uuid" or
# "portable_data_hash" equals ``uuid``, wrapped in CollectionExecute.
# NOTE(review): no fallback is visible in this excerpt, so a lookup that
# matches nothing presumably returns None -- confirm against the full file.
def collection_getstub(created_collections, uuid):
- for v in created_collections.itervalues():
+ for v in created_collections.values():
# ``uuid`` may be either identifier of the record, so test both.
if uuid in (v["uuid"], v["portable_data_hash"]):
return CollectionExecute(v)
return '999999999999999999999999999999d4+99'
arvdock.side_effect = get_image
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=jobs", "--debug",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
@mock.patch("time.sleep")
@stubs
def test_submit_no_reuse(self, stubs, tm):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=jobs", "--debug", "--disable-reuse",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
@mock.patch("time.sleep")
@stubs
def test_submit_on_error(self, stubs, tm):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=jobs", "--debug", "--on-error=stop",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
@mock.patch("time.sleep")
@stubs
def test_submit_runner_ram(self, stubs, tm):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
exited = arvados_cwl.main(
["--submit", "--no-wait", "--debug", "--submit-runner-ram=2048",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
@mock.patch("time.sleep")
@stubs
def test_submit_invalid_runner_ram(self, stubs, tm):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
exited = arvados_cwl.main(
["--submit", "--no-wait", "--debug", "--submit-runner-ram=-2048",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
def test_submit_output_name(self, stubs, tm):
output_name = "test_output_name"
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
exited = arvados_cwl.main(
["--submit", "--no-wait", "--debug", "--output-name", output_name,
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
@mock.patch("time.sleep")
@stubs
def test_submit_pipeline_name(self, stubs, tm):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
exited = arvados_cwl.main(
["--submit", "--no-wait", "--debug", "--name=hello job 123",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
def test_submit_output_tags(self, stubs, tm):
output_tags = "tag0,tag1,tag2"
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
exited = arvados_cwl.main(
["--submit", "--no-wait", "--debug", "--output-tags", output_tags,
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
@stubs
def test_submit_container(self, stubs):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
@stubs
def test_submit_container_no_reuse(self, stubs):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--disable-reuse",
@stubs
def test_submit_container_reuse_disabled_by_workflow(self, stubs):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
@stubs
def test_submit_container_on_error(self, stubs):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--on-error=stop",
def test_submit_container_output_name(self, stubs):
output_name = "test_output_name"
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--output-name", output_name,
@stubs
def test_submit_storage_classes(self, stubs):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--debug", "--submit", "--no-wait", "--api=containers", "--storage-classes=foo",
@stubs
def test_submit_container_output_ttl(self, stubs):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--intermediate-output-ttl", "3600",
@stubs
def test_submit_container_trash_intermediate(self, stubs):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--trash-intermediate",
def test_submit_container_output_tags(self, stubs):
output_tags = "tag0,tag1,tag2"
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--output-tags", output_tags,
@stubs
def test_submit_container_runner_ram(self, stubs):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--submit-runner-ram=2048",
@mock.patch("time.sleep")
@stubs
def test_submit_file_keepref(self, stubs, tm, collectionReader):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
collectionReader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "blorp.txt")
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
@mock.patch("time.sleep")
@stubs
def test_submit_keepref(self, stubs, tm, reader):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
with open("tests/wf/expect_arvworkflow.cwl") as f:
reader().open().__enter__().read.return_value = f.read()
@mock.patch("time.sleep")
@stubs
def test_submit_jobs_keepref(self, stubs, tm, reader):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
with open("tests/wf/expect_arvworkflow.cwl") as f:
reader().open().__enter__().read.return_value = f.read()
@mock.patch("time.sleep")
@stubs
def test_submit_arvworkflow(self, stubs, tm):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
with open("tests/wf/expect_arvworkflow.cwl") as f:
stubs.api.workflows().get().execute.return_value = {"definition": f.read(), "name": "a test workflow"}
@stubs
def test_submit_container_name(self, stubs):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--name=hello container 123",
@stubs
def test_submit_missing_input(self, stubs):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
self.assertEqual(exited, 0)
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
"tests/wf/submit_wf.cwl", "tests/submit_test_job_missing.json"],
@stubs
def test_submit_container_project(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--project-uuid="+project_uuid,
@stubs
def test_submit_container_eval_timeout(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--eval-timeout=60",
@stubs
def test_submit_container_collection_cache(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--collection-cache-size=500",
@stubs
def test_submit_container_thread_count(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--thread-count=20",
@stubs
def test_submit_job_runner_image(self, stubs):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=jobs", "--debug", "--submit-runner-image=arvados/jobs:123",
@stubs
def test_submit_container_runner_image(self, stubs):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--submit-runner-image=arvados/jobs:123",
@stubs
def test_submit_priority(self, stubs):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--priority=669",
@stubs
def test_submit_wf_runner_resources(self, stubs):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
@stubs
def test_submit_secrets(self, stubs):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
"state": "Queued"
}
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
try:
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--submit-request-uuid=zzzzz-xvhdp-yyyyyyyyyyyyyyy",
@stubs
def test_submit_container_cluster_id(self, stubs):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
stubs.api._rootDesc["remoteHosts"]["zbbbb"] = "123"
try:
exited = arvados_cwl.main(
@stubs
def test_submit_validate_cluster_id(self, stubs):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
stubs.api._rootDesc["remoteHosts"]["zbbbb"] = "123"
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--submit-runner-cluster=zcccc",
def test_create(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
exited = arvados_cwl.main(
["--create-workflow", "--debug",
def test_create_name(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
exited = arvados_cwl.main(
["--create-workflow", "--debug",
def test_update_name(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
exited = arvados_cwl.main(
["--update-workflow", self.existing_template_uuid,
def test_create(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
exited = arvados_cwl.main(
["--create-workflow", "--debug",
def test_create_name(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
exited = arvados_cwl.main(
["--create-workflow", "--debug",
@stubs
def test_incompatible_api(self, stubs):
- capture_stderr = cStringIO.StringIO()
+ capture_stderr = io.StringIO()
logging.getLogger('arvados.cwl-runner').addHandler(
logging.StreamHandler(capture_stderr))
@stubs
def test_update(self, stubs):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
exited = arvados_cwl.main(
["--update-workflow", self.existing_workflow_uuid,
@stubs
def test_update_name(self, stubs):
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
exited = arvados_cwl.main(
["--update-workflow", self.existing_workflow_uuid,
def test_create_collection_per_tool(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
- capture_stdout = cStringIO.StringIO()
+ capture_stdout = io.StringIO()
exited = arvados_cwl.main(
["--create-workflow", "--debug",
exited = arvados_cwl.main(
["--create-template",
"tests/wf/inputs_test.cwl", "tests/order/empty_order.json"],
- cStringIO.StringIO(), sys.stderr, api_client=stubs.api)
+ io.StringIO(), sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
stubs.api.pipeline_templates().create.assert_called_with(
exited = arvados_cwl.main(
["--create-template",
"tests/wf/inputs_test.cwl", "tests/order/inputs_test_order.json"],
- cStringIO.StringIO(), sys.stderr, api_client=stubs.api)
+ io.StringIO(), sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
expect_template = copy.deepcopy(self.expect_template)