if params["tab_pane"] == "Provenance_graph"
@prov_svg = ProvenanceHelper::create_provenance_graph(@object.provenance, "provenance_svg",
{:request => request,
- :direction => :bottom_up,
+ :direction => :top_down,
:combine_jobs => :script_only}) rescue nil
end
if params["tab_pane"] == "Used_by"
@used_by_svg = ProvenanceHelper::create_provenance_graph(@object.used_by, "used_by_svg",
{:request => request,
- :direction => :top_down,
- :combine_jobs => :script_only,
- :pdata_only => true}) rescue nil
+ :direction => :top_down,
+ :combine_jobs => :script_only,
+ :pdata_only => true}) rescue nil
end
end
end
@svg = ProvenanceHelper::create_provenance_graph nodes, "provenance_svg", {
:request => request,
+ :direction => :top_down,
:all_script_parameters => true,
:script_version_nodes => true}
end
if provenance
@prov_svg = ProvenanceHelper::create_provenance_graph provenance, "provenance_svg", {
:request => request,
+ :direction => :top_down,
:all_script_parameters => true,
:combine_jobs => :script_and_version,
:pips => pips,
#distribution(s)|name|version|iteration|type|architecture|extra fpm arguments
debian8,ubuntu1204,centos7|python-gflags|2.0|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|google-api-python-client|1.4.2|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|oauth2client|1.5.2|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|google-api-python-client|1.4.2|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|oauth2client|1.5.2|2|python|all
debian8,ubuntu1204,ubuntu1404,centos7|pyasn1|0.1.7|2|python|all
debian8,ubuntu1204,ubuntu1404,centos7|pyasn1-modules|0.0.5|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|rsa|3.4.2|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|uritemplate|3.0.0|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|httplib2|0.9.2|3|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|rsa|3.4.2|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|uritemplate|3.0.0|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|httplib2|0.9.2|3|python|all
debian8,ubuntu1204,centos7|ws4py|0.3.5|2|python|all
debian8,ubuntu1204,centos7|pykka|1.2.1|2|python|all
debian8,ubuntu1204,ubuntu1404|six|1.10.0|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|ciso8601|1.0.3|3|python|amd64
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|ciso8601|1.0.3|3|python|amd64
debian8,ubuntu1204,centos7|pycrypto|2.6.1|3|python|amd64
-debian8,ubuntu1204,ubuntu1404|backports.ssl_match_hostname|3.5.0.1|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604|backports.ssl_match_hostname|3.5.0.1|2|python|all
debian8,ubuntu1204,ubuntu1404,centos7|llfuse|0.41.1|3|python|amd64
-debian8,ubuntu1204,ubuntu1404,centos7|pycurl|7.19.5.3|3|python|amd64
-debian8,ubuntu1204,ubuntu1404,centos7|pyyaml|3.12|2|python|amd64
-debian8,ubuntu1204,ubuntu1404,centos7|rdflib|4.2.1|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|pycurl|7.19.5.3|3|python|amd64
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|pyyaml|3.12|2|python|amd64
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|rdflib|4.2.1|2|python|all
debian8,ubuntu1204,ubuntu1404,centos7|shellescape|3.4.1|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|mistune|0.7.3|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|typing|3.5.3.0|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|mistune|0.7.3|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|typing|3.5.3.0|2|python|all
debian8,ubuntu1204,ubuntu1404,centos7|avro|1.8.1|2|python|all
debian8,ubuntu1204,ubuntu1404,centos7|ruamel.ordereddict|0.4.9|2|python|amd64
-debian8,ubuntu1204,ubuntu1404,centos7|cachecontrol|0.11.7|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|pathlib2|2.1.0|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|docker-py|1.7.2|2|python3|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|cachecontrol|0.11.7|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|pathlib2|2.1.0|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|docker-py|1.7.2|2|python3|all
debian8,ubuntu1204,centos7|six|1.10.0|2|python3|all
debian8,ubuntu1204,ubuntu1404,centos7|requests|2.12.4|2|python3|all
-debian8,ubuntu1204,ubuntu1404,centos7|websocket-client|0.37.0|2|python3|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|websocket-client|0.37.0|2|python3|all
ubuntu1204|requests|2.12.4|2|python|all
ubuntu1204,centos7|contextlib2|0.5.4|2|python|all
ubuntu1204,centos7|isodate|0.5.4|2|python|all
centos7|sparqlwrapper|1.8.0|2|python|all
centos7|html5lib|0.9999999|2|python|all
centos7|keepalive|0.5|2|python|all
-all|lockfile|0.12.2|2|python|all|--epoch 1
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|lockfile|0.12.2|2|python|all|--epoch 1
all|ruamel.yaml|0.13.7|2|python|amd64|--python-setup-py-arguments --single-version-externally-managed
all|cwltest|1.0.20160907111242|3|python|all|--depends 'python-futures >= 3.0.5'
all|rdflib-jsonld|0.4.0|2|python|all
-all: centos7/generated debian8/generated ubuntu1204/generated ubuntu1404/generated
+all: centos7/generated debian8/generated ubuntu1204/generated ubuntu1404/generated ubuntu1604/generated
centos7/generated: common-generated-all
test -d centos7/generated || mkdir centos7/generated
test -d ubuntu1404/generated || mkdir ubuntu1404/generated
cp -rlt ubuntu1404/generated common-generated/*
+ubuntu1604/generated: common-generated-all
+ test -d ubuntu1604/generated || mkdir ubuntu1604/generated
+ cp -rlt ubuntu1604/generated common-generated/*
+
GOTARBALL=go1.7.1.linux-amd64.tar.gz
common-generated-all: common-generated/$(GOTARBALL)
--- /dev/null
+FROM ubuntu:xenial
+MAINTAINER Ward Vandewege <ward@curoverse.com>
+
+# Install dependencies and set up system.
+RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev libgnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip unzip
+
+# Install RVM
+RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
+ curl -L https://get.rvm.io | bash -s stable && \
+ /usr/local/rvm/bin/rvm install 2.3.3 && \
+ /usr/local/rvm/bin/rvm alias create default ruby-2.3.3 && \
+ /usr/local/rvm/bin/rvm-exec default gem install bundler && \
+ /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
+
+# Install golang binary
+ADD generated/go1.7.1.linux-amd64.tar.gz /usr/local/
+RUN ln -s /usr/local/go/bin/go /usr/local/bin/
+
+ENV WORKSPACE /arvados
+CMD ["/usr/local/rvm/bin/rvm-exec", "default", "bash", "/jenkins/run-build-packages.sh", "--target", "ubuntu1604"]
--- /dev/null
+FROM ubuntu:xenial
+MAINTAINER Ward Vandewege <ward@curoverse.com>
+
+# Install RVM
+RUN apt-get update && \
+ DEBIAN_FRONTEND=noninteractive apt-get -y install --no-install-recommends curl ca-certificates && \
+ gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
+ curl -L https://get.rvm.io | bash -s stable && \
+ /usr/local/rvm/bin/rvm install 2.3 && \
+ /usr/local/rvm/bin/rvm alias create default ruby-2.3
+
+# udev daemon can't start in a container, so don't try.
+RUN mkdir -p /etc/udev/disabled
+
+RUN echo "deb file:///arvados/packages/ubuntu1604/ /" >>/etc/apt/sources.list
+
+# Add preferences file for the Arvados packages. This pins Arvados
+# packages at priority 501, so that older python dependency versions
+# are preferred in those cases where we need them.
+ADD etc-apt-preferences.d-arvados /etc/apt/preferences.d/arvados
--- /dev/null
+Package: *
+Pin: release o=Arvados
+Pin-Priority: 501
dpkg-query --show > "$ARV_PACKAGES_DIR/$1.before"
apt-get -qq update
-apt-get --assume-yes --force-yes install "$1"
+apt-get --assume-yes --allow-unauthenticated install "$1"
dpkg-query --show > "$ARV_PACKAGES_DIR/$1.after"
--command
Build command to execute (default: use built-in Docker image command)
--test-packages
- Run package install test script "test-packages-$target.sh"
+ Run package install test script "test-packages-[target].sh"
--debug
Output debug information (default: false)
--only-build <package>
if [[ -n "$(find $WORKSPACE/packages/$TARGET -name '*.deb')" ]] ; then
(cd $WORKSPACE/packages/$TARGET
- dpkg-scanpackages . 2> >(grep -v 'warning' 1>&2) | gzip -c > Packages.gz
+ dpkg-scanpackages . 2> >(grep -v 'warning' 1>&2) | tee Packages | gzip -c > Packages.gz
+ apt-ftparchive -o APT::FTPArchive::Release::Origin=Arvados release . > Release
)
fi
ubuntu1404)
FORMAT=deb
;;
+ ubuntu1604)
+ FORMAT=deb
+ ;;
centos7)
FORMAT=rpm
PYTHON2_PACKAGE=$(rpm -qf "$(which python$PYTHON2_VERSION)" --queryformat '%{NAME}\n')
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
- <a class="navbar-brand" href="{{ site.baseurl }}/">Arvados Docs</a>
+ <a class="navbar-brand" href="{{ site.baseurl }}/">Arvados™ Docs</a>
</div>
<div class="collapse navbar-collapse" id="bs-navbar-collapse">
<ul class="nav navbar-nav">
---
layout: default
navsection: userguide
-title: Welcome to Arvados!
+title: Welcome to Arvados™!
...
_If you are new to Arvados, please try the Quickstart on <a href="http://doc.arvados.org">the documentation homepage</a> instead of this detailed User Guide._
if arvargs.debug:
logger.setLevel(logging.DEBUG)
+ logging.getLogger('arvados').setLevel(logging.DEBUG)
if arvargs.quiet:
logger.setLevel(logging.WARN)
+ logging.getLogger('arvados').setLevel(logging.WARN)
logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
if arvargs.metrics:
self.uuid = response["uuid"]
self.arvrunner.processes[self.uuid] = self
- logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
-
if response["state"] == "Final":
+ logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
self.done(response)
+ else:
+ logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
except Exception as e:
logger.error("%s got error %s" % (self.arvrunner.label(self), str(e)))
self.output_callback({}, "permanentFail")
import logging
import sys
import threading
+import copy
from schema_salad.sourceline import SourceLine
"""Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
+ dockerRequirement = copy.deepcopy(dockerRequirement)
dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
if hasattr(dockerRequirement, 'lc'):
dockerRequirement.lc.data["dockerImageId"] = dockerRequirement.lc.data["dockerPull"]
import arvados.collection
from .arvdocker import arv_docker_get_image
-from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing, upload_workflow_collection
from .pathmapper import InitialWorkDirPathMapper
from .perf import Perf
from . import done
self.update_pipeline_component(response)
- logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
-
- if response["state"] in ("Complete", "Failed", "Cancelled"):
+ if response["state"] == "Complete":
+ logger.info("%s reused job %s", self.arvrunner.label(self), response["uuid"])
with Perf(metrics, "done %s" % self.name):
self.done(response)
+ else:
+ logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
except Exception as e:
logger.exception("%s error" % (self.arvrunner.label(self)))
self.output_callback({}, "permanentFail")
class RunnerJob(Runner):
"""Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
- def upload_workflow_collection(self, packed):
- collection = arvados.collection.Collection(api_client=self.arvrunner.api,
- keep_client=self.arvrunner.keep_client,
- num_retries=self.arvrunner.num_retries)
- with collection.open("workflow.cwl", "w") as f:
- f.write(yaml.round_trip_dump(packed))
-
- filters = [["portable_data_hash", "=", collection.portable_data_hash()],
- ["name", "like", self.name+"%"]]
- if self.arvrunner.project_uuid:
- filters.append(["owner_uuid", "=", self.arvrunner.project_uuid])
- exists = self.arvrunner.api.collections().list(filters=filters).execute(num_retries=self.arvrunner.num_retries)
-
- if exists["items"]:
- logger.info("Using collection %s", exists["items"][0]["uuid"])
- else:
- collection.save_new(name=self.name,
- owner_uuid=self.arvrunner.project_uuid,
- ensure_unique_name=True,
- num_retries=self.arvrunner.num_retries)
- logger.info("Uploaded to %s", collection.manifest_locator())
-
- return collection.portable_data_hash()
-
def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
"""Create an Arvados job specification for this workflow.
self.job_order["cwl:tool"] = self.tool.tool["id"][5:]
else:
packed = packed_workflow(self.arvrunner, self.tool)
- wf_pdh = self.upload_workflow_collection(packed)
+ wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed)
self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
adjustDirObjs(self.job_order, trim_listing)
import ruamel.yaml as yaml
-from .runner import upload_dependencies, trim_listing, packed_workflow
+from .runner import upload_dependencies, trim_listing, packed_workflow, upload_workflow_collection
from .arvtool import ArvadosCommandTool
from .perf import Perf
call = arvRunner.api.workflows().create(body=body)
return call.execute(num_retries=arvRunner.num_retries)["uuid"]
+def dedup_reqs(reqs):
+    """Deduplicate a list of CWL requirement dicts by their "class" field.
+
+    Iterates in reverse, keeping the first entry seen per class — i.e. the
+    *last* occurrence in the original list wins.  Entries whose class is an
+    Arvados extension ("http://arvados.org/cwl#...") are dropped entirely.
+    Returns the surviving requirements sorted by class name.
+    """
+    dedup = {}
+    for r in reversed(reqs):
+        if r["class"] not in dedup and not r["class"].startswith("http://arvados.org/cwl#"):
+            dedup[r["class"]] = r
+    return [dedup[r] for r in sorted(dedup.keys())]
+
class ArvadosWorkflow(Workflow):
"""Wrap cwltool Workflow to override selected methods."""
super(ArvadosWorkflow, self).__init__(toolpath_object, **kwargs)
self.arvrunner = arvrunner
self.work_api = kwargs["work_api"]
+ self.wf_pdh = None
def job(self, joborder, output_callback, **kwargs):
kwargs["work_api"] = self.work_api
document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])
with Perf(metrics, "subworkflow upload_deps"):
- workflowobj["requirements"] = self.requirements + workflowobj.get("requirements", [])
- workflowobj["hints"] = self.hints + workflowobj.get("hints", [])
- packed = pack(document_loader, workflowobj, uri, self.metadata)
-
- upload_dependencies(self.arvrunner,
- kwargs.get("name", ""),
- document_loader,
- packed,
- uri,
- False)
-
upload_dependencies(self.arvrunner,
os.path.basename(joborder.get("id", "#")),
document_loader,
joborder.get("id", "#"),
False)
+ if self.wf_pdh is None:
+ workflowobj["requirements"] = dedup_reqs(self.requirements)
+ workflowobj["hints"] = dedup_reqs(self.hints)
+
+ packed = pack(document_loader, workflowobj, uri, self.metadata)
+
+ upload_dependencies(self.arvrunner,
+ kwargs.get("name", ""),
+ document_loader,
+ packed,
+ uri,
+ False)
+
with Perf(metrics, "subworkflow adjust"):
joborder_keepmount = copy.deepcopy(joborder)
adjustFileObjs(joborder_keepmount, keepmount)
adjustDirObjs(joborder_keepmount, keepmount)
- adjustFileObjs(packed, keepmount)
- adjustDirObjs(packed, keepmount)
+
+ if self.wf_pdh is None:
+ adjustFileObjs(packed, keepmount)
+ adjustDirObjs(packed, keepmount)
+ self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
wf_runner = cmap({
"class": "CommandLineTool",
"class": "InitialWorkDirRequirement",
"listing": [{
"entryname": "workflow.cwl",
- "entry": yaml.round_trip_dump(packed).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
+ "entry": {
+ "class": "File",
+ "location": "keep:%s/workflow.cwl" % self.wf_pdh
+ }
}, {
"entryname": "cwl.input.yml",
- "entry": yaml.round_trip_dump(joborder_keepmount).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
+ "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
}]
}],
"hints": workflowobj["hints"],
# type: (List[Any], unicode) -> None
uploadfiles = set()
- for k,v in self.arvrunner.get_uploaded().iteritems():
- self._pathmap[k] = MapperEnt(v.resolved, self.collection_pattern % v.resolved[5:], "File")
+ already_uploaded = self.arvrunner.get_uploaded()
+ for k in referenced_files:
+ loc = k["location"]
+ if loc in already_uploaded:
+ v = already_uploaded[loc]
+ self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % v.resolved[5:], "File")
for srcobj in referenced_files:
self.visit(srcobj, uploadfiles)
def upload_docker(arvrunner, tool):
- """Visitor which uploads Docker images referenced in CommandLineTool objects."""
+ """Uploads Docker images used in CommandLineTool objects."""
+
if isinstance(tool, CommandLineTool):
(docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
if docker_req:
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+ elif isinstance(tool, cwltool.workflow.Workflow):
+ for s in tool.steps:
+ upload_docker(arvrunner, s.embedded_tool)
def packed_workflow(arvrunner, tool):
"""Create a packed workflow.
def upload_workflow_deps(arvrunner, tool):
# Ensure that Docker images needed by this workflow are available
- tool.visit(partial(upload_docker, arvrunner))
+
+ upload_docker(arvrunner, tool)
document_loader = tool.doc_loader
raise Exception("Docker image %s is not available\n%s" % (img, e) )
return img
+def upload_workflow_collection(arvrunner, name, packed):
+    """Store a packed workflow as "workflow.cwl" in a Keep collection.
+
+    Writes `packed` as sorted, indented JSON, then reuses an existing
+    collection when one with the same portable data hash and a name
+    starting with `name` already exists (scoped to arvrunner.project_uuid
+    when that is set); otherwise saves a new, uniquely named collection.
+    Returns the collection's portable data hash.
+    """
+    collection = arvados.collection.Collection(api_client=arvrunner.api,
+                                               keep_client=arvrunner.keep_client,
+                                               num_retries=arvrunner.num_retries)
+    with collection.open("workflow.cwl", "w") as f:
+        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
+
+    # Look for an existing identical copy before saving a new one.
+    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
+               ["name", "like", name+"%"]]
+    if arvrunner.project_uuid:
+        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
+    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
+
+    if exists["items"]:
+        logger.info("Using collection %s", exists["items"][0]["uuid"])
+    else:
+        collection.save_new(name=name,
+                            owner_uuid=arvrunner.project_uuid,
+                            ensure_unique_name=True,
+                            num_retries=arvrunner.num_retries)
+        logger.info("Uploaded to %s", collection.manifest_locator())
+
+    return collection.portable_data_hash()
+
+
class Runner(object):
"""Base class for runner processes, which submit an instance of
arvados-cwl-runner and wait for the final result."""
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
'cwltool==1.0.20170119234115',
- 'schema-salad==2.2.20170119151016',
+ 'schema-salad==2.2.20170126160727',
'ruamel.yaml==0.13.7',
'arvados-python-client>=0.1.20170112173420',
'setuptools'
find_or_create=True)
mockcollection().open().__enter__().write.assert_has_calls([mock.call(subwf)])
- mockcollection().open().__enter__().write.assert_has_calls([mock.call('sleeptime: 5')])
+ mockcollection().open().__enter__().write.assert_has_calls([mock.call(
+'''{
+ "sleeptime": 5
+}''')])
def test_default_work_api(self):
arvados_cwl.add_arv_hints()
'class': 'Directory'
},
'cwl:tool':
- 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl#main'
+ '4db32e8a15aa48ea084b2f38108f406d+60/workflow.cwl#main'
},
'repository': 'arvados',
'script_version': 'master',
'listing': [
{'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
]}},
- 'cwl:tool': 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl#main',
+ 'cwl:tool': '4db32e8a15aa48ea084b2f38108f406d+60/workflow.cwl#main',
'arv:enable_reuse': True,
'arv:on_error': 'continue'
},
class TestSubmit(unittest.TestCase):
+ @mock.patch("arvados_cwl.runner.arv_docker_get_image")
@mock.patch("time.sleep")
@stubs
- def test_submit(self, stubs, tm):
+ def test_submit(self, stubs, tm, arvdock):
capture_stdout = cStringIO.StringIO()
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=jobs", "--debug",
}), ensure_unique_name=True),
mock.call().execute()])
+ arvdock.assert_has_calls([
+ mock.call(stubs.api, {"class": "DockerRequirement", "dockerPull": "debian:8"}, True, None),
+ mock.call(stubs.api, {'dockerPull': 'arvados/jobs:'+arvados_cwl.__version__}, True, None)
+ ])
+
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
stubs.api.pipeline_instances().create.assert_called_with(
body=JsonDiffMatcher(expect_pipeline))
},
'script_parameters': {
'cwl:tool':
- '5800682d508698dc9ce6d2fc618f21d8+58/workflow.cwl#main',
+ '00e281847a33e1c0df93161d70a6fc5d+60/workflow.cwl#main',
'optionalFloatInput': None,
'fileInput': {
'type': 'File',
-cwlVersion: v1.0
-$graph:
-- class: Workflow
- id: '#main'
- inputs:
- - type: int
- id: '#main/sleeptime'
- outputs:
- - type: string
- outputSource: '#main/sleep1/out'
- id: '#main/out'
- steps:
- - in:
- - valueFrom: |
- ${
- return String(inputs.sleeptime) + "b";
+{
+ "$graph": [
+ {
+ "class": "Workflow",
+ "hints": [],
+ "id": "#main",
+ "inputs": [
+ {
+ "id": "#main/sleeptime",
+ "type": "int"
}
- id: '#main/sleep1/blurb'
- - source: '#main/sleeptime'
- id: '#main/sleep1/sleeptime'
- out: ['#main/sleep1/out']
- run:
- class: CommandLineTool
- inputs:
- - type: int
- inputBinding: {position: 1}
- id: '#main/sleep1/sleeptime'
- outputs:
- - type: string
- outputBinding:
- outputEval: out
- id: '#main/sleep1/out'
- baseCommand: sleep
- id: '#main/sleep1'
- requirements:
- - {class: InlineJavascriptRequirement}
- - {class: ScatterFeatureRequirement}
- - {class: StepInputExpressionRequirement}
- - {class: SubworkflowFeatureRequirement}
- hints:
- - class: http://arvados.org/cwl#RunInSingleContainer
\ No newline at end of file
+ ],
+ "outputs": [
+ {
+ "id": "#main/out",
+ "outputSource": "#main/sleep1/out",
+ "type": "string"
+ }
+ ],
+ "requirements": [
+ {
+ "class": "InlineJavascriptRequirement"
+ },
+ {
+ "class": "ScatterFeatureRequirement"
+ },
+ {
+ "class": "StepInputExpressionRequirement"
+ },
+ {
+ "class": "SubworkflowFeatureRequirement"
+ }
+ ],
+ "steps": [
+ {
+ "id": "#main/sleep1",
+ "in": [
+ {
+ "id": "#main/sleep1/blurb",
+ "valueFrom": "${\n return String(inputs.sleeptime) + \"b\";\n}\n"
+ },
+ {
+ "id": "#main/sleep1/sleeptime",
+ "source": "#main/sleeptime"
+ }
+ ],
+ "out": [
+ "#main/sleep1/out"
+ ],
+ "run": {
+ "baseCommand": "sleep",
+ "class": "CommandLineTool",
+ "inputs": [
+ {
+ "id": "#main/sleep1/sleeptime",
+ "inputBinding": {
+ "position": 1
+ },
+ "type": "int"
+ }
+ ],
+ "outputs": [
+ {
+ "id": "#main/sleep1/out",
+ "outputBinding": {
+ "outputEval": "out"
+ },
+ "type": "string"
+ }
+ ]
+ }
+ }
+ ]
+ }
+ ],
+ "cwlVersion": "v1.0"
+}
\ No newline at end of file
--- /dev/null
+package config
+
+import (
+ "errors"
+ "os"
+
+ "github.com/ghodss/yaml"
+)
+
+// DumpAndExit writes the given config to stdout as YAML. If an error
+// occurs, that error is returned. Otherwise, the program exits 0.
+//
+// Example:
+//
+// log.Fatal(DumpAndExit(cfg))
+func DumpAndExit(cfg interface{}) error {
+	y, err := yaml.Marshal(cfg)
+	if err != nil {
+		return err
+	}
+	_, err = os.Stdout.Write(y)
+	if err != nil {
+		return err
+	}
+	// Success path never returns: terminate the whole process here.
+	os.Exit(0)
+	// Unreachable; satisfies the error return type so callers can
+	// write log.Fatal(DumpAndExit(cfg)).
+	return errors.New("exit failed!?")
+}
--- /dev/null
+package keepclient
+
+import (
+ "io/ioutil"
+ "sort"
+ "sync"
+ "time"
+)
+
+var DefaultBlockCache = &BlockCache{}
+
+type BlockCache struct {
+ // Maximum number of blocks to keep in the cache. If 0, a
+ // default size (currently 4) is used instead.
+ MaxBlocks int
+
+ cache map[string]*cacheBlock
+ mtx sync.Mutex
+ setupOnce sync.Once
+}
+
+const defaultMaxBlocks = 4
+
+// Sweep deletes the least recently used blocks from the cache until
+// there are no more than MaxBlocks left.
+func (c *BlockCache) Sweep() {
+	max := c.MaxBlocks
+	// NOTE(review): any MaxBlocks below defaultMaxBlocks (including a
+	// deliberately small limit like 1 or 2) is raised to the default —
+	// confirm that is intended; the field doc only mentions 0.
+	if max < defaultMaxBlocks {
+		max = defaultMaxBlocks
+	}
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+	if len(c.cache) <= max {
+		return
+	}
+	// Gather last-use times, sort newest-first, and evict every block
+	// whose last use is at or before the (max+1)-th newest timestamp.
+	lru := make([]time.Time, 0, len(c.cache))
+	for _, b := range c.cache {
+		lru = append(lru, b.lastUse)
+	}
+	sort.Sort(sort.Reverse(timeSlice(lru)))
+	threshold := lru[max]
+	for loc, b := range c.cache {
+		if !b.lastUse.After(threshold) {
+			delete(c.cache, loc)
+		}
+	}
+}
+
+// Get returns data from the cache, first retrieving it from Keep if
+// necessary.
+func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
+	c.setupOnce.Do(c.setup)
+	// Cache key is the leading 32 hex chars (the md5 hash); size and
+	// permission hints in the locator are ignored for caching purposes.
+	cacheKey := locator[:32]
+	c.mtx.Lock()
+	// An entry whose fetch failed (b.err != nil) is not reused: the
+	// next Get for that hash starts a fresh fetch.
+	b, ok := c.cache[cacheKey]
+	if !ok || b.err != nil {
+		b = &cacheBlock{
+			fetched: make(chan struct{}),
+			lastUse: time.Now(),
+		}
+		c.cache[cacheKey] = b
+		// Fetch asynchronously; all concurrent callers for this hash
+		// block below on the shared fetched channel.
+		go func() {
+			rdr, _, _, err := kc.Get(locator)
+			var data []byte
+			if err == nil {
+				data, err = ioutil.ReadAll(rdr)
+			}
+			c.mtx.Lock()
+			b.data, b.err = data, err
+			c.mtx.Unlock()
+			close(b.fetched)
+			go c.Sweep()
+		}()
+	}
+	c.mtx.Unlock()
+
+	// Wait (with mtx unlocked) for the fetch goroutine to finish,
+	// in case it hasn't already.
+	<-b.fetched
+
+	c.mtx.Lock()
+	b.lastUse = time.Now()
+	c.mtx.Unlock()
+	return b.data, b.err
+}
+
+func (c *BlockCache) setup() {
+ c.cache = make(map[string]*cacheBlock)
+}
+
+type timeSlice []time.Time
+
+func (ts timeSlice) Len() int { return len(ts) }
+
+func (ts timeSlice) Less(i, j int) bool { return ts[i].Before(ts[j]) }
+
+func (ts timeSlice) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
+
+type cacheBlock struct {
+ data []byte
+ err error
+ fetched chan struct{}
+ lastUse time.Time
+}
import (
"errors"
+ "fmt"
"io"
"os"
"git.curoverse.com/arvados.git/sdk/go/manifest"
)
-// ReadCloserWithLen extends io.ReadCloser with a Len() method that
-// returns the total number of bytes available to read.
-type ReadCloserWithLen interface {
- io.ReadCloser
+// A Reader implements, io.Reader, io.Seeker, and io.Closer, and has a
+// Len() method that returns the total number of bytes available to
+// read.
+type Reader interface {
+ io.Reader
+ io.Seeker
+ io.Closer
Len() uint64
}
// parameter when retrieving the collection record).
var ErrNoManifest = errors.New("Collection has no manifest")
-// CollectionFileReader returns a ReadCloserWithLen that reads file
-// content from a collection. The filename must be given relative to
-// the root of the collection, without a leading "./".
-func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (ReadCloserWithLen, error) {
+// CollectionFileReader returns a Reader that reads file content from
+// a collection. The filename must be given relative to the root of
+// the collection, without a leading "./".
+func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (Reader, error) {
mText, ok := collection["manifest_text"].(string)
if !ok {
return nil, ErrNoManifest
return kc.ManifestFileReader(m, filename)
}
-func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (ReadCloserWithLen, error) {
- rdrChan := make(chan *cfReader)
- go kc.queueSegmentsToGet(m, filename, rdrChan)
- r, ok := <-rdrChan
- if !ok {
- return nil, os.ErrNotExist
- }
- return r, nil
-}
-
-// Send segments for the specified file to r.toGet. Send a *cfReader
-// to rdrChan if the specified file is found (even if it's empty).
-// Then, close rdrChan.
-func (kc *KeepClient) queueSegmentsToGet(m manifest.Manifest, filename string, rdrChan chan *cfReader) {
- defer close(rdrChan)
-
- // q is a queue of FileSegments that we have received but
- // haven't yet been able to send to toGet.
- var q []*manifest.FileSegment
- var r *cfReader
- for seg := range m.FileSegmentIterByName(filename) {
- if r == nil {
- // We've just discovered that the requested
- // filename does appear in the manifest, so we
- // can return a real reader (not nil) from
- // CollectionFileReader().
- r = newCFReader(kc)
- rdrChan <- r
- }
- q = append(q, seg)
- r.totalSize += uint64(seg.Len)
- // Send toGet as many segments as we can until it
- // blocks.
- Q:
- for len(q) > 0 {
- select {
- case r.toGet <- q[0]:
- q = q[1:]
- default:
- break Q
- }
- }
+func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (Reader, error) {
+ f := &file{
+ kc: kc,
}
- if r == nil {
- // File not found.
- return
+ err := f.load(m, filename)
+ if err != nil {
+ return nil, err
}
- close(r.countDone)
- for _, seg := range q {
- r.toGet <- seg
- }
- close(r.toGet)
+ return f, nil
}
-type cfReader struct {
- keepClient *KeepClient
-
- // doGet() reads FileSegments from toGet, gets the data from
- // Keep, and sends byte slices to toRead to be consumed by
- // Read().
- toGet chan *manifest.FileSegment
-
- // toRead is a buffered channel, sized to fit one full Keep
- // block. This lets us verify checksums without having a
- // store-and-forward delay between blocks: by the time the
- // caller starts receiving data from block N, cfReader is
- // starting to fetch block N+1. A larger buffer would be
- // useful for a caller whose read speed varies a lot.
- toRead chan []byte
-
- // bytes ready to send next time someone calls Read()
- buf []byte
-
- // Total size of the file being read. Not safe to read this
- // until countDone is closed.
- totalSize uint64
- countDone chan struct{}
-
- // First error encountered.
- err error
-
- // errNotNil is closed IFF err contains a non-nil error.
- // Receiving from it will block until an error occurs.
- errNotNil chan struct{}
+type file struct {
+ kc *KeepClient
+ segments []*manifest.FileSegment
+ size int64 // total file size
+ offset int64 // current read offset
+
+ // current/latest segment accessed -- might or might not match pos
+ seg *manifest.FileSegment
+ segStart int64 // position of segment relative to file
+ segData []byte
+ segNext []*manifest.FileSegment
+ readaheadDone bool
+}
- // rdrClosed is closed IFF the reader's Close() method has
- // been called. Any goroutines associated with the reader will
- // stop and free up resources when they notice this channel is
- // closed.
- rdrClosed chan struct{}
+// Close implements io.Closer.
+func (f *file) Close() error {
+ f.kc = nil
+ f.segments = nil
+ f.segData = nil
+ return nil
}
-func (r *cfReader) Read(outbuf []byte) (int, error) {
- if r.Error() != nil {
- // Short circuit: the caller might as well find out
- // now that we hit an error, even if there's buffered
- // data we could return.
- return 0, r.Error()
+// Read implements io.Reader.
+func (f *file) Read(buf []byte) (int, error) {
+ if f.seg == nil || f.offset < f.segStart || f.offset >= f.segStart+int64(f.seg.Len) {
+ // f.seg does not cover the current read offset
+ // (f.pos). Iterate over f.segments to find the one
+ // that does.
+ f.seg = nil
+ f.segStart = 0
+ f.segData = nil
+ f.segNext = f.segments
+ for len(f.segNext) > 0 {
+ seg := f.segNext[0]
+ f.segNext = f.segNext[1:]
+ segEnd := f.segStart + int64(seg.Len)
+ if segEnd > f.offset {
+ f.seg = seg
+ break
+ }
+ f.segStart = segEnd
+ }
+ f.readaheadDone = false
+ }
+ if f.seg == nil {
+ return 0, io.EOF
}
- for len(r.buf) == 0 {
- // Private buffer was emptied out by the last Read()
- // (or this is the first Read() and r.buf is nil).
- // Read from r.toRead until we get a non-empty slice
- // or hit an error.
- var ok bool
- r.buf, ok = <-r.toRead
- if r.Error() != nil {
- // Error encountered while waiting for bytes
- return 0, r.Error()
- } else if !ok {
- // No more bytes to read, no error encountered
- return 0, io.EOF
+ if f.segData == nil {
+ data, err := f.kc.cache().Get(f.kc, f.seg.Locator)
+ if err != nil {
+ return 0, err
+ }
+ if len(data) < f.seg.Offset+f.seg.Len {
+ return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator)
}
+ f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len]
}
- // Copy as much as possible from our private buffer to the
- // caller's buffer
- n := len(r.buf)
- if len(r.buf) > len(outbuf) {
- n = len(outbuf)
+ // dataOff and dataLen denote a portion of f.segData
+ // corresponding to a portion of the file at f.offset.
+ dataOff := int(f.offset - f.segStart)
+ dataLen := f.seg.Len - dataOff
+
+ if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 {
+ // If we have already read more than just the first
+ // few bytes of this file, and we have already
+ // consumed a noticeable portion of this segment, and
+ // there's more data for this file in the next segment
+ // ... then there's a good chance we are going to need
+ // the data for that next segment soon. Start getting
+ // it into the cache now.
+ go f.kc.cache().Get(f.kc, f.segNext[0].Locator)
+ f.readaheadDone = true
}
- copy(outbuf[:n], r.buf[:n])
-
- // Next call to Read() will continue where we left off
- r.buf = r.buf[n:]
+ n := len(buf)
+ if n > dataLen {
+ n = dataLen
+ }
+ copy(buf[:n], f.segData[dataOff:dataOff+n])
+ f.offset += int64(n)
return n, nil
}
-// Close releases resources. It returns a non-nil error if an error
-// was encountered by the reader.
-func (r *cfReader) Close() error {
- close(r.rdrClosed)
- return r.Error()
-}
-
-// Error returns an error if one has been encountered, otherwise
-// nil. It is safe to call from any goroutine.
-func (r *cfReader) Error() error {
- select {
- case <-r.errNotNil:
- return r.err
+// Seek implements io.Seeker.
+func (f *file) Seek(offset int64, whence int) (int64, error) {
+ var want int64
+ switch whence {
+ case io.SeekStart:
+ want = offset
+ case io.SeekCurrent:
+ want = f.offset + offset
+ case io.SeekEnd:
+ want = f.size + offset
default:
- return nil
+ return f.offset, fmt.Errorf("invalid whence %d", whence)
+ }
+ if want < 0 {
+ return f.offset, fmt.Errorf("attempted seek to %d", want)
+ }
+ if want > f.size {
+ want = f.size
}
+ f.offset = want
+ return f.offset, nil
}
-// Len returns the total number of bytes in the file being read. If
-// necessary, it waits for manifest parsing to finish.
-func (r *cfReader) Len() uint64 {
- // Wait for all segments to be counted
- <-r.countDone
- return r.totalSize
+// Len returns the file size in bytes.
+func (f *file) Len() uint64 {
+ return uint64(f.size)
}
-func (r *cfReader) doGet() {
- defer close(r.toRead)
-GET:
- for fs := range r.toGet {
- rdr, _, _, err := r.keepClient.Get(fs.Locator)
- if err != nil {
- r.err = err
- close(r.errNotNil)
- return
- }
- var buf = make([]byte, fs.Offset+fs.Len)
- _, err = io.ReadFull(rdr, buf)
- errClosing := rdr.Close()
- if err == nil {
- err = errClosing
- }
- if err != nil {
- r.err = err
- close(r.errNotNil)
- return
- }
- for bOff, bLen := fs.Offset, dataSliceSize; bOff < fs.Offset+fs.Len && bLen > 0; bOff += bLen {
- if bOff+bLen > fs.Offset+fs.Len {
- bLen = fs.Offset + fs.Len - bOff
- }
- select {
- case r.toRead <- buf[bOff : bOff+bLen]:
- case <-r.rdrClosed:
- // Reader is closed: no point sending
- // anything more to toRead.
- break GET
- }
- }
- // It is possible that r.rdrClosed is closed but we
- // never noticed because r.toRead was also ready in
- // every select{} above. Here we check before wasting
- // a keepclient.Get() call.
- select {
- case <-r.rdrClosed:
- break GET
- default:
- }
+func (f *file) load(m manifest.Manifest, path string) error {
+ f.segments = nil
+ f.size = 0
+ for seg := range m.FileSegmentIterByName(path) {
+ f.segments = append(f.segments, seg)
+ f.size += int64(seg.Len)
}
- // In case we exited the above loop early: before returning,
- // drain the toGet channel so its sender doesn't sit around
- // blocking forever.
- for range r.toGet {
+ if f.segments == nil {
+ return os.ErrNotExist
}
-}
-
-func newCFReader(kc *KeepClient) (r *cfReader) {
- r = new(cfReader)
- r.keepClient = kc
- r.rdrClosed = make(chan struct{})
- r.errNotNil = make(chan struct{})
- r.toGet = make(chan *manifest.FileSegment, 2)
- r.toRead = make(chan []byte, (BLOCKSIZE+dataSliceSize-1)/dataSliceSize)
- r.countDone = make(chan struct{})
- go r.doGet()
- return
+ return nil
}
{mt: mt, f: "segmented/frob", want: "frob"},
{mt: mt, f: "segmented/oof", want: "oof"},
} {
+ c.Logf("%#v", testCase)
rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": testCase.mt}, testCase.f)
switch want := testCase.want.(type) {
case error:
c.Check(n, check.Equals, 0)
c.Check(err, check.Equals, io.EOF)
}
+
+ for a := len(want) - 2; a >= 0; a-- {
+ for b := a + 1; b <= len(want); b++ {
+ offset, err := rdr.Seek(int64(a), io.SeekStart)
+ c.Logf("...a=%d, b=%d", a, b)
+ c.Check(err, check.IsNil)
+ c.Check(offset, check.Equals, int64(a))
+ buf := make([]byte, b-a)
+ n, err := io.ReadFull(rdr, buf)
+ c.Check(n, check.Equals, b-a)
+ c.Check(string(buf), check.Equals, want[a:b])
+ }
+ }
+ offset, err := rdr.Seek(-1, io.SeekStart)
+ c.Check(err, check.NotNil)
+ c.Check(offset, check.Equals, int64(len(want)))
+
c.Check(rdr.Close(), check.Equals, nil)
}
}
}
func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
+ s.kc.BlockCache = &BlockCache{}
s.kc.PutB([]byte("foo"))
+ s.kc.PutB([]byte("bar"))
+ s.kc.PutB([]byte("baz"))
mt := ". "
- for i := 0; i < 1000; i++ {
- mt += "acbd18db4cc2f85cedef654fccc4a4d8+3 "
+ for i := 0; i < 300; i++ {
+ mt += "acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 73feffa4b7f6bb68e44cf984c85f6e88+3 "
}
- mt += "0:3000:foo1000.txt\n"
+ mt += "0:2700:foo900.txt\n"
// Grab the stub server's lock, ensuring our cfReader doesn't
// get anything back from its first call to kc.Get() before we
s.handler.lock <- struct{}{}
opsBeforeRead := *s.handler.ops
- rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo1000.txt")
+ rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo900.txt")
c.Assert(err, check.IsNil)
firstReadDone := make(chan struct{})
go func() {
- rdr.Read(make([]byte, 6))
- firstReadDone <- struct{}{}
+ n, err := rdr.Read(make([]byte, 3))
+ c.Check(n, check.Equals, 3)
+ c.Check(err, check.IsNil)
+ close(firstReadDone)
}()
- err = rdr.Close()
- c.Assert(err, check.IsNil)
- c.Assert(rdr.(*cfReader).Error(), check.IsNil)
// Release the stub server's lock. The first GET operation will proceed.
<-s.handler.lock
// received from the first GET.
<-firstReadDone
- // doGet() should close toRead before sending any more bufs to it.
- if what, ok := <-rdr.(*cfReader).toRead; ok {
- c.Errorf("Got %q, expected toRead to be closed", what)
- }
+ err = rdr.Close()
+ c.Check(err, check.IsNil)
// Stub should have handled exactly one GET request.
- c.Assert(*s.handler.ops, check.Equals, opsBeforeRead+1)
+ c.Check(*s.handler.ops, check.Equals, opsBeforeRead+1)
}
func (s *CollectionReaderUnit) TestCollectionReaderDataError(c *check.C) {
c.Check(err, check.NotNil)
c.Check(err, check.Not(check.Equals), io.EOF)
}
- c.Check(rdr.Close(), check.NotNil)
+ c.Check(rdr.Close(), check.IsNil)
}
lock sync.RWMutex
Client *http.Client
Retries int
+ BlockCache *BlockCache
// set to 1 if all writable services are of disk type, otherwise 0
replicasPerService int
return found
}
+func (kc *KeepClient) cache() *BlockCache {
+ if kc.BlockCache != nil {
+ return kc.BlockCache
+ } else {
+ return DefaultBlockCache
+ }
+}
+
type Locator struct {
Hash string
Size int // -1 if data size is not known
return 0
@synchronized
- def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
+ def manifest_text(self, stream_name=".", portable_locators=False,
+ normalize=False, only_committed=False):
buf = ""
filestream = []
for segment in self.segments:
loc = segment.locator
- if loc.startswith("bufferblock"):
+ if self.parent._my_block_manager().is_bufferblock(loc):
+ if only_committed:
+ continue
loc = self._bufferblocks[loc].calculate_locator()
if portable_locators:
loc = KeepLocator(loc).stripped()
manifest = self._local_collection.manifest_text()
else:
# Get the manifest text without comitting pending blocks
- manifest = self._local_collection.manifest_text(".", strip=False,
+ manifest = self._local_collection.manifest_text(strip=False,
normalize=False,
only_committed=True)
# Update cache
end
def dns_server_update
- if hostname_changed? && hostname_was
+ if ip_address_changed? && ip_address
+ Node.where('id != ? and ip_address = ?',
+ id, ip_address).each do |stale_node|
+ # One or more(!) stale node records have the same IP address
+ # as the new node. Clear the ip_address field on the stale
+ # nodes. Otherwise, we (via SLURM) might inadvertently connect
+ # to the new node using the old node's hostname.
+ stale_node.update_attributes!(ip_address: nil)
+ end
+ end
+ if hostname_was && hostname_changed?
self.class.dns_server_update(hostname_was, UNUSED_NODE_IP)
end
- if hostname_changed? or ip_address_changed?
- if ip_address
- Node.where('id != ? and ip_address = ? and last_ping_at < ?',
- id, ip_address, 10.minutes.ago).each do |stale_node|
- # One or more stale compute node records have the same IP
- # address as the new node. Clear the ip_address field on
- # the stale nodes.
- stale_node.ip_address = nil
- stale_node.save!
- end
- end
- if hostname
- self.class.dns_server_update(hostname, ip_address || UNUSED_NODE_IP)
- end
+ if hostname && (hostname_changed? || ip_address_changed?)
+ self.class.dns_server_update(hostname, ip_address || UNUSED_NODE_IP)
end
end
node.update_attributes!(hostname: 'foo0', ip_address: '10.11.12.14')
end
end
+
+ test 'newest ping wins IP address conflict' do
+ act_as_system_user do
+ n1, n2 = Node.create!, Node.create!
+
+ n1.ping(ip: '10.5.5.5', ping_secret: n1.info['ping_secret'])
+ n1.reload
+
+ Node.expects(:dns_server_update).with(n1.hostname, Node::UNUSED_NODE_IP)
+ Node.expects(:dns_server_update).with(Not(equals(n1.hostname)), '10.5.5.5')
+ n2.ping(ip: '10.5.5.5', ping_secret: n2.info['ping_secret'])
+
+ n1.reload
+ n2.reload
+ assert_nil n1.ip_address
+ assert_equal '10.5.5.5', n2.ip_address
+
+ Node.expects(:dns_server_update).with(n2.hostname, Node::UNUSED_NODE_IP)
+ Node.expects(:dns_server_update).with(n1.hostname, '10.5.5.5')
+ n1.ping(ip: '10.5.5.5', ping_secret: n1.info['ping_secret'])
+
+ n1.reload
+ n2.reload
+ assert_nil n2.ip_address
+ assert_equal '10.5.5.5', n1.ip_address
+ end
+ end
end
"Value for GITOLITE_HTTP_HOME environment variable. If not empty, GL_BYPASS_ACCESS_CHECKS=1 will also be set."+deprecated)
cfgPath := flag.String("config", defaultCfgPath, "Configuration file `path`.")
+ dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
flag.Usage = usage
flag.Parse()
}
}
+ if *dumpConfig {
+ log.Fatal(config.DumpAndExit(theConfig))
+ }
+
srv := &server{}
if err := srv.Start(); err != nil {
log.Fatal(err)
"bytes"
"flag"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/config"
- "git.curoverse.com/arvados.git/sdk/go/dispatch"
- "github.com/coreos/go-systemd/daemon"
"log"
"math"
"os"
"os/exec"
"strings"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/config"
+ "git.curoverse.com/arvados.git/sdk/go/dispatch"
+ "github.com/coreos/go-systemd/daemon"
)
// Config used by crunch-dispatch-slurm
"config",
defaultConfigPath,
"`path` to JSON or YAML configuration file")
+ dumpConfig := flag.Bool(
+ "dump-config",
+ false,
+ "write current configuration to stdout and exit")
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
}
+ if *dumpConfig {
+ log.Fatal(config.DumpAndExit(theConfig))
+ }
+
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
log.Printf("Error making Arvados client: %v", err)
// IKeepClient is the minimal Keep API methods used by crunch-run.
type IKeepClient interface {
PutHB(hash string, buf []byte) (string, int, error)
- ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
+ ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
}
// NewLogWriter is a factory function to create a new log writer.
return fw.len
}
-func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+func (fw FileWrapper) Seek(int64, int) (int64, error) {
+ return 0, errors.New("not implemented")
+}
+
+func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error) {
if filename == hwImageId+".tar" {
rdr := ioutil.NopCloser(&bytes.Buffer{})
client.Called = true
return "", 0, errors.New("KeepError")
}
-func (KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+func (KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error) {
return nil, errors.New("KeepError")
}
return 0
}
-func (KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+func (ErrorReader) Seek(int64, int) (int64, error) {
+ return 0, errors.New("ErrorReader")
+}
+
+func (KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error) {
return ErrorReader{}, nil
}
var debugf = func(string, ...interface{}) {}
func main() {
- var config Config
+ var cfg Config
var runOptions RunOptions
configPath := flag.String("config", defaultConfigPath,
"send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
"send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
+ dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit")
dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
debugFlag := flag.Bool("debug", false, "enable debug messages")
flag.Usage = usage
flag.Parse()
- mustReadConfig(&config, *configPath)
+ mustReadConfig(&cfg, *configPath)
if *serviceListPath != "" {
- mustReadConfig(&config.KeepServiceList, *serviceListPath)
+ mustReadConfig(&cfg.KeepServiceList, *serviceListPath)
+ }
+
+ if *dumpConfig {
+ log.Fatal(config.DumpAndExit(cfg))
}
if *debugFlag {
debugf = log.Printf
- if j, err := json.Marshal(config); err != nil {
+ if j, err := json.Marshal(cfg); err != nil {
log.Fatal(err)
} else {
log.Printf("config is %s", j)
if *dumpFlag {
runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
}
- err := CheckConfig(config, runOptions)
+ err := CheckConfig(cfg, runOptions)
if err != nil {
// (don't run)
} else if runOptions.Once {
- _, err = (&Balancer{}).Run(config, runOptions)
+ _, err = (&Balancer{}).Run(cfg, runOptions)
} else {
- err = RunForever(config, runOptions, nil)
+ err = RunForever(cfg, runOptions, nil)
}
if err != nil {
log.Fatal(err)
"fmt"
"html"
"io"
- "mime"
"net/http"
"net/url"
"os"
- "regexp"
+ "path"
"strconv"
"strings"
"sync"
+ "time"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/auth"
}
defer rdr.Close()
- basenamePos := strings.LastIndex(filename, "/")
- if basenamePos < 0 {
- basenamePos = 0
- }
- extPos := strings.LastIndex(filename, ".")
- if extPos > basenamePos {
- // Now extPos is safely >= 0.
- if t := mime.TypeByExtension(filename[extPos:]); t != "" {
- w.Header().Set("Content-Type", t)
- }
- }
- if rdr, ok := rdr.(keepclient.ReadCloserWithLen); ok {
- w.Header().Set("Content-Length", fmt.Sprintf("%d", rdr.Len()))
- }
+ basename := path.Base(filename)
+ applyContentDispositionHdr(w, r, basename, attachment)
- applyContentDispositionHdr(w, r, filename[basenamePos:], attachment)
- rangeRdr, statusCode := applyRangeHdr(w, r, rdr)
-
- w.WriteHeader(statusCode)
- _, err = io.Copy(w, rangeRdr)
+ modstr, _ := collection["modified_at"].(string)
+ modtime, err := time.Parse(time.RFC3339Nano, modstr)
if err != nil {
- statusCode, statusText = http.StatusBadGateway, err.Error()
- }
-}
-
-var rangeRe = regexp.MustCompile(`^bytes=0-([0-9]*)$`)
-
-func applyRangeHdr(w http.ResponseWriter, r *http.Request, rdr keepclient.ReadCloserWithLen) (io.Reader, int) {
- w.Header().Set("Accept-Ranges", "bytes")
- hdr := r.Header.Get("Range")
- fields := rangeRe.FindStringSubmatch(hdr)
- if fields == nil {
- return rdr, http.StatusOK
- }
- rangeEnd, err := strconv.ParseInt(fields[1], 10, 64)
- if err != nil {
- // Empty or too big for int64 == send entire content
- return rdr, http.StatusOK
- }
- if uint64(rangeEnd) >= rdr.Len() {
- return rdr, http.StatusOK
+ modtime = time.Now()
}
- w.Header().Set("Content-Length", fmt.Sprintf("%d", rangeEnd+1))
- w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", 0, rangeEnd, rdr.Len()))
- return &io.LimitedReader{R: rdr, N: rangeEnd + 1}, http.StatusPartialContent
+ http.ServeContent(w, r, basename, modtime, rdr)
}
func applyContentDispositionHdr(w http.ResponseWriter, r *http.Request, filename string, isAttachment bool) {
)
}
-func (s *IntegrationSuite) TestRange(c *check.C) {
- s.testServer.Config.AnonymousTokens = []string{arvadostest.AnonymousToken}
- u, _ := url.Parse("http://example.com/c=" + arvadostest.HelloWorldCollection + "/Hello%20world.txt")
- req := &http.Request{
- Method: "GET",
- Host: u.Host,
- URL: u,
- RequestURI: u.RequestURI(),
- Header: http.Header{"Range": {"bytes=0-4"}},
- }
- resp := httptest.NewRecorder()
- s.testServer.Handler.ServeHTTP(resp, req)
- c.Check(resp.Code, check.Equals, http.StatusPartialContent)
- c.Check(resp.Body.String(), check.Equals, "Hello")
- c.Check(resp.Header().Get("Content-Length"), check.Equals, "5")
- c.Check(resp.Header().Get("Content-Range"), check.Equals, "bytes 0-4/12")
-
- req.Header.Set("Range", "bytes=0-")
- resp = httptest.NewRecorder()
- s.testServer.Handler.ServeHTTP(resp, req)
- // 200 and 206 are both correct:
- c.Check(resp.Code, check.Equals, http.StatusOK)
- c.Check(resp.Body.String(), check.Equals, "Hello world\n")
- c.Check(resp.Header().Get("Content-Length"), check.Equals, "12")
-
- // Unsupported ranges are ignored
- for _, hdr := range []string{
- "bytes=5-5", // non-zero start byte
- "bytes=-5", // last 5 bytes
- "cubits=0-5", // unsupported unit
- "bytes=0-340282366920938463463374607431768211456", // 2^128
- } {
- req.Header.Set("Range", hdr)
- resp = httptest.NewRecorder()
- s.testServer.Handler.ServeHTTP(resp, req)
- c.Check(resp.Code, check.Equals, http.StatusOK)
- c.Check(resp.Body.String(), check.Equals, "Hello world\n")
- c.Check(resp.Header().Get("Content-Length"), check.Equals, "12")
- c.Check(resp.Header().Get("Content-Range"), check.Equals, "")
- c.Check(resp.Header().Get("Accept-Ranges"), check.Equals, "bytes")
- }
-}
-
// XHRs can't follow redirect-with-cookie so they rely on method=POST
// and disposition=attachment (telling us it's acceptable to respond
// with content instead of a redirect) and an Origin header that gets
"Only serve attachments at the given `host:port`"+deprecated)
flag.BoolVar(&cfg.TrustAllContent, "trust-all-content", false,
"Serve non-public content from a single origin. Dangerous: read docs before using!"+deprecated)
+ dumpConfig := flag.Bool("dump-config", false,
+ "write current configuration to stdout and exit")
flag.Usage = usage
flag.Parse()
cfg.AnonymousTokens = []string{os.Getenv("ARVADOS_API_TOKEN")}
}
+ if *dumpConfig {
+ log.Fatal(config.DumpAndExit(cfg))
+ }
+
os.Setenv("ARVADOS_API_HOST", cfg.Client.APIHost)
srv := &server{Config: cfg}
if err := srv.Start(); err != nil {
--- /dev/null
+package main
+
+import (
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ check "gopkg.in/check.v1"
+)
+
+func (s *IntegrationSuite) TestRanges(c *check.C) {
+ blocksize := 1000000
+ var uuid string
+ {
+ testdata := make([]byte, blocksize)
+ for i := 0; i < blocksize; i++ {
+ testdata[i] = byte(' ')
+ }
+ copy(testdata[1:4], []byte("foo"))
+ arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, check.Equals, nil)
+ arv.ApiToken = arvadostest.ActiveToken
+ kc, err := keepclient.MakeKeepClient(arv)
+ c.Assert(err, check.Equals, nil)
+ loc, _, err := kc.PutB(testdata[:])
+ c.Assert(err, check.Equals, nil)
+ loc2, _, err := kc.PutB([]byte{'Z'})
+ c.Assert(err, check.Equals, nil)
+
+ mtext := fmt.Sprintf(". %s %s %s %s %s 1:%d:testdata.bin 0:1:space.txt\n", loc, loc, loc, loc, loc2, blocksize*4)
+ coll := map[string]interface{}{}
+ err = arv.Create("collections",
+ map[string]interface{}{
+ "collection": map[string]interface{}{
+ "name": "test data for keep-web TestRanges",
+ "manifest_text": mtext,
+ },
+ }, &coll)
+ c.Assert(err, check.Equals, nil)
+ uuid = coll["uuid"].(string)
+ defer arv.Delete("collections", uuid, nil, nil)
+ }
+
+ url := mustParseURL("http://" + uuid + ".collections.example.com/testdata.bin")
+ for _, trial := range []struct {
+ header string
+ expectObey bool
+ expectBody string
+ }{
+ {"0-2", true, "foo"},
+ {"-2", true, " Z"},
+ {"1-4", true, "oo "},
+ {"z-y", false, ""},
+ {"1000000-1000003", true, "foo "},
+ {"999999-1000003", true, " foo "},
+ {"2000000-2000003", true, "foo "},
+ {"1999999-2000002", true, " foo"},
+ {"3999998-3999999", true, " Z"},
+ {"3999998-4000004", true, " Z"},
+ {"3999998-", true, " Z"},
+ } {
+ c.Logf("trial: %#v", trial)
+ resp := httptest.NewRecorder()
+ req := &http.Request{
+ Method: "GET",
+ URL: url,
+ Host: url.Host,
+ RequestURI: url.RequestURI(),
+ Header: http.Header{
+ "Authorization": {"OAuth2 " + arvadostest.ActiveToken},
+ "Range": {"bytes=" + trial.header},
+ },
+ }
+ s.testServer.Handler.ServeHTTP(resp, req)
+ if trial.expectObey {
+ c.Check(resp.Code, check.Equals, http.StatusPartialContent)
+ c.Check(resp.Body.Len(), check.Equals, len(trial.expectBody))
+ if resp.Body.Len() > 1000 {
+ c.Check(resp.Body.String()[:1000]+"[...]", check.Equals, trial.expectBody)
+ } else {
+ c.Check(resp.Body.String(), check.Equals, trial.expectBody)
+ }
+ } else {
+ c.Check(resp.Code, check.Equals, http.StatusRequestedRangeNotSatisfiable)
+ }
+ }
+}
package main
import (
- "encoding/json"
"errors"
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"github.com/coreos/go-systemd/daemon"
+ "github.com/ghodss/yaml"
"github.com/gorilla/mux"
)
var cfgPath string
const defaultCfgPath = "/etc/arvados/keepproxy/keepproxy.yml"
flagset.StringVar(&cfgPath, "config", defaultCfgPath, "Configuration file `path`")
+ dumpConfig := flagset.Bool("dump-config", false, "write current configuration to stdout and exit")
flagset.Parse(os.Args[1:])
err := config.LoadFile(cfg, cfgPath)
if regexp.MustCompile("^(?i:1|yes|true)$").MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")) {
cfg.Client.Insecure = true
}
- if j, err := json.MarshalIndent(cfg, "", " "); err == nil {
- log.Print("Current configuration:\n", string(j))
+ if y, err := yaml.Marshal(cfg); err == nil && !*dumpConfig {
+ log.Print("Current configuration:\n", string(y))
}
cfg.Timeout = arvados.Duration(time.Duration(*timeoutSeconds) * time.Second)
}
+ if *dumpConfig {
+ log.Fatal(config.DumpAndExit(cfg))
+ }
+
arv, err := arvadosclient.New(&cfg.Client)
if err != nil {
log.Fatalf("Error setting up arvados client %s", err.Error())
"git.curoverse.com/arvados.git/sdk/go/keepclient"
log "github.com/Sirupsen/logrus"
"github.com/coreos/go-systemd/daemon"
- "github.com/ghodss/yaml"
)
// A Keep "block" is 64MB.
}
if *dumpConfig {
- y, err := yaml.Marshal(theConfig)
- if err != nil {
- log.Fatal(err)
- }
- os.Stdout.Write(y)
- os.Exit(0)
+ log.Fatal(config.DumpAndExit(theConfig))
}
err = theConfig.Start()