owner_uuid: @object.uuid
}
})
- redirect_to root_url(api_token: resp[:api_token])
+ redirect_to root_url(api_token: "v2/#{resp[:uuid]}/#{resp[:api_token]}")
end
def home
assert_match /\/users\/welcome/, @response.redirect_url
end
+ test "'log in as user' feature uses a v2 token" do
+ post :sudo, {
+ id: api_fixture('users')['active']['uuid']
+ }, session_for('admin_trustedclient')
+ assert_response :redirect
+ assert_match /api_token=v2%2F/, @response.redirect_url
+ end
+
test "request shell access" do
user = api_fixture('users')['spectator']
# clean up the docker build environment
cd "$WORKSPACE"
+if [[ -z "$ARVADOS_BUILDING_VERSION" ]] && ! [[ -z "$version_tag" ]]; then
+ ARVADOS_BUILDING_VERSION="$version_tag"
+ ARVADOS_BUILDING_ITERATION="1"
+fi
+
python_sdk_ts=$(cd sdk/python && timestamp_from_git)
cwl_runner_ts=$(cd sdk/cwl && timestamp_from_git)
echo cwl_runner_version $cwl_runner_version python_sdk_version $python_sdk_version
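+# Pick the package versions passed as build args below: append the fixed
+# iteration ("-2" for the Python SDK, "-4" for the CWL runner) to the
+# auto-detected version, unless ARVADOS_BUILDING_VERSION overrides it.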
+if [[ "${python_sdk_version}" != "${ARVADOS_BUILDING_VERSION}" ]]; then
+ python_sdk_version="${python_sdk_version}-2"
+else
+ python_sdk_version="${ARVADOS_BUILDING_VERSION}-${ARVADOS_BUILDING_ITERATION}"
+fi
+
+cwl_runner_version_orig=$cwl_runner_version
+
+if [[ "${cwl_runner_version}" != "${ARVADOS_BUILDING_VERSION}" ]]; then
+ cwl_runner_version="${cwl_runner_version}-4"
+else
+ cwl_runner_version="${ARVADOS_BUILDING_VERSION}-${ARVADOS_BUILDING_ITERATION}"
+fi
+
cd docker/jobs
docker build $NOCACHE \
- --build-arg python_sdk_version=${python_sdk_version}-2 \
- --build-arg cwl_runner_version=${cwl_runner_version}-4 \
- -t arvados/jobs:$cwl_runner_version .
+ --build-arg python_sdk_version=${python_sdk_version} \
+ --build-arg cwl_runner_version=${cwl_runner_version} \
+ -t arvados/jobs:$cwl_runner_version_orig .
ECODE=$?
FORCE=-f
fi
if ! [[ -z "$version_tag" ]]; then
- docker tag $FORCE arvados/jobs:$cwl_runner_version arvados/jobs:"$version_tag"
+ docker tag $FORCE arvados/jobs:$cwl_runner_version_orig arvados/jobs:"$version_tag"
else
- docker tag $FORCE arvados/jobs:$cwl_runner_version arvados/jobs:latest
+ docker tag $FORCE arvados/jobs:$cwl_runner_version_orig arvados/jobs:latest
fi
ECODE=$?
if ! [[ -z "$version_tag" ]]; then
docker_push arvados/jobs:"$version_tag"
else
- docker_push arvados/jobs:$cwl_runner_version
+ docker_push arvados/jobs:$cwl_runner_version_orig
docker_push arvados/jobs:latest
fi
title "upload arvados images finished (`timer`)"
TODO: extract this information based on git commit messages and generate changelogs / release notes automatically.
{% endcomment %}
+h3. v1.3.0 (2018-12-05)
+
+This release includes several database migrations, which will be executed automatically as part of the API server upgrade. On large Arvados installations these migrations can take a long time; we have seen the upgrade take 30 minutes or more on installations with a large number of collections.
+
+The @arvados-controller@ component now requires the @/etc/arvados/config.yml@ file to be present. See <a href="{{ site.baseurl }}/install/install-controller.html#configuration">the @arvados-controller@ installation instructions</a>.
+
+Support for the deprecated "jobs" API is broken in this release. Users who rely on it should not upgrade. This will be fixed in an upcoming 1.3.1 patch release; however, users are "encouraged to migrate":upgrade-crunch2.html, as support for the "jobs" API will be dropped in an upcoming release. Users who are already using the "containers" API are not affected.
+
h3. v1.2.1 (2018-11-26)
There are no special upgrade notes for this release.
</code></pre>
</notextile>
-h3. Configure arvados-controller
+h3(#configuration). Configure arvados-controller
Create the cluster configuration file @/etc/arvados/config.yml@ using the following template.
# apt.arvados.org
-deb http://apt.arvados.org/ jessie main
deb http://apt.arvados.org/ jessie-dev main
"errors"
"log"
"os/exec"
+ "regexp"
"sort"
+ "strconv"
"strings"
"time"
AvailableTypes []arvados.InstanceType
}
+var pdhRegexp = regexp.MustCompile(`^[0-9a-f]{32}\+(\d+)$`)
+
+// estimateDockerImageSize estimates how much disk space will be used
+// by a Docker image, given the PDH of a collection containing a
+// Docker image that was created by "arv-keepdocker". Returns
+// estimated number of bytes of disk space that should be reserved.
+func estimateDockerImageSize(collectionPDH string) int64 {
+ m := pdhRegexp.FindStringSubmatch(collectionPDH)
+ if m == nil {
+ log.Printf("estimateDockerImageSize: '%v' did not match pdhRegexp, returning 0", collectionPDH)
+ return 0
+ }
+ n, err := strconv.ParseInt(m[1], 10, 64)
+ if err != nil || n < 122 {
+ log.Printf("estimateDockerImageSize: short manifest %v or error (%v), returning 0", n, err)
+ return 0
+ }
+ // To avoid having to fetch the collection, take advantage of
+ // the fact that the manifest storing a container image
+ // uploaded by arv-keepdocker has a predictable format, which
+ // allows us to estimate the size of the image based on just
+ // the size of the manifest.
+ //
+ // Use the following heuristic:
+    // - Start with the length of the manifest (n)
+ // - Subtract 80 characters for the filename and file segment
+ // - Divide by 42 to get the number of block identifiers ('hash\+size\ ' is 32+1+8+1)
+ // - Assume each block is full, multiply by 64 MiB
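+    //
+    // For example, a manifest of length n=342 yields (342-80)/42 = 6
+    // blocks, so the estimated image size is 6 * 64 MiB = 402653184
+    // bytes (384 MiB).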
+ return ((n - 80) / 42) * (64 * 1024 * 1024)
+}
+
+// EstimateScratchSpace estimates how much available disk space (in
+// bytes) is needed to run the container, by summing the capacity
+// requested by 'tmp' mounts and the disk space required to load the
+// Docker image.
+func EstimateScratchSpace(ctr *arvados.Container) (needScratch int64) {
+ for _, m := range ctr.Mounts {
+ if m.Kind == "tmp" {
+ needScratch += m.Capacity
+ }
+ }
+
+    // Account for disk space usage by Docker, assuming the following behavior:
+    // - Layer tarballs are buffered to disk during "docker load".
+    // - Individual layer tarballs are extracted from the buffered
+    //   copy to the filesystem.
+ dockerImageSize := estimateDockerImageSize(ctr.ContainerImage)
+
+ // The buffer is only needed during image load, so make sure
+ // the baseline scratch space at least covers dockerImageSize,
+ // and assume it will be released to the job afterwards.
+ if needScratch < dockerImageSize {
+ needScratch = dockerImageSize
+ }
+
+ // Now reserve space for the extracted image on disk.
+ needScratch += dockerImageSize
+
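+    // For example, a container with no "tmp" mounts and an estimated
+    // 384 MiB image reserves 384 MiB (load buffer) + 384 MiB (extracted
+    // image) = 768 MiB of scratch space.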
+ return
+}
+
// ChooseInstanceType returns the cheapest available
// arvados.InstanceType big enough to run ctr.
func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvados.InstanceType, err error) {
return
}
- needScratch := int64(0)
- for _, m := range ctr.Mounts {
- if m.Kind == "tmp" {
- needScratch += m.Capacity
- }
- }
+ needScratch := EstimateScratchSpace(ctr)
needVCPUs := ctr.RuntimeConstraints.VCPUs
c.Check(best.Scratch >= 2*GiB, check.Equals, true)
c.Check(best.Preemptible, check.Equals, true)
}
+
+func (*NodeSizeSuite) TestScratchForDockerImage(c *check.C) {
+ n := EstimateScratchSpace(&arvados.Container{
+ ContainerImage: "d5025c0f29f6eef304a7358afa82a822+342",
+ })
+ // Actual image is 371.1 MiB (according to workbench)
+ // Estimated size is 384 MiB (402653184 bytes)
+ // Want to reserve 2x the estimated size, so 805306368 bytes
+ c.Check(n, check.Equals, int64(805306368))
+
+ n = EstimateScratchSpace(&arvados.Container{
+ ContainerImage: "d5025c0f29f6eef304a7358afa82a822+-342",
+ })
+ // Parse error will return 0
+ c.Check(n, check.Equals, int64(0))
+
+ n = EstimateScratchSpace(&arvados.Container{
+ ContainerImage: "d5025c0f29f6eef304a7358afa82a822+34",
+ })
+ // Short manifest will return 0
+ c.Check(n, check.Equals, int64(0))
+}
help=argparse.SUPPRESS)
parser.add_argument("--thread-count", type=int,
- default=4, help="Number of threads to use for job submit and output collection.")
+ default=1, help="Number of threads to use for job submit and output collection.")
parser.add_argument("--http-timeout", type=int,
default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
"properties": {}
}
- if self.tool.tool.get("id", "").startswith("keep:"):
- sp = self.tool.tool["id"].split('/')
+ if self.embedded_tool.tool.get("id", "").startswith("keep:"):
+ sp = self.embedded_tool.tool["id"].split('/')
workflowcollection = sp[0][5:]
workflowname = "/".join(sp[1:])
workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
"portable_data_hash": "%s" % workflowcollection
}
else:
- packed = packed_workflow(self.arvrunner, self.tool, self.merged_map)
+ packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
"content": packed
}
- if self.tool.tool.get("id", "").startswith("arvwf:"):
- container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
+ if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
+ container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
# --local means execute the workflow instead of submitting a container request
a pipeline template or pipeline instance.
"""
- if self.tool.tool["id"].startswith("keep:"):
- self.job_order["cwl:tool"] = self.tool.tool["id"][5:]
+ if self.embedded_tool.tool["id"].startswith("keep:"):
+ self.job_order["cwl:tool"] = self.embedded_tool.tool["id"][5:]
else:
- packed = packed_workflow(self.arvrunner, self.tool, self.merged_map)
+ packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed)
self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
}
def __init__(self, runner, tool, job_order, enable_reuse, uuid,
- submit_runner_ram=0, name=None, merged_map=None):
+ submit_runner_ram=0, name=None, merged_map=None,
+ loadingContext=None):
self.runner = runner
- self.tool = tool
+ self.embedded_tool = tool
self.job = RunnerJob(
runner=runner,
tool=tool,
- job_order=job_order,
enable_reuse=enable_reuse,
output_name=None,
output_tags=None,
submit_runner_ram=submit_runner_ram,
name=name,
- merged_map=merged_map)
+ merged_map=merged_map,
+ loadingContext=loadingContext)
+ self.job.job_order = job_order
self.uuid = uuid
def pipeline_component_spec(self):
job_params = spec['script_parameters']
spec['script_parameters'] = {}
- for param in self.tool.tool['inputs']:
+ for param in self.embedded_tool.tool['inputs']:
param = copy.deepcopy(param)
# Data type and "required" flag...
#
# SPDX-License-Identifier: Apache-2.0
-from cwltool.command_line_tool import CommandLineTool
+from cwltool.command_line_tool import CommandLineTool, ExpressionTool
from cwltool.builder import Builder
from .arvjob import ArvadosJob
from .arvcontainer import ArvadosContainer
runtimeContext.tmpdir = "$(task.tmpdir)"
runtimeContext.docker_tmpdir = "$(task.tmpdir)"
return super(ArvadosCommandTool, self).job(joborder, output_callback, runtimeContext)
+
+class ArvadosExpressionTool(ExpressionTool):
+ def __init__(self, arvrunner, toolpath_object, loadingContext):
+ super(ArvadosExpressionTool, self).__init__(toolpath_object, loadingContext)
+ self.arvrunner = arvrunner
+
+ def job(self,
+ job_order, # type: Mapping[Text, Text]
+ output_callback, # type: Callable[[Any, Any], Any]
+ runtimeContext # type: RuntimeContext
+ ):
+ return super(ArvadosExpressionTool, self).job(job_order, self.arvrunner.get_wrapped_callback(output_callback), runtimeContext)
raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
if not dyn:
self.static_resource_req.append(req)
+ if req["class"] == "DockerRequirement":
+ if "http://arvados.org/cwl#dockerCollectionPDH" in req:
+ del req["http://arvados.org/cwl#dockerCollectionPDH"]
visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)
arvargs.output_name = output_name
arvargs.output_tags = output_tags
arvargs.thread_count = 1
+ arvargs.collection_cache_size = None
runner = arvados_cwl.ArvCwlExecutor(api_client=arvados.safeapi.ThreadSafeApiCache(
api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4}),
from .arvcontainer import RunnerContainer
from .arvjob import RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
-from .arvtool import ArvadosCommandTool, validate_cluster_target
+from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
return ArvadosCommandTool(self, toolpath_object, loadingContext)
elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
return ArvadosWorkflow(self, toolpath_object, loadingContext)
+ elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
+ return ArvadosExpressionTool(self, toolpath_object, loadingContext)
else:
- return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
+ raise Exception("Unknown tool %s" % toolpath_object.get("class"))
def output_callback(self, out, processStatus):
with self.workflow_eval_lock:
uuid=existing_uuid,
submit_runner_ram=runtimeContext.submit_runner_ram,
name=runtimeContext.name,
- merged_map=merged_map)
+ merged_map=merged_map,
+ loadingContext=loadingContext)
tmpl.save()
# cwltool.main will write our return value to stdout.
return (tmpl.uuid, "success")
if self.work_api == "containers":
if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
runtimeContext.runnerjob = tool.tool["id"]
- runnerjob = tool.job(job_order,
- self.output_callback,
- runtimeContext).next()
else:
- runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
+ tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
self.output_name,
self.output_tags,
submit_runner_ram=runtimeContext.submit_runner_ram,
collection_cache_size=runtimeContext.collection_cache_size,
collection_cache_is_default=self.should_estimate_cache_size)
elif self.work_api == "jobs":
- runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
+ tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
self.output_name,
self.output_tags,
submit_runner_ram=runtimeContext.submit_runner_ram,
"state": "RunningOnClient"}).execute(num_retries=self.num_retries)
logger.info("Pipeline instance %s", self.pipeline["uuid"])
- if runnerjob and not runtimeContext.wait:
- submitargs = runtimeContext.copy()
- submitargs.submit = False
- runnerjob.run(submitargs)
+ if runtimeContext.cwl_runner_job is not None:
+ self.uuid = runtimeContext.cwl_runner_job.get('uuid')
+
+ jobiter = tool.job(job_order,
+ self.output_callback,
+ runtimeContext)
+
+ if runtimeContext.submit and not runtimeContext.wait:
+ runnerjob = jobiter.next()
+ runnerjob.run(runtimeContext)
return (runnerjob.uuid, "success")
current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
try:
self.workflow_eval_lock.acquire()
- if runnerjob:
- jobiter = iter((runnerjob,))
- else:
- if runtimeContext.cwl_runner_job is not None:
- self.uuid = runtimeContext.cwl_runner_job.get('uuid')
- jobiter = tool.job(job_order,
- self.output_callback,
- runtimeContext)
# Holds the lock while this code runs and releases it when
# it is safe to do so in self.workflow_eval_lock.wait(),
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
- if runnerjob and runnerjob.uuid and self.work_api == "containers":
- self.api.container_requests().update(uuid=runnerjob.uuid,
+ if runtimeContext.submit and isinstance(tool, Runner):
+ runnerjob = tool
+ if runnerjob.uuid and self.work_api == "containers":
+ self.api.container_requests().update(uuid=runnerjob.uuid,
body={"priority": "0"}).execute(num_retries=self.num_retries)
finally:
self.workflow_eval_lock.release()
if self.final_output is None:
raise WorkflowException("Workflow did not return a result.")
- if runtimeContext.submit and isinstance(runnerjob, Runner):
- logger.info("Final output collection %s", runnerjob.final_output)
+ if runtimeContext.submit and isinstance(tool, Runner):
+ logger.info("Final output collection %s", tool.final_output)
else:
if self.output_name is None:
self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
+from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
return collection.portable_data_hash()
-class Runner(object):
+class Runner(Process):
"""Base class for runner processes, which submit an instance of
arvados-cwl-runner and wait for the final result."""
- def __init__(self, runner, tool, job_order, enable_reuse,
+ def __init__(self, runner, tool, loadingContext, enable_reuse,
output_name, output_tags, submit_runner_ram=0,
name=None, on_error=None, submit_runner_image=None,
intermediate_output_ttl=0, merged_map=None,
priority=None, secret_store=None,
collection_cache_size=256,
collection_cache_is_default=True):
+
+ super(Runner, self).__init__(tool.tool, loadingContext)
+
self.arvrunner = runner
- self.tool = tool
- self.job_order = job_order
+ self.embedded_tool = tool
+ self.job_order = None
self.running = False
if enable_reuse:
# If reuse is permitted by command line arguments but
# disabled by the workflow itself, disable it.
- reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
+ reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
if reuse_req:
enable_reuse = reuse_req["enableReuse"]
self.enable_reuse = enable_reuse
        self.submit_runner_ram = 1024  # default 1 GiB
self.collection_cache_size = collection_cache_size
- runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
+ runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
if runner_resource_req:
if runner_resource_req.get("coresMin"):
self.submit_runner_cores = runner_resource_req["coresMin"]
self.merged_map = merged_map or {}
+ def job(self,
+ job_order, # type: Mapping[Text, Text]
+ output_callbacks, # type: Callable[[Any, Any], Any]
+ runtimeContext # type: RuntimeContext
+ ): # type: (...) -> Generator[Any, None, None]
+ self.job_order = job_order
+ self._init_job(job_order, runtimeContext)
+ yield self
+
def update_pipeline_component(self, record):
pass
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20181116032456',
- 'schema-salad==2.7.20181116024232',
+ 'cwltool==1.0.20181201184214',
+ 'schema-salad==3.0.20181129082112',
'typing >= 3.6.4',
'ruamel.yaml >=0.15.54, <=0.15.77',
'arvados-python-client>=1.2.1.20181130020805',
--- /dev/null
+{
+ "x": {
+ "class": "File",
+ "path": "input/blorp.txt"
+ },
+ "y": {
+ "class": "Directory",
+ "location": "keep:99999999999999999999999999999998+99",
+ "listing": [{
+ "class": "File",
+ "location": "keep:99999999999999999999999999999998+99/file1.txt"
+ }]
+ }
+}
'state': 'Committed',
'command': ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
- '--eval-timeout=20', '--thread-count=4',
+ '--eval-timeout=20', '--thread-count=1',
'--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
'name': 'submit_wf.cwl',
expect_container["command"] = [
'arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
- '--eval-timeout=20', '--thread-count=4',
+ '--eval-timeout=20', '--thread-count=1',
'--disable-reuse', "--collection-cache-size=256",
'--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["command"] = [
'arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
- '--eval-timeout=20', '--thread-count=4',
+ '--eval-timeout=20', '--thread-count=1',
'--disable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["use_existing"] = False
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
- '--eval-timeout=20', '--thread-count=4',
+ '--eval-timeout=20', '--thread-count=1',
'--enable-reuse', "--collection-cache-size=256",
'--debug', '--on-error=stop',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
- '--eval-timeout=20', '--thread-count=4',
+ '--eval-timeout=20', '--thread-count=1',
'--enable-reuse', "--collection-cache-size=256",
"--output-name="+output_name, '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
- '--eval-timeout=20', '--thread-count=4',
+ '--eval-timeout=20', '--thread-count=1',
'--enable-reuse', "--collection-cache-size=256", "--debug",
"--storage-classes=foo", '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
- '--eval-timeout=20', '--thread-count=4',
+ '--eval-timeout=20', '--thread-count=1',
'--enable-reuse', "--collection-cache-size=256", '--debug',
'--on-error=continue',
"--intermediate-output-ttl=3600",
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
- '--eval-timeout=20', '--thread-count=4',
+ '--eval-timeout=20', '--thread-count=1',
'--enable-reuse', "--collection-cache-size=256",
'--debug', '--on-error=continue',
"--trash-intermediate",
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
- '--eval-timeout=20', '--thread-count=4',
+ '--eval-timeout=20', '--thread-count=1',
'--enable-reuse', "--collection-cache-size=256",
"--output-tags="+output_tags, '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
'container_image': '999999999999999999999999999999d3+99',
'command': ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
- '--eval-timeout=20', '--thread-count=4',
+ '--eval-timeout=20', '--thread-count=1',
'--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
'/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/cwl.input.json'],
'cwd': '/var/spool/cwl',
'container_image': "999999999999999999999999999999d3+99",
'command': ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
- '--eval-timeout=20', '--thread-count=4',
+ '--eval-timeout=20', '--thread-count=1',
'--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
'cwd': '/var/spool/cwl',
stubs.expect_container_request_uuid + '\n')
+ @stubs
+ def test_submit_missing_input(self, stubs):
+ capture_stdout = cStringIO.StringIO()
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+
+ capture_stdout = cStringIO.StringIO()
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job_missing.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 1)
+
+
@stubs
def test_submit_container_project(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
expect_container["owner_uuid"] = project_uuid
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
- "--eval-timeout=20", "--thread-count=4",
+ "--eval-timeout=20", "--thread-count=1",
'--enable-reuse', "--collection-cache-size=256", '--debug',
'--on-error=continue',
'--project-uuid='+project_uuid,
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
- '--eval-timeout=60.0', '--thread-count=4',
+ '--eval-timeout=60.0', '--thread-count=1',
'--enable-reuse', "--collection-cache-size=256",
'--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
- '--eval-timeout=20', '--thread-count=4',
+ '--eval-timeout=20', '--thread-count=1',
'--enable-reuse', "--collection-cache-size=500",
'--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
}
expect_container['command'] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
- '--eval-timeout=20', '--thread-count=4',
+ '--eval-timeout=20', '--thread-count=1',
'--enable-reuse', "--collection-cache-size=512", '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
"--no-log-timestamps",
"--disable-validate",
"--eval-timeout=20",
- '--thread-count=4',
+ '--thread-count=1',
"--enable-reuse",
"--collection-cache-size=256",
'--debug',
--- /dev/null
+package arvados
+
+import (
+ "context"
+ "sync"
+)
+
+// A contextGroup is a context-aware variation on sync.WaitGroup. It
+// provides a child context for the added funcs to use, so they can
+// exit early if another added func returns an error. Its Wait()
+// method returns the first error returned by any added func.
+//
+// Example:
+//
+// err := errors.New("oops")
+// cg := newContextGroup()
+// defer cg.Cancel()
+// cg.Go(func() error {
+// someFuncWithContext(cg.Context())
+// return nil
+// })
+// cg.Go(func() error {
+// return err // this cancels cg.Context()
+// })
+// return cg.Wait() // returns err after both goroutines have ended
+type contextGroup struct {
+ ctx context.Context
+ cancel context.CancelFunc
+ wg sync.WaitGroup
+ err error
+ mtx sync.Mutex
+}
+
+// newContextGroup returns a new contextGroup. The caller must
+// eventually call the Cancel() method of the returned contextGroup.
+func newContextGroup(ctx context.Context) *contextGroup {
+ ctx, cancel := context.WithCancel(ctx)
+ return &contextGroup{
+ ctx: ctx,
+ cancel: cancel,
+ }
+}
+
+// Cancel cancels the context group.
+func (cg *contextGroup) Cancel() {
+ cg.cancel()
+}
+
+// Context returns a context.Context which will be canceled when all
+// funcs have succeeded or one has failed.
+func (cg *contextGroup) Context() context.Context {
+ return cg.ctx
+}
+
+// Go calls f in a new goroutine. If f returns an error, the
+// contextGroup is canceled.
+//
+// If f notices cg.Context() is done, it should abandon further work
+// and return. In this case, f's return value will be ignored.
+func (cg *contextGroup) Go(f func() error) {
+ cg.mtx.Lock()
+ defer cg.mtx.Unlock()
+ if cg.err != nil {
+ return
+ }
+ cg.wg.Add(1)
+ go func() {
+ defer cg.wg.Done()
+ err := f()
+ cg.mtx.Lock()
+ defer cg.mtx.Unlock()
+ if err != nil && cg.err == nil {
+ cg.err = err
+ cg.cancel()
+ }
+ }()
+}
+
+// Wait waits for all added funcs to return, and returns the first
+// non-nil error.
+//
+// If the parent context is canceled before a func returns an error,
+// Wait returns the parent context's Err().
+//
+// Wait returns nil if all funcs return nil before the parent context
+// is canceled.
+func (cg *contextGroup) Wait() error {
+ cg.wg.Wait()
+ cg.mtx.Lock()
+ defer cg.mtx.Unlock()
+ if cg.err != nil {
+ return cg.err
+ }
+ return cg.ctx.Err()
+}
package arvados
import (
+ "context"
"encoding/json"
"fmt"
"io"
"time"
)
-var maxBlockSize = 1 << 26
+var (
+ maxBlockSize = 1 << 26
+ concurrentWriters = 4 // max goroutines writing to Keep during sync()
+ writeAheadBlocks = 1 // max background jobs flushing to Keep before blocking writes
+)
// A CollectionFileSystem is a FileSystem that can be serialized as a
// manifest and stored as a collection.
func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
fs.fileSystem.root.Lock()
defer fs.fileSystem.root.Unlock()
- return fs.fileSystem.root.(*dirnode).marshalManifest(prefix)
+ return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix, newThrottle(concurrentWriters))
}
func (fs *collectionFileSystem) Size() int64 {
memsize int64 // bytes in memSegments
sync.RWMutex
nullnode
+ throttle *throttle
}
// caller must have lock
// Write some data out to disk to reduce memory use. Caller must have
// write lock.
func (fn *filenode) pruneMemSegments() {
- // TODO: async (don't hold Lock() while waiting for Keep)
// TODO: share code with (*dirnode)sync()
// TODO: pack/flush small blocks too, when fragmented
+ if fn.throttle == nil {
+ // TODO: share a throttle with filesystem
+ fn.throttle = newThrottle(writeAheadBlocks)
+ }
for idx, seg := range fn.segments {
seg, ok := seg.(*memSegment)
- if !ok || seg.Len() < maxBlockSize {
- continue
- }
- locator, _, err := fn.FS().PutB(seg.buf)
- if err != nil {
- // TODO: stall (or return errors from)
- // subsequent writes until flushing
- // starts to succeed
+ if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
continue
}
- fn.memsize -= int64(seg.Len())
- fn.segments[idx] = storedSegment{
- kc: fn.FS(),
- locator: locator,
- size: seg.Len(),
- offset: 0,
- length: seg.Len(),
+ // Setting seg.flushing guarantees seg.buf will not be
+ // modified in place: WriteAt and Truncate will
+ // allocate a new buf instead, if necessary.
+ idx, buf := idx, seg.buf
+ done := make(chan struct{})
+ seg.flushing = done
+ // If lots of background writes are already in
+ // progress, block here until one finishes, rather
+ // than pile up an unlimited number of buffered writes
+ // and network flush operations.
+ fn.throttle.Acquire()
+ go func() {
+ defer close(done)
+ locator, _, err := fn.FS().PutB(buf)
+ fn.throttle.Release()
+ fn.Lock()
+ defer fn.Unlock()
+ if curbuf := seg.buf[:1]; &curbuf[0] != &buf[0] {
+ // A new seg.buf has been allocated.
+ return
+ }
+ seg.flushing = nil
+ if err != nil {
+ // TODO: stall (or return errors from)
+ // subsequent writes until flushing
+ // starts to succeed.
+ return
+ }
+ if len(fn.segments) <= idx || fn.segments[idx] != seg || len(seg.buf) != len(buf) {
+ // Segment has been dropped/moved/resized.
+ return
+ }
+ fn.memsize -= int64(len(buf))
+ fn.segments[idx] = storedSegment{
+ kc: fn.FS(),
+ locator: locator,
+ size: len(buf),
+ offset: 0,
+ length: len(buf),
+ }
+ }()
+ }
+}
+
+// Block until all pending pruneMemSegments work is finished. Caller
+// must NOT have lock.
+func (fn *filenode) waitPrune() {
+ var pending []<-chan struct{}
+ fn.Lock()
+ for _, seg := range fn.segments {
+ if seg, ok := seg.(*memSegment); ok && seg.flushing != nil {
+ pending = append(pending, seg.flushing)
}
}
+ fn.Unlock()
+ for _, p := range pending {
+ <-p
+ }
}
type dirnode struct {
return dn.treenode.Child(name, replace)
}
+type fnSegmentRef struct {
+ fn *filenode
+ idx int
+}
+
+// commitBlock concatenates the data from the given filenode segments
+// (which must be *memSegments), writes the data out to Keep as a
+// single block, and replaces the filenodes' *memSegments with
+// storedSegments that reference the relevant portions of the new
+// block.
+//
+// Caller must have write lock.
+func (dn *dirnode) commitBlock(ctx context.Context, throttle *throttle, refs []fnSegmentRef) error {
+ if len(refs) == 0 {
+ return nil
+ }
+ throttle.Acquire()
+ defer throttle.Release()
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+ block := make([]byte, 0, maxBlockSize)
+ for _, ref := range refs {
+ block = append(block, ref.fn.segments[ref.idx].(*memSegment).buf...)
+ }
+ locator, _, err := dn.fs.PutB(block)
+ if err != nil {
+ return err
+ }
+ off := 0
+ for _, ref := range refs {
+ data := ref.fn.segments[ref.idx].(*memSegment).buf
+ ref.fn.segments[ref.idx] = storedSegment{
+ kc: dn.fs,
+ locator: locator,
+ size: len(block),
+ offset: off,
+ length: len(data),
+ }
+ off += len(data)
+ ref.fn.memsize -= int64(len(data))
+ }
+ return nil
+}
+
// sync flushes in-memory data and remote block references (for the
// children with the given names, which must be children of dn) to
// local persistent storage. Caller must have write lock on dn and the
// named children.
-func (dn *dirnode) sync(names []string) error {
- type shortBlock struct {
- fn *filenode
- idx int
- }
- var pending []shortBlock
- var pendingLen int
+func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string) error {
+ cg := newContextGroup(ctx)
+ defer cg.Cancel()
- flush := func(sbs []shortBlock) error {
- if len(sbs) == 0 {
- return nil
- }
- block := make([]byte, 0, maxBlockSize)
- for _, sb := range sbs {
- block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
- }
- locator, _, err := dn.fs.PutB(block)
- if err != nil {
- return err
- }
- off := 0
- for _, sb := range sbs {
- data := sb.fn.segments[sb.idx].(*memSegment).buf
- sb.fn.segments[sb.idx] = storedSegment{
- kc: dn.fs,
- locator: locator,
- size: len(block),
- offset: off,
- length: len(data),
- }
- off += len(data)
- sb.fn.memsize -= int64(len(data))
- }
- return nil
+ goCommit := func(refs []fnSegmentRef) {
+ cg.Go(func() error {
+ return dn.commitBlock(cg.Context(), throttle, refs)
+ })
}
+ var pending []fnSegmentRef
+    var pendingLen int
localLocator := map[string]string{}
for _, name := range names {
fn, ok := dn.inodes[name].(*filenode)
fn.segments[idx] = seg
case *memSegment:
if seg.Len() > maxBlockSize/2 {
- if err := flush([]shortBlock{{fn, idx}}); err != nil {
- return err
- }
+ goCommit([]fnSegmentRef{{fn, idx}})
continue
}
if pendingLen+seg.Len() > maxBlockSize {
- if err := flush(pending); err != nil {
- return err
- }
+ goCommit(pending)
pending = nil
pendingLen = 0
}
- pending = append(pending, shortBlock{fn, idx})
+ pending = append(pending, fnSegmentRef{fn, idx})
pendingLen += seg.Len()
default:
panic(fmt.Sprintf("can't sync segment type %T", seg))
}
}
}
- return flush(pending)
+ goCommit(pending)
+ return cg.Wait()
}
// caller must have write lock.
-func (dn *dirnode) marshalManifest(prefix string) (string, error) {
- var streamLen int64
- type filepart struct {
- name string
- offset int64
- length int64
- }
- var fileparts []filepart
- var subdirs string
- var blocks []string
+func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle *throttle) (string, error) {
+ cg := newContextGroup(ctx)
+ defer cg.Cancel()
if len(dn.inodes) == 0 {
if prefix == "." {
names = append(names, name)
}
sort.Strings(names)
+
+ // Wait for children to finish any pending write operations
+ // before locking them.
for _, name := range names {
node := dn.inodes[name]
- node.Lock()
- defer node.Unlock()
- }
- if err := dn.sync(names); err != nil {
- return "", err
+ if fn, ok := node.(*filenode); ok {
+ fn.waitPrune()
+ }
}
+
+ var dirnames []string
+ var filenames []string
for _, name := range names {
- switch node := dn.inodes[name].(type) {
+ node := dn.inodes[name]
+ node.Lock()
+ defer node.Unlock()
+ switch node := node.(type) {
case *dirnode:
- subdir, err := node.marshalManifest(prefix + "/" + name)
- if err != nil {
- return "", err
- }
- subdirs = subdirs + subdir
+ dirnames = append(dirnames, name)
case *filenode:
+ filenames = append(filenames, name)
+ default:
+ panic(fmt.Sprintf("can't marshal inode type %T", node))
+ }
+ }
+
+ subdirs := make([]string, len(dirnames))
+ rootdir := ""
+ for i, name := range dirnames {
+ i, name := i, name
+ cg.Go(func() error {
+ txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name, throttle)
+ subdirs[i] = txt
+ return err
+ })
+ }
+
+ cg.Go(func() error {
+ var streamLen int64
+ type filepart struct {
+ name string
+ offset int64
+ length int64
+ }
+
+ var fileparts []filepart
+ var blocks []string
+ if err := dn.sync(cg.Context(), throttle, names); err != nil {
+ return err
+ }
+ for _, name := range filenames {
+ node := dn.inodes[name].(*filenode)
if len(node.segments) == 0 {
fileparts = append(fileparts, filepart{name: name})
- break
+ continue
}
for _, seg := range node.segments {
switch seg := seg.(type) {
panic(fmt.Sprintf("can't marshal segment type %T", seg))
}
}
- default:
- panic(fmt.Sprintf("can't marshal inode type %T", node))
}
- }
- var filetokens []string
- for _, s := range fileparts {
- filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
- }
- if len(filetokens) == 0 {
- return subdirs, nil
- } else if len(blocks) == 0 {
- blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
- }
- return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
+ var filetokens []string
+ for _, s := range fileparts {
+ filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
+ }
+ if len(filetokens) == 0 {
+ return nil
+ } else if len(blocks) == 0 {
+ blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
+ }
+ rootdir = manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n"
+ return nil
+ })
+ err := cg.Wait()
+ return rootdir + strings.Join(subdirs, ""), err
}
func (dn *dirnode) loadManifest(txt string) error {
type memSegment struct {
buf []byte
+ // If flushing is not nil, then a) buf is being shared by a
+ // pruneMemSegments goroutine, and must be copied on write;
+ // and b) the flushing channel will close when the goroutine
+ // finishes, whether it succeeds or not.
+ flushing <-chan struct{}
}
func (me *memSegment) Len() int {
}
func (me *memSegment) Truncate(n int) {
- if n > cap(me.buf) {
+ if n > cap(me.buf) || (me.flushing != nil && n > len(me.buf)) {
newsize := 1024
for newsize < n {
newsize = newsize << 2
}
newbuf := make([]byte, n, newsize)
copy(newbuf, me.buf)
- me.buf = newbuf
+ me.buf, me.flushing = newbuf, nil
} else {
- // Zero unused part when shrinking, in case we grow
- // and start using it again later.
- for i := n; i < len(me.buf); i++ {
+ // reclaim existing capacity, and zero reclaimed part
+ oldlen := len(me.buf)
+ me.buf = me.buf[:n]
+ for i := oldlen; i < n; i++ {
me.buf[i] = 0
}
}
- me.buf = me.buf[:n]
}
func (me *memSegment) WriteAt(p []byte, off int) {
if off+len(p) > len(me.buf) {
panic("overflowed segment")
}
+ if me.flushing != nil {
+ me.buf, me.flushing = append([]byte(nil), me.buf...), nil
+ }
copy(me.buf[off:], p)
}
"runtime"
"strings"
"sync"
+ "sync/atomic"
"testing"
"time"
type keepClientStub struct {
blocks map[string][]byte
refreshable map[string]bool
+ onPut func(bufcopy []byte) // called from PutB, before acquiring lock
sync.RWMutex
}
locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
buf := make([]byte, len(p))
copy(buf, p)
+ if kcs.onPut != nil {
+ kcs.onPut(buf)
+ }
kcs.Lock()
defer kcs.Unlock()
kcs.blocks[locator[:32]] = buf
const ngoroutines = 256
var wg sync.WaitGroup
- for n := 0; n < nfiles; n++ {
+ for n := 0; n < ngoroutines; n++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
c.Assert(err, check.IsNil)
defer f.Close()
- for i := 0; i < ngoroutines; i++ {
+ for i := 0; i < nfiles; i++ {
trunc := rand.Intn(65)
woff := rand.Intn(trunc + 1)
wbytes = wbytes[:rand.Intn(64-woff+1)]
c.Check(string(buf), check.Equals, string(expect))
c.Check(err, check.IsNil)
}
- s.checkMemSize(c, f)
}(n)
}
wg.Wait()
+ for n := 0; n < ngoroutines; n++ {
+ f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
+ c.Assert(err, check.IsNil)
+ f.(*filehandle).inode.(*filenode).waitPrune()
+ s.checkMemSize(c, f)
+ defer f.Close()
+ }
+
root, err := s.fs.Open("/")
c.Assert(err, check.IsNil)
defer root.Close()
}
func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
+ defer func(wab, mbs int) {
+ writeAheadBlocks = wab
+ maxBlockSize = mbs
+ }(writeAheadBlocks, maxBlockSize)
+ writeAheadBlocks = 2
maxBlockSize = 1024
- defer func() { maxBlockSize = 2 << 26 }()
+
+ proceed := make(chan struct{})
+ var started, concurrent int32
+ blk2done := false
+ s.kc.onPut = func([]byte) {
+ atomic.AddInt32(&concurrent, 1)
+ switch atomic.AddInt32(&started, 1) {
+ case 1:
+ // Wait until block 2 starts and finishes, and block 3 starts
+ select {
+ case <-proceed:
+ c.Check(blk2done, check.Equals, true)
+ case <-time.After(time.Second):
+ c.Error("timed out")
+ }
+ case 2:
+ time.Sleep(time.Millisecond)
+ blk2done = true
+ case 3:
+ close(proceed)
+ default:
+ time.Sleep(time.Millisecond)
+ }
+ c.Check(atomic.AddInt32(&concurrent, -1) < int32(writeAheadBlocks), check.Equals, true)
+ }
fs, err := (&Collection{}).FileSystem(s.client, s.kc)
c.Assert(err, check.IsNil)
}
return
}
+ f.(*filehandle).inode.(*filenode).waitPrune()
c.Check(currentMemExtents(), check.HasLen, 1)
m, err := fs.MarshalManifest(".")
--- /dev/null
+package arvados
+
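+// A throttle is a simple counting semaphore: it allows at most n
+// concurrent holders, where n is the capacity passed to newThrottle.
+// Acquire blocks until a slot is free; Release returns a slot.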
+type throttle struct {
+ c chan struct{}
+}
+
+func newThrottle(n int) *throttle {
+ return &throttle{c: make(chan struct{}, n)}
+}
+
+func (t *throttle) Acquire() {
+ t.c <- struct{}{}
+}
+
+func (t *throttle) Release() {
+ <-t.c
+}
class ArvadosFileReaderBase(_FileLikeObjectBase):
def __init__(self, name, mode, num_retries=None):
super(ArvadosFileReaderBase, self).__init__(name, mode)
- self._binary = 'b' in mode
- if sys.version_info >= (3, 0) and not self._binary:
- raise NotImplementedError("text mode {!r} is not implemented".format(mode))
self._filepos = 0
self.num_retries = num_retries
self._readline_cache = (None, None)
def stream_name(self):
return self.arvadosfile.parent.stream_name()
+ def readinto(self, b):
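+        """Read up to len(b) bytes into the buffer b and return the number of bytes read."""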
+ data = self.read(len(b))
+ b[:len(data)] = data
+ return len(data)
+
@_FileLikeObjectBase._before_close
@retry_method
def read(self, size=None, num_retries=None):
if not self.closed:
self.arvadosfile.remove_writer(self, flush)
super(ArvadosFileWriter, self).close()
+
+
+class WrappableFile(object):
+ """An interface to an Arvados file that's compatible with io wrappers.
+
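+    Delegates the methods that io.BufferedReader, io.BufferedRandom and
+    io.TextIOWrapper expect (read, readinto, seek, tell, write, ...) to
+    the wrapped Arvados file object.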
+ """
+ def __init__(self, f):
+ self.f = f
+ self.closed = False
+ def close(self):
+ self.closed = True
+ return self.f.close()
+ def flush(self):
+ return self.f.flush()
+ def read(self, *args, **kwargs):
+ return self.f.read(*args, **kwargs)
+ def readable(self):
+ return self.f.readable()
+ def readinto(self, *args, **kwargs):
+ return self.f.readinto(*args, **kwargs)
+ def seek(self, *args, **kwargs):
+ return self.f.seek(*args, **kwargs)
+ def seekable(self):
+ return self.f.seekable()
+ def tell(self):
+ return self.f.tell()
+ def writable(self):
+ return self.f.writable()
+ def write(self, *args, **kwargs):
+ return self.f.write(*args, **kwargs)
from builtins import str
from past.builtins import basestring
from builtins import object
+import ciso8601
+import datetime
+import errno
import functools
+import hashlib
+import io
import logging
import os
import re
-import errno
-import hashlib
-import datetime
-import ciso8601
-import time
+import sys
import threading
+import time
from collections import deque
from stat import *
-from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
+from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
from .keep import KeepLocator, KeepClient
from .stream import StreamReader
from ._normalize_stream import normalize_stream
_logger = logging.getLogger('arvados.collection')
+
+if sys.version_info >= (3, 0):
+ TextIOWrapper = io.TextIOWrapper
+else:
+ class TextIOWrapper(io.TextIOWrapper):
+ """To maintain backward compatibility, cast str to unicode in
+ write('foo').
+
+ """
+ def write(self, data):
+ if isinstance(data, basestring):
+ data = unicode(data)
+ return super(TextIOWrapper, self).write(data)
+
+
class CollectionBase(object):
"""Abstract base class for Collection classes."""
return self.find_or_create(path, COLLECTION)
- def open(self, path, mode="r"):
+ def open(self, path, mode="r", encoding=None):
"""Open a file-like object for access.
:path:
opens for reading and writing. All writes are appended to
the end of the file. Writing does not affect the file pointer for
reading.
+
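+        :encoding:
+          Text encoding to use when the file is opened in text mode (any
+          mode not containing "b"). Passed through to io.TextIOWrapper;
+          ignored in binary mode.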
"""
if not re.search(r'^[rwa][bt]?\+?$', mode):
if mode[0] == 'w':
arvfile.truncate(0)
- return fclass(arvfile, mode=mode, num_retries=self.num_retries)
+ binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
+ f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
+ if 'b' not in mode:
+ bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
+ f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
+ return f
def modified(self):
"""Determine if the collection has been modified since last commited."""
return prefix+fn
def write_file(collection, pathprefix, fn, flush=False):
- with open(os.path.join(pathprefix, fn)) as src:
- dst = collection.open(fn, "w")
+ with open(os.path.join(pathprefix, fn), "rb") as src:
+ dst = collection.open(fn, "wb")
r = src.read(1024*128)
while r:
dst.write(r)
keep = ArvadosFileWriterTestCase.MockKeep({
"781e5e245d69b566979b86e28d23f2c7+10": b"0123456789",
})
- c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', keep_client=keep)
- writer = c.open("count.txt", "ab+")
- self.assertEqual(writer.read(20), b"0123456789")
-
- writer.seek(0, os.SEEK_SET)
- writer.write("hello")
- self.assertEqual(writer.read(), b"")
- writer.seek(-5, os.SEEK_CUR)
- self.assertEqual(writer.read(3), b"hel")
- self.assertEqual(writer.read(), b"lo")
- writer.seek(0, os.SEEK_SET)
- self.assertEqual(writer.read(), b"0123456789hello")
-
- writer.seek(0)
- writer.write("world")
- self.assertEqual(writer.read(), b"")
- writer.seek(0)
- self.assertEqual(writer.read(), b"0123456789helloworld")
-
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 fc5e038d38a57032085441e7fe7010b0+10 0:20:count.txt\n", c.portable_manifest_text())
+ for (mode, convert) in (
+ ('a+', lambda data: data.decode(encoding='utf-8')),
+ ('at+', lambda data: data.decode(encoding='utf-8')),
+ ('ab+', lambda data: data)):
+ c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', keep_client=keep)
+ writer = c.open("count.txt", mode)
+ self.assertEqual(writer.read(20), convert(b"0123456789"))
+
+ writer.seek(0, os.SEEK_SET)
+ writer.write(convert(b"hello"))
+ self.assertEqual(writer.read(), convert(b""))
+ if 'b' in mode:
+ writer.seek(-5, os.SEEK_CUR)
+ self.assertEqual(writer.read(3), convert(b"hel"))
+ self.assertEqual(writer.read(), convert(b"lo"))
+ else:
+ with self.assertRaises(IOError):
+ writer.seek(-5, os.SEEK_CUR)
+ with self.assertRaises(IOError):
+ writer.seek(-3, os.SEEK_END)
+ writer.seek(0, os.SEEK_SET)
+ writer.read(7)
+ self.assertEqual(7, writer.tell())
+ self.assertEqual(7, writer.seek(7, os.SEEK_SET))
+
+ writer.seek(0, os.SEEK_SET)
+ self.assertEqual(writer.read(), convert(b"0123456789hello"))
+
+ writer.seek(0)
+ writer.write(convert(b"world"))
+ self.assertEqual(writer.read(), convert(b""))
+ writer.seek(0)
+ self.assertEqual(writer.read(), convert(b"0123456789helloworld"))
+
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 fc5e038d38a57032085441e7fe7010b0+10 0:20:count.txt\n", c.portable_manifest_text())
def test_write_at_beginning(self):
keep = ArvadosFileWriterTestCase.MockKeep({
with c.open('foo', 'wb') as f:
f.write('foo')
for mode in ['r', 'rt', 'r+', 'rt+', 'w', 'wt', 'a', 'at']:
- if sys.version_info >= (3, 0):
- with self.assertRaises(NotImplementedError):
- c.open('foo', mode)
- else:
- with c.open('foo', mode) as f:
- if mode[0] == 'r' and '+' not in mode:
- self.assertEqual('foo', f.read(3))
- else:
- f.write('bar')
- f.seek(-3, os.SEEK_CUR)
- self.assertEqual('bar', f.read(3))
+ with c.open('foo', mode) as f:
+ if mode[0] == 'r' and '+' not in mode:
+ self.assertEqual('foo', f.read(3))
+ else:
+ f.write('bar')
+ f.seek(0, os.SEEK_SET)
+ self.assertEqual('bar', f.read(3))
+
+
+class TextModes(run_test_server.TestCaseWithServers):
+
+ def setUp(self):
+ arvados.config.KEEP_BLOCK_SIZE = 4
+ if sys.version_info < (3, 0):
+ import unicodedata
+ self.sailboat = unicodedata.lookup('SAILBOAT')
+ self.snowman = unicodedata.lookup('SNOWMAN')
+ else:
+ self.sailboat = '\N{SAILBOAT}'
+ self.snowman = '\N{SNOWMAN}'
+
+ def tearDown(self):
+ arvados.config.KEEP_BLOCK_SIZE = 2 ** 26
+
+ def test_read_sailboat_across_block_boundary(self):
+ c = Collection()
+ f = c.open('sailboats', 'wb')
+ data = self.sailboat.encode('utf-8')
+ f.write(data)
+ f.write(data[:1])
+ f.write(data[1:])
+ f.write(b'\n')
+ f.close()
+ self.assertRegex(c.portable_manifest_text(), r'\+4 .*\+3 ')
+
+ f = c.open('sailboats', 'r')
+ string = f.readline()
+ self.assertEqual(string, self.sailboat+self.sailboat+'\n')
+ f.close()
+
+ def test_write_snowman_across_block_boundary(self):
+ c = Collection()
+ f = c.open('snowmany', 'w')
+ data = self.snowman
+ f.write(data+data+'\n'+data+'\n')
+ f.close()
+ self.assertRegex(c.portable_manifest_text(), r'\+4 .*\+4 .*\+3 ')
+
+ f = c.open('snowmany', 'r')
+ self.assertEqual(f.readline(), self.snowman+self.snowman+'\n')
+ self.assertEqual(f.readline(), self.snowman+'\n')
+ f.close()
class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
if !include_trash
if sql_table != "api_client_authorizations"
# Only include records where the owner is not trashed
- sql_conds = "NOT EXISTS(SELECT 1 FROM #{PERMISSION_VIEW} "+
- "WHERE trashed = 1 AND "+
- "(#{sql_table}.owner_uuid = target_uuid)) #{exclude_trashed_records}"
+ sql_conds = "#{sql_table}.owner_uuid NOT IN (SELECT target_uuid FROM #{PERMISSION_VIEW} "+
+ "WHERE trashed = 1) #{exclude_trashed_records}"
end
end
else
# see issue 13208 for details.
# Match a direct read permission link from the user to the record uuid
- direct_check = "EXISTS(SELECT 1 FROM #{PERMISSION_VIEW} "+
- "WHERE user_uuid IN (:user_uuids) AND perm_level >= 1 #{trashed_check} AND target_uuid = #{sql_table}.uuid)"
+ direct_check = "#{sql_table}.uuid IN (SELECT target_uuid FROM #{PERMISSION_VIEW} "+
+ "WHERE user_uuid IN (:user_uuids) AND perm_level >= 1 #{trashed_check})"
# Match a read permission link from the user to the record's owner_uuid
owner_check = ""
if sql_table != "api_client_authorizations" and sql_table != "groups" then
- owner_check = "OR EXISTS(SELECT 1 FROM #{PERMISSION_VIEW} "+
- "WHERE user_uuid IN (:user_uuids) AND perm_level >= 1 #{trashed_check} AND target_uuid = #{sql_table}.owner_uuid AND target_owner_uuid IS NOT NULL) "
+ owner_check = "OR #{sql_table}.owner_uuid IN (SELECT target_uuid FROM #{PERMISSION_VIEW} "+
+ "WHERE user_uuid IN (:user_uuids) AND perm_level >= 1 #{trashed_check} AND target_owner_uuid IS NOT NULL) "
end
links_cond = ""
candidates = candidates.where_serialized(:runtime_constraints, resolve_runtime_constraints(attrs[:runtime_constraints]), md5: true)
log_reuse_info(candidates) { "after filtering on runtime_constraints #{attrs[:runtime_constraints].inspect}" }
- candidates = candidates.where('runtime_user_uuid = ? or (runtime_user_uuid is NULL and runtime_auth_scopes is NULL)',
- attrs[:runtime_user_uuid])
- log_reuse_info(candidates) { "after filtering on runtime_user_uuid #{attrs[:runtime_user_uuid].inspect}" }
-
- candidates = candidates.where('runtime_auth_scopes = ? or (runtime_user_uuid is NULL and runtime_auth_scopes is NULL)',
- SafeJSON.dump(attrs[:runtime_auth_scopes].sort))
- log_reuse_info(candidates) { "after filtering on runtime_auth_scopes #{attrs[:runtime_auth_scopes].inspect}" }
-
log_reuse_info { "checking for state=Complete with readable output and log..." }
select_readable_pdh = Collection.
c1, _ = minimal_new(common_attrs.merge({runtime_token: api_client_authorizations(:active).token}))
assert_equal Container::Queued, c1.state
reused = Container.find_reusable(common_attrs.merge(runtime_token_attr(:container_runtime_token)))
- assert_nil reused
+ # See #14584
+ assert_equal c1.uuid, reused.uuid
end
test "find_reusable method with nil runtime_token, then runtime_token with different user" do
c1, _ = minimal_new(common_attrs.merge({runtime_token: nil}))
assert_equal Container::Queued, c1.state
reused = Container.find_reusable(common_attrs.merge(runtime_token_attr(:container_runtime_token)))
- assert_nil reused
+ # See #14584
+ assert_equal c1.uuid, reused.uuid
end
test "find_reusable method with different runtime_token, different scope, same user" do
c1, _ = minimal_new(common_attrs.merge({runtime_token: api_client_authorizations(:runtime_token_limited_scope).token}))
assert_equal Container::Queued, c1.state
reused = Container.find_reusable(common_attrs.merge(runtime_token_attr(:container_runtime_token)))
- assert_nil reused
+ # See #14584
+ assert_equal c1.uuid, reused.uuid
end
test "Container running" do
func (disp *Dispatcher) slurmConstraintArgs(container arvados.Container) []string {
mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM+disp.ReserveExtraRAM) / float64(1048576)))
- var disk int64
- for _, m := range container.Mounts {
- if m.Kind == "tmp" {
- disk += m.Capacity
- }
- }
+ disk := dispatchcloud.EstimateScratchSpace(&container)
disk = int64(math.Ceil(float64(disk) / float64(1048576)))
return []string{
fmt.Sprintf("--mem=%d", mem),
func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
var args []string
args = append(args, disp.SbatchArguments...)
- args = append(args, "--job-name="+container.UUID, fmt.Sprintf("--nice=%d", initialNiceValue))
+ args = append(args, "--job-name="+container.UUID, fmt.Sprintf("--nice=%d", initialNiceValue), "--no-requeue")
if disp.cluster == nil {
// no instance types configured
[][]string{{
fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
fmt.Sprintf("--nice=%d", 10000),
+ "--no-requeue",
fmt.Sprintf("--mem=%d", 11445),
fmt.Sprintf("--cpus-per-task=%d", 4),
fmt.Sprintf("--tmp=%d", 45777),
func (s *IntegrationSuite) TestSbatchFail(c *C) {
s.slurm = slurmFake{errBatch: errors.New("something terrible happened")}
container := s.integrationTest(c,
- [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--nice=10000", "--mem=11445", "--cpus-per-task=4", "--tmp=45777"}},
+ [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--nice=10000", "--no-requeue", "--mem=11445", "--cpus-per-task=4", "--tmp=45777"}},
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
dispatcher.UpdateState(container.UUID, dispatch.Complete)
s.disp.SbatchArguments = defaults
args, err := s.disp.sbatchArgs(container)
- c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--nice=10000", "--mem=239", "--cpus-per-task=2", "--tmp=0"))
+ c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--nice=10000", "--no-requeue", "--mem=239", "--cpus-per-task=2", "--tmp=0"))
c.Check(err, IsNil)
}
}
args, err := s.disp.sbatchArgs(container)
c.Check(err == nil, Equals, trial.err == nil)
if trial.err == nil {
- c.Check(args, DeepEquals, append([]string{"--job-name=123", "--nice=10000"}, trial.sbatchArgs...))
+ c.Check(args, DeepEquals, append([]string{"--job-name=123", "--nice=10000", "--no-requeue"}, trial.sbatchArgs...))
} else {
c.Check(len(err.(dispatchcloud.ConstraintsNotSatisfiableError).AvailableTypes), Equals, len(trial.types))
}
args, err := s.disp.sbatchArgs(container)
c.Check(args, DeepEquals, []string{
- "--job-name=123", "--nice=10000",
+ "--job-name=123", "--nice=10000", "--no-requeue",
"--mem=239", "--cpus-per-task=1", "--tmp=0",
"--partition=blurb,b2",
})