Merge branch '10700-dispatch'
author: Tom Clegg <tom@curoverse.com>
Tue, 31 Jan 2017 21:32:05 +0000 (16:32 -0500)
committer: Tom Clegg <tom@curoverse.com>
Tue, 31 Jan 2017 21:32:05 +0000 (16:32 -0500)
closes #10700
  refs #10701
closes #10702
closes #10703
closes #10704

Conflicts:
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go

45 files changed:
apps/workbench/app/controllers/collections_controller.rb
apps/workbench/app/controllers/jobs_controller.rb
apps/workbench/app/controllers/pipeline_instances_controller.rb
build/build.list
build/package-build-dockerfiles/Makefile
build/package-build-dockerfiles/ubuntu1604/Dockerfile [new file with mode: 0644]
build/package-test-dockerfiles/ubuntu1604/Dockerfile [new file with mode: 0644]
build/package-test-dockerfiles/ubuntu1604/etc-apt-preferences.d-arvados [new file with mode: 0644]
build/package-testing/deb-common-test-packages.sh
build/package-testing/test-packages-ubuntu1604.sh [moved from build/package-testing/test-packages-debian7.sh with 100% similarity]
build/run-build-packages-one-target.sh
build/run-build-packages.sh
doc/_includes/_navbar_top.liquid
doc/user/index.html.textile.liquid
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvdocker.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py
sdk/cwl/tests/test_job.py
sdk/cwl/tests/test_submit.py
sdk/cwl/tests/wf/scatter2_subwf.cwl
sdk/go/config/dump.go [new file with mode: 0644]
sdk/go/keepclient/block_cache.go [new file with mode: 0644]
sdk/go/keepclient/collectionreader.go
sdk/go/keepclient/collectionreader_test.go
sdk/go/keepclient/keepclient.go
sdk/python/arvados/arvfile.py
sdk/python/arvados/commands/put.py
services/api/app/models/node.rb
services/api/test/unit/node_test.rb
services/arv-git-httpd/main.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/keep-balance/main.go
services/keep-web/handler.go
services/keep-web/handler_test.go
services/keep-web/main.go
services/keep-web/ranges_test.go [new file with mode: 0644]
services/keepproxy/keepproxy.go
services/keepstore/keepstore.go

index 46dcab6dce38487a7bb938472e91f209f4e926e2..be58b91871f1a7a51fdeb28dc854425af9d8e68a 100644 (file)
@@ -220,7 +220,7 @@ class CollectionsController < ApplicationController
     if params["tab_pane"] == "Provenance_graph"
       @prov_svg = ProvenanceHelper::create_provenance_graph(@object.provenance, "provenance_svg",
                                                             {:request => request,
-                                                             :direction => :bottom_up,
+                                                             :direction => :top_down,
                                                              :combine_jobs => :script_only}) rescue nil
     end
 
@@ -261,9 +261,9 @@ class CollectionsController < ApplicationController
         if params["tab_pane"] == "Used_by"
           @used_by_svg = ProvenanceHelper::create_provenance_graph(@object.used_by, "used_by_svg",
                                                                    {:request => request,
-                                                                     :direction => :top_down,
-                                                                     :combine_jobs => :script_only,
-                                                                     :pdata_only => true}) rescue nil
+                                                                    :direction => :top_down,
+                                                                    :combine_jobs => :script_only,
+                                                                    :pdata_only => true}) rescue nil
         end
       end
     end
index f18a79d646c4a0a1dc774e52f0c2d4da1c8f9346..c39b7c4317d56fdcd7d31f95116e23572bcbec49 100644 (file)
@@ -30,6 +30,7 @@ class JobsController < ApplicationController
 
     @svg = ProvenanceHelper::create_provenance_graph nodes, "provenance_svg", {
       :request => request,
+      :direction => :top_down,
       :all_script_parameters => true,
       :script_version_nodes => true}
   end
index 83fe0dda4645a0437a962aa95e9572c9c897afe2..d68c5795d012f6703ea96e6944fedac12388ed18 100644 (file)
@@ -188,6 +188,7 @@ class PipelineInstancesController < ApplicationController
     if provenance
       @prov_svg = ProvenanceHelper::create_provenance_graph provenance, "provenance_svg", {
         :request => request,
+        :direction => :top_down,
         :all_script_parameters => true,
         :combine_jobs => :script_and_version,
         :pips => pips,
index 554bde475393f7603309e32b7f42218b73bda566..9e6317a41e3cf44e2ce9cab6abfb06923035a74b 100644 (file)
@@ -1,33 +1,33 @@
 #distribution(s)|name|version|iteration|type|architecture|extra fpm arguments
 debian8,ubuntu1204,centos7|python-gflags|2.0|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|google-api-python-client|1.4.2|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|oauth2client|1.5.2|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|google-api-python-client|1.4.2|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|oauth2client|1.5.2|2|python|all
 debian8,ubuntu1204,ubuntu1404,centos7|pyasn1|0.1.7|2|python|all
 debian8,ubuntu1204,ubuntu1404,centos7|pyasn1-modules|0.0.5|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|rsa|3.4.2|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|uritemplate|3.0.0|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|httplib2|0.9.2|3|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|rsa|3.4.2|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|uritemplate|3.0.0|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|httplib2|0.9.2|3|python|all
 debian8,ubuntu1204,centos7|ws4py|0.3.5|2|python|all
 debian8,ubuntu1204,centos7|pykka|1.2.1|2|python|all
 debian8,ubuntu1204,ubuntu1404|six|1.10.0|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|ciso8601|1.0.3|3|python|amd64
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|ciso8601|1.0.3|3|python|amd64
 debian8,ubuntu1204,centos7|pycrypto|2.6.1|3|python|amd64
-debian8,ubuntu1204,ubuntu1404|backports.ssl_match_hostname|3.5.0.1|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604|backports.ssl_match_hostname|3.5.0.1|2|python|all
 debian8,ubuntu1204,ubuntu1404,centos7|llfuse|0.41.1|3|python|amd64
-debian8,ubuntu1204,ubuntu1404,centos7|pycurl|7.19.5.3|3|python|amd64
-debian8,ubuntu1204,ubuntu1404,centos7|pyyaml|3.12|2|python|amd64
-debian8,ubuntu1204,ubuntu1404,centos7|rdflib|4.2.1|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|pycurl|7.19.5.3|3|python|amd64
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|pyyaml|3.12|2|python|amd64
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|rdflib|4.2.1|2|python|all
 debian8,ubuntu1204,ubuntu1404,centos7|shellescape|3.4.1|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|mistune|0.7.3|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|typing|3.5.3.0|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|mistune|0.7.3|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|typing|3.5.3.0|2|python|all
 debian8,ubuntu1204,ubuntu1404,centos7|avro|1.8.1|2|python|all
 debian8,ubuntu1204,ubuntu1404,centos7|ruamel.ordereddict|0.4.9|2|python|amd64
-debian8,ubuntu1204,ubuntu1404,centos7|cachecontrol|0.11.7|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|pathlib2|2.1.0|2|python|all
-debian8,ubuntu1204,ubuntu1404,centos7|docker-py|1.7.2|2|python3|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|cachecontrol|0.11.7|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|pathlib2|2.1.0|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|docker-py|1.7.2|2|python3|all
 debian8,ubuntu1204,centos7|six|1.10.0|2|python3|all
 debian8,ubuntu1204,ubuntu1404,centos7|requests|2.12.4|2|python3|all
-debian8,ubuntu1204,ubuntu1404,centos7|websocket-client|0.37.0|2|python3|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|websocket-client|0.37.0|2|python3|all
 ubuntu1204|requests|2.12.4|2|python|all
 ubuntu1204,centos7|contextlib2|0.5.4|2|python|all
 ubuntu1204,centos7|isodate|0.5.4|2|python|all
@@ -37,7 +37,7 @@ centos7|pyparsing|2.1.10|2|python|all
 centos7|sparqlwrapper|1.8.0|2|python|all
 centos7|html5lib|0.9999999|2|python|all
 centos7|keepalive|0.5|2|python|all
-all|lockfile|0.12.2|2|python|all|--epoch 1
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|lockfile|0.12.2|2|python|all|--epoch 1
 all|ruamel.yaml|0.13.7|2|python|amd64|--python-setup-py-arguments --single-version-externally-managed
 all|cwltest|1.0.20160907111242|3|python|all|--depends 'python-futures >= 3.0.5'
 all|rdflib-jsonld|0.4.0|2|python|all
index 9987e9e0eb8565e7211c70a29384befd5d9ab2d5..5fae9dc589412ccdd449335ae1a82d41065c232d 100644 (file)
@@ -1,4 +1,4 @@
-all: centos7/generated debian8/generated ubuntu1204/generated ubuntu1404/generated
+all: centos7/generated debian8/generated ubuntu1204/generated ubuntu1404/generated ubuntu1604/generated
 
 centos7/generated: common-generated-all
        test -d centos7/generated || mkdir centos7/generated
@@ -16,6 +16,10 @@ ubuntu1404/generated: common-generated-all
        test -d ubuntu1404/generated || mkdir ubuntu1404/generated
        cp -rlt ubuntu1404/generated common-generated/*
 
+ubuntu1604/generated: common-generated-all
+       test -d ubuntu1604/generated || mkdir ubuntu1604/generated
+       cp -rlt ubuntu1604/generated common-generated/*
+
 GOTARBALL=go1.7.1.linux-amd64.tar.gz
 
 common-generated-all: common-generated/$(GOTARBALL)
diff --git a/build/package-build-dockerfiles/ubuntu1604/Dockerfile b/build/package-build-dockerfiles/ubuntu1604/Dockerfile
new file mode 100644 (file)
index 0000000..b3ac60f
--- /dev/null
@@ -0,0 +1,20 @@
+FROM ubuntu:xenial
+MAINTAINER Ward Vandewege <ward@curoverse.com>
+
+# Install dependencies and set up system.
+RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev libgnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip unzip
+
+# Install RVM
+RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
+    curl -L https://get.rvm.io | bash -s stable && \
+    /usr/local/rvm/bin/rvm install 2.3.3 && \
+    /usr/local/rvm/bin/rvm alias create default ruby-2.3.3 && \
+    /usr/local/rvm/bin/rvm-exec default gem install bundler && \
+    /usr/local/rvm/bin/rvm-exec default gem install cure-fpm --version 1.6.0b
+
+# Install golang binary
+ADD generated/go1.7.1.linux-amd64.tar.gz /usr/local/
+RUN ln -s /usr/local/go/bin/go /usr/local/bin/
+
+ENV WORKSPACE /arvados
+CMD ["/usr/local/rvm/bin/rvm-exec", "default", "bash", "/jenkins/run-build-packages.sh", "--target", "ubuntu1604"]
diff --git a/build/package-test-dockerfiles/ubuntu1604/Dockerfile b/build/package-test-dockerfiles/ubuntu1604/Dockerfile
new file mode 100644 (file)
index 0000000..87d19cd
--- /dev/null
@@ -0,0 +1,20 @@
+FROM ubuntu:xenial
+MAINTAINER Ward Vandewege <ward@curoverse.com>
+
+# Install RVM
+RUN apt-get update && \
+    DEBIAN_FRONTEND=noninteractive apt-get -y install --no-install-recommends curl ca-certificates && \
+    gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
+    curl -L https://get.rvm.io | bash -s stable && \
+    /usr/local/rvm/bin/rvm install 2.3 && \
+    /usr/local/rvm/bin/rvm alias create default ruby-2.3
+
+# udev daemon can't start in a container, so don't try.
+RUN mkdir -p /etc/udev/disabled
+
+RUN echo "deb file:///arvados/packages/ubuntu1604/ /" >>/etc/apt/sources.list
+
+# Add preferences file for the Arvados packages. This pins Arvados
+# packages at priority 501, so that older python dependency versions
+# are preferred in those cases where we need them
+ADD etc-apt-preferences.d-arvados /etc/apt/preferences.d/arvados
diff --git a/build/package-test-dockerfiles/ubuntu1604/etc-apt-preferences.d-arvados b/build/package-test-dockerfiles/ubuntu1604/etc-apt-preferences.d-arvados
new file mode 100644 (file)
index 0000000..9e24695
--- /dev/null
@@ -0,0 +1,3 @@
+Package: *
+Pin: release o=Arvados
+Pin-Priority: 501
index 5f32a606dac3ee43e75c53fad95b75b82767afc0..58fc57e0b3ea43e84a38f608f8208051c59e642a 100755 (executable)
@@ -11,7 +11,7 @@ export ARV_PACKAGES_DIR="/arvados/packages/$target"
 dpkg-query --show > "$ARV_PACKAGES_DIR/$1.before"
 
 apt-get -qq update
-apt-get --assume-yes --force-yes install "$1"
+apt-get --assume-yes --allow-unauthenticated install "$1"
 
 dpkg-query --show > "$ARV_PACKAGES_DIR/$1.after"
 
index 6a1ec9ca6d75b622abe37cb71b7773bd3efd60b6..685ca5156466135856fddcc90535ae770c5c3952 100755 (executable)
@@ -11,7 +11,7 @@ Syntax:
 --command
     Build command to execute (default: use built-in Docker image command)
 --test-packages
-    Run package install test script "test-packages-$target.sh"
+    Run package install test script "test-packages-[target].sh"
 --debug
     Output debug information (default: false)
 --only-build <package>
@@ -99,7 +99,8 @@ if [[ -n "$test_packages" ]]; then
 
     if [[ -n "$(find $WORKSPACE/packages/$TARGET -name '*.deb')" ]] ; then
         (cd $WORKSPACE/packages/$TARGET
-         dpkg-scanpackages .  2> >(grep -v 'warning' 1>&2) | gzip -c > Packages.gz
+          dpkg-scanpackages .  2> >(grep -v 'warning' 1>&2) | tee Packages | gzip -c > Packages.gz
+          apt-ftparchive -o APT::FTPArchive::Release::Origin=Arvados release . > Release
         )
     fi
 
index 7840b3c2275c6fc832c6611e51eeae4493ed8675..37e963b6474eea96e939015a86e09aaf03a2d13b 100755 (executable)
@@ -112,6 +112,9 @@ case "$TARGET" in
     ubuntu1404)
         FORMAT=deb
         ;;
+    ubuntu1604)
+        FORMAT=deb
+        ;;
     centos7)
         FORMAT=rpm
         PYTHON2_PACKAGE=$(rpm -qf "$(which python$PYTHON2_VERSION)" --queryformat '%{NAME}\n')
index 43cc99558fd795b1b72540f1795dae0354245b6e..6caf36a18882115027c288717a74146b8281dd49 100644 (file)
@@ -7,7 +7,7 @@
         <span class="icon-bar"></span>
         <span class="icon-bar"></span>
       </button>
-      <a class="navbar-brand" href="{{ site.baseurl }}/">Arvados Docs</a>
+      <a class="navbar-brand" href="{{ site.baseurl }}/">Arvados&trade; Docs</a>
     </div>
     <div class="collapse navbar-collapse" id="bs-navbar-collapse">
       <ul class="nav navbar-nav">
index 2d2120268aeecba5a532b1e4903151f714763988..8e97d9480a933edcd7f070a331ef1cc49eb1af03 100644 (file)
@@ -1,7 +1,7 @@
 ---
 layout: default
 navsection: userguide
-title: Welcome to Arvados!
+title: Welcome to Arvados&trade;!
 ...
 
 _If you are new to Arvados, please try the Quickstart on <a href="http://doc.arvados.org">the documentation homepage</a> instead of this detailed User Guide._
index 13135d0b68499ae5944e3e0f7dd56d30da0412a4..2842e8a114e2e4159c3da97f88b54de28ba3e369 100644 (file)
@@ -680,9 +680,11 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
 
     if arvargs.debug:
         logger.setLevel(logging.DEBUG)
+        logging.getLogger('arvados').setLevel(logging.DEBUG)
 
     if arvargs.quiet:
         logger.setLevel(logging.WARN)
+        logging.getLogger('arvados').setLevel(logging.WARN)
         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
     if arvargs.metrics:
index 235e9b8dfac00d7c73e4072222f8c79068f0a181..4c4db171f50bdfb4d80bee9b1f2dc473b326068c 100644 (file)
@@ -129,10 +129,11 @@ class ArvadosContainer(object):
             self.uuid = response["uuid"]
             self.arvrunner.processes[self.uuid] = self
 
-            logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
-
             if response["state"] == "Final":
+                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
                 self.done(response)
+            else:
+                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
         except Exception as e:
             logger.error("%s got error %s" % (self.arvrunner.label(self), str(e)))
             self.output_callback({}, "permanentFail")
index 88c5dd2d4f428e946602c4eaeb5c59c1e4e4a2e6..f83add9379c176d5cb9ad472c0778c68174058b5 100644 (file)
@@ -1,6 +1,7 @@
 import logging
 import sys
 import threading
+import copy
 
 from schema_salad.sourceline import SourceLine
 
@@ -17,6 +18,7 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
     """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
 
     if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
+        dockerRequirement = copy.deepcopy(dockerRequirement)
         dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
         if hasattr(dockerRequirement, 'lc'):
             dockerRequirement.lc.data["dockerImageId"] = dockerRequirement.lc.data["dockerPull"]
index 7b318026d57a0da8ef3caa5a4b91f768e824431a..b7f72a97d69eccd5d872861c197b63b9ba9873a6 100644 (file)
@@ -18,7 +18,7 @@ import ruamel.yaml as yaml
 import arvados.collection
 
 from .arvdocker import arv_docker_get_image
-from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing, upload_workflow_collection
 from .pathmapper import InitialWorkDirPathMapper
 from .perf import Perf
 from . import done
@@ -141,11 +141,12 @@ class ArvadosJob(object):
 
             self.update_pipeline_component(response)
 
-            logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
-
-            if response["state"] in ("Complete", "Failed", "Cancelled"):
+            if response["state"] == "Complete":
+                logger.info("%s reused job %s", self.arvrunner.label(self), response["uuid"])
                 with Perf(metrics, "done %s" % self.name):
                     self.done(response)
+            else:
+                logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
         except Exception as e:
             logger.exception("%s error" % (self.arvrunner.label(self)))
             self.output_callback({}, "permanentFail")
@@ -236,30 +237,6 @@ class ArvadosJob(object):
 class RunnerJob(Runner):
     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
 
-    def upload_workflow_collection(self, packed):
-        collection = arvados.collection.Collection(api_client=self.arvrunner.api,
-                                                   keep_client=self.arvrunner.keep_client,
-                                                   num_retries=self.arvrunner.num_retries)
-        with collection.open("workflow.cwl", "w") as f:
-            f.write(yaml.round_trip_dump(packed))
-
-        filters = [["portable_data_hash", "=", collection.portable_data_hash()],
-                   ["name", "like", self.name+"%"]]
-        if self.arvrunner.project_uuid:
-            filters.append(["owner_uuid", "=", self.arvrunner.project_uuid])
-        exists = self.arvrunner.api.collections().list(filters=filters).execute(num_retries=self.arvrunner.num_retries)
-
-        if exists["items"]:
-            logger.info("Using collection %s", exists["items"][0]["uuid"])
-        else:
-            collection.save_new(name=self.name,
-                                owner_uuid=self.arvrunner.project_uuid,
-                                ensure_unique_name=True,
-                                num_retries=self.arvrunner.num_retries)
-            logger.info("Uploaded to %s", collection.manifest_locator())
-
-        return collection.portable_data_hash()
-
     def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
         """Create an Arvados job specification for this workflow.
 
@@ -272,7 +249,7 @@ class RunnerJob(Runner):
             self.job_order["cwl:tool"] = self.tool.tool["id"][5:]
         else:
             packed = packed_workflow(self.arvrunner, self.tool)
-            wf_pdh = self.upload_workflow_collection(packed)
+            wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed)
             self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
 
         adjustDirObjs(self.job_order, trim_listing)
index cd7e41a906b8fdacdea740a26d188a933348d9d9..4db1f4f2f4d8f48739bd7ef525e9526bdd2c3b4c 100644 (file)
@@ -13,7 +13,7 @@ from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
 
 import ruamel.yaml as yaml
 
-from .runner import upload_dependencies, trim_listing, packed_workflow
+from .runner import upload_dependencies, trim_listing, packed_workflow, upload_workflow_collection
 from .arvtool import ArvadosCommandTool
 from .perf import Perf
 
@@ -56,6 +56,13 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
         call = arvRunner.api.workflows().create(body=body)
     return call.execute(num_retries=arvRunner.num_retries)["uuid"]
 
+def dedup_reqs(reqs):
+    dedup = {}
+    for r in reversed(reqs):
+        if r["class"] not in dedup and not r["class"].startswith("http://arvados.org/cwl#"):
+            dedup[r["class"]] = r
+    return [dedup[r] for r in sorted(dedup.keys())]
+
 class ArvadosWorkflow(Workflow):
     """Wrap cwltool Workflow to override selected methods."""
 
@@ -63,6 +70,7 @@ class ArvadosWorkflow(Workflow):
         super(ArvadosWorkflow, self).__init__(toolpath_object, **kwargs)
         self.arvrunner = arvrunner
         self.work_api = kwargs["work_api"]
+        self.wf_pdh = None
 
     def job(self, joborder, output_callback, **kwargs):
         kwargs["work_api"] = self.work_api
@@ -74,17 +82,6 @@ class ArvadosWorkflow(Workflow):
             document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])
 
             with Perf(metrics, "subworkflow upload_deps"):
-                workflowobj["requirements"] = self.requirements + workflowobj.get("requirements", [])
-                workflowobj["hints"] = self.hints + workflowobj.get("hints", [])
-                packed = pack(document_loader, workflowobj, uri, self.metadata)
-
-                upload_dependencies(self.arvrunner,
-                                    kwargs.get("name", ""),
-                                    document_loader,
-                                    packed,
-                                    uri,
-                                    False)
-
                 upload_dependencies(self.arvrunner,
                                     os.path.basename(joborder.get("id", "#")),
                                     document_loader,
@@ -92,6 +89,19 @@ class ArvadosWorkflow(Workflow):
                                     joborder.get("id", "#"),
                                     False)
 
+                if self.wf_pdh is None:
+                    workflowobj["requirements"] = dedup_reqs(self.requirements)
+                    workflowobj["hints"] = dedup_reqs(self.hints)
+
+                    packed = pack(document_loader, workflowobj, uri, self.metadata)
+
+                    upload_dependencies(self.arvrunner,
+                                        kwargs.get("name", ""),
+                                        document_loader,
+                                        packed,
+                                        uri,
+                                        False)
+
             with Perf(metrics, "subworkflow adjust"):
                 joborder_keepmount = copy.deepcopy(joborder)
 
@@ -111,8 +121,11 @@ class ArvadosWorkflow(Workflow):
 
                 adjustFileObjs(joborder_keepmount, keepmount)
                 adjustDirObjs(joborder_keepmount, keepmount)
-                adjustFileObjs(packed, keepmount)
-                adjustDirObjs(packed, keepmount)
+
+                if self.wf_pdh is None:
+                    adjustFileObjs(packed, keepmount)
+                    adjustDirObjs(packed, keepmount)
+                    self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
 
             wf_runner = cmap({
                 "class": "CommandLineTool",
@@ -125,10 +138,13 @@ class ArvadosWorkflow(Workflow):
                     "class": "InitialWorkDirRequirement",
                     "listing": [{
                             "entryname": "workflow.cwl",
-                            "entry": yaml.round_trip_dump(packed).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
+                            "entry": {
+                                "class": "File",
+                                "location": "keep:%s/workflow.cwl" % self.wf_pdh
+                            }
                         }, {
                             "entryname": "cwl.input.yml",
-                            "entry": yaml.round_trip_dump(joborder_keepmount).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
+                            "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
                         }]
                 }],
                 "hints": workflowobj["hints"],
index a6b3d15e2c503af7bab06eaf0bd7407f5975b9fe..63d36f5ea9057769aa4904426ac4401f2f7633d0 100644 (file)
@@ -85,8 +85,12 @@ class ArvPathMapper(PathMapper):
         # type: (List[Any], unicode) -> None
         uploadfiles = set()
 
-        for k,v in self.arvrunner.get_uploaded().iteritems():
-            self._pathmap[k] = MapperEnt(v.resolved, self.collection_pattern % v.resolved[5:], "File")
+        already_uploaded = self.arvrunner.get_uploaded()
+        for k in referenced_files:
+            loc = k["location"]
+            if loc in already_uploaded:
+                v = already_uploaded[loc]
+                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % v.resolved[5:], "File")
 
         for srcobj in referenced_files:
             self.visit(srcobj, uploadfiles)
index d3e0a0e1070869fd0249946bf414ec5a8f45608b..49047fa14328271106608721c9fa22099dfcb5ae 100644 (file)
@@ -123,7 +123,8 @@ def upload_dependencies(arvrunner, name, document_loader,
 
 
 def upload_docker(arvrunner, tool):
-    """Visitor which uploads Docker images referenced in CommandLineTool objects."""
+    """Uploads Docker images used in CommandLineTool objects."""
+
     if isinstance(tool, CommandLineTool):
         (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
         if docker_req:
@@ -132,6 +133,9 @@ def upload_docker(arvrunner, tool):
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
             arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+    elif isinstance(tool, cwltool.workflow.Workflow):
+        for s in tool.steps:
+            upload_docker(arvrunner, s.embedded_tool)
 
 def packed_workflow(arvrunner, tool):
     """Create a packed workflow.
@@ -189,7 +193,8 @@ def upload_job_order(arvrunner, name, tool, job_order):
 
 def upload_workflow_deps(arvrunner, tool):
     # Ensure that Docker images needed by this workflow are available
-    tool.visit(partial(upload_docker, arvrunner))
+
+    upload_docker(arvrunner, tool)
 
     document_loader = tool.doc_loader
 
@@ -215,6 +220,31 @@ def arvados_jobs_image(arvrunner, img):
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
     return img
 
+def upload_workflow_collection(arvrunner, name, packed):
+    collection = arvados.collection.Collection(api_client=arvrunner.api,
+                                               keep_client=arvrunner.keep_client,
+                                               num_retries=arvrunner.num_retries)
+    with collection.open("workflow.cwl", "w") as f:
+        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
+
+    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
+               ["name", "like", name+"%"]]
+    if arvrunner.project_uuid:
+        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
+    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
+
+    if exists["items"]:
+        logger.info("Using collection %s", exists["items"][0]["uuid"])
+    else:
+        collection.save_new(name=name,
+                            owner_uuid=arvrunner.project_uuid,
+                            ensure_unique_name=True,
+                            num_retries=arvrunner.num_retries)
+        logger.info("Uploaded to %s", collection.manifest_locator())
+
+    return collection.portable_data_hash()
+
+
 class Runner(object):
     """Base class for runner processes, which submit an instance of
     arvados-cwl-runner and wait for the final result."""
index c3a5bcd7a4ccc749e6bf1d30d0841df47dea8e82..178b5e85cfa7cf477ad41e2da1b221a075ad4053 100644 (file)
@@ -49,7 +49,7 @@ setup(name='arvados-cwl-runner',
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
           'cwltool==1.0.20170119234115',
-          'schema-salad==2.2.20170119151016',
+          'schema-salad==2.2.20170126160727',
           'ruamel.yaml==0.13.7',
           'arvados-python-client>=0.1.20170112173420',
           'setuptools'
index 8aafb4a3c87c05cc0b7b08d4574852157e40701e..076514b1e96f48b6ae887bf8b1e6f2c0893123bc 100644 (file)
@@ -306,7 +306,10 @@ class TestWorkflow(unittest.TestCase):
             find_or_create=True)
 
         mockcollection().open().__enter__().write.assert_has_calls([mock.call(subwf)])
-        mockcollection().open().__enter__().write.assert_has_calls([mock.call('sleeptime: 5')])
+        mockcollection().open().__enter__().write.assert_has_calls([mock.call(
+'''{
+  "sleeptime": 5
+}''')])
 
     def test_default_work_api(self):
         arvados_cwl.add_arv_hints()
index eaddca0acdf12b56606063ecafd51864dc1614c9..bbff612a64ce0869b2e85fd1478236593d663209 100644 (file)
@@ -124,7 +124,7 @@ def stubs(func):
                     'class': 'Directory'
                 },
                 'cwl:tool':
-                'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl#main'
+                '4db32e8a15aa48ea084b2f38108f406d+60/workflow.cwl#main'
             },
             'repository': 'arvados',
             'script_version': 'master',
@@ -146,7 +146,7 @@ def stubs(func):
                               'listing': [
                                   {'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
                               ]}},
-                        'cwl:tool': 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl#main',
+                        'cwl:tool': '4db32e8a15aa48ea084b2f38108f406d+60/workflow.cwl#main',
                         'arv:enable_reuse': True,
                         'arv:on_error': 'continue'
                     },
@@ -233,9 +233,10 @@ def stubs(func):
 
 
 class TestSubmit(unittest.TestCase):
+    @mock.patch("arvados_cwl.runner.arv_docker_get_image")
     @mock.patch("time.sleep")
     @stubs
-    def test_submit(self, stubs, tm):
+    def test_submit(self, stubs, tm, arvdock):
         capture_stdout = cStringIO.StringIO()
         exited = arvados_cwl.main(
             ["--submit", "--no-wait", "--api=jobs", "--debug",
@@ -260,6 +261,11 @@ class TestSubmit(unittest.TestCase):
             }), ensure_unique_name=True),
             mock.call().execute()])
 
+        arvdock.assert_has_calls([
+            mock.call(stubs.api, {"class": "DockerRequirement", "dockerPull": "debian:8"}, True, None),
+            mock.call(stubs.api, {'dockerPull': 'arvados/jobs:'+arvados_cwl.__version__}, True, None)
+        ])
+
         expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
         stubs.api.pipeline_instances().create.assert_called_with(
             body=JsonDiffMatcher(expect_pipeline))
@@ -1121,7 +1127,7 @@ class TestTemplateInputs(unittest.TestCase):
                 },
                 'script_parameters': {
                     'cwl:tool':
-                    '5800682d508698dc9ce6d2fc618f21d8+58/workflow.cwl#main',
+                    '00e281847a33e1c0df93161d70a6fc5d+60/workflow.cwl#main',
                     'optionalFloatInput': None,
                     'fileInput': {
                         'type': 'File',
index daf18b11ca79cf6ddbb6892d331270df0bc2e9a0..df4d992c359a42eeb619826aa14ce4b87aeab19d 100644 (file)
@@ -1,41 +1,77 @@
-cwlVersion: v1.0
-$graph:
-- class: Workflow
-  id: '#main'
-  inputs:
-  - type: int
-    id: '#main/sleeptime'
-  outputs:
-  - type: string
-    outputSource: '#main/sleep1/out'
-    id: '#main/out'
-  steps:
-  - in:
-    - valueFrom: |
-        ${
-          return String(inputs.sleeptime) + "b";
+{
+  "$graph": [
+    {
+      "class": "Workflow",
+      "hints": [],
+      "id": "#main",
+      "inputs": [
+        {
+          "id": "#main/sleeptime",
+          "type": "int"
         }
-      id: '#main/sleep1/blurb'
-    - source: '#main/sleeptime'
-      id: '#main/sleep1/sleeptime'
-    out: ['#main/sleep1/out']
-    run:
-      class: CommandLineTool
-      inputs:
-      - type: int
-        inputBinding: {position: 1}
-        id: '#main/sleep1/sleeptime'
-      outputs:
-      - type: string
-        outputBinding:
-          outputEval: out
-        id: '#main/sleep1/out'
-      baseCommand: sleep
-    id: '#main/sleep1'
-  requirements:
-  - {class: InlineJavascriptRequirement}
-  - {class: ScatterFeatureRequirement}
-  - {class: StepInputExpressionRequirement}
-  - {class: SubworkflowFeatureRequirement}
-  hints:
-  - class: http://arvados.org/cwl#RunInSingleContainer
\ No newline at end of file
+      ],
+      "outputs": [
+        {
+          "id": "#main/out",
+          "outputSource": "#main/sleep1/out",
+          "type": "string"
+        }
+      ],
+      "requirements": [
+        {
+          "class": "InlineJavascriptRequirement"
+        },
+        {
+          "class": "ScatterFeatureRequirement"
+        },
+        {
+          "class": "StepInputExpressionRequirement"
+        },
+        {
+          "class": "SubworkflowFeatureRequirement"
+        }
+      ],
+      "steps": [
+        {
+          "id": "#main/sleep1",
+          "in": [
+            {
+              "id": "#main/sleep1/blurb",
+              "valueFrom": "${\n  return String(inputs.sleeptime) + \"b\";\n}\n"
+            },
+            {
+              "id": "#main/sleep1/sleeptime",
+              "source": "#main/sleeptime"
+            }
+          ],
+          "out": [
+            "#main/sleep1/out"
+          ],
+          "run": {
+            "baseCommand": "sleep",
+            "class": "CommandLineTool",
+            "inputs": [
+              {
+                "id": "#main/sleep1/sleeptime",
+                "inputBinding": {
+                  "position": 1
+                },
+                "type": "int"
+              }
+            ],
+            "outputs": [
+              {
+                "id": "#main/sleep1/out",
+                "outputBinding": {
+                  "outputEval": "out"
+                },
+                "type": "string"
+              }
+            ]
+          }
+        }
+      ]
+    }
+  ],
+  "cwlVersion": "v1.0"
+}
\ No newline at end of file
diff --git a/sdk/go/config/dump.go b/sdk/go/config/dump.go
new file mode 100644 (file)
index 0000000..36c50eb
--- /dev/null
@@ -0,0 +1,27 @@
+package config
+
+import (
+       "errors"
+       "os"
+
+       "github.com/ghodss/yaml"
+)
+
+// DumpAndExit marshals cfg as YAML and writes it to stdout, then
+// exits the program with status 0. It returns only if marshaling or
+// writing fails, in which case it returns that error.
+//
+// Example:
+//
+//     log.Fatal(DumpAndExit(cfg))
+func DumpAndExit(cfg interface{}) error {
+       out, err := yaml.Marshal(cfg)
+       if err == nil {
+               _, err = os.Stdout.Write(out)
+       }
+       if err != nil {
+               return err
+       }
+       os.Exit(0)
+       // Not reached unless os.Exit returns; Go does not treat
+       // os.Exit as a terminating statement, so a return is needed.
+       return errors.New("exit failed!?")
+}
diff --git a/sdk/go/keepclient/block_cache.go b/sdk/go/keepclient/block_cache.go
new file mode 100644 (file)
index 0000000..7d03b68
--- /dev/null
@@ -0,0 +1,104 @@
+package keepclient
+
+import (
+       "io/ioutil"
+       "sort"
+       "sync"
+       "time"
+)
+
+// DefaultBlockCache is the BlockCache used by a KeepClient whose
+// BlockCache field is nil.
+var DefaultBlockCache = &BlockCache{}
+
+// A BlockCache keeps recently fetched Keep blocks in memory so
+// repeated reads of the same block need not fetch it again. The zero
+// value is ready to use: the internal map is created on first Get.
+type BlockCache struct {
+       // Maximum number of blocks to keep in the cache. If 0, a
+       // default size (currently 4) is used instead.
+       MaxBlocks int
+
+       cache     map[string]*cacheBlock // created by setup; guarded by mtx
+       mtx       sync.Mutex
+       setupOnce sync.Once
+}
+
+// defaultMaxBlocks is the cache size used when MaxBlocks is 0.
+const defaultMaxBlocks = 4
+
+// Sweep deletes the least recently used blocks from the cache until
+// there are no more than MaxBlocks left. A MaxBlocks value of zero
+// (or less) means defaultMaxBlocks.
+func (c *BlockCache) Sweep() {
+       limit := c.MaxBlocks
+       if limit < 1 {
+               // Only a non-positive MaxBlocks selects the default;
+               // smaller explicit limits (1-3) are honored, as the
+               // MaxBlocks field documentation says.
+               limit = defaultMaxBlocks
+       }
+       c.mtx.Lock()
+       defer c.mtx.Unlock()
+       if len(c.cache) <= limit {
+               return
+       }
+       // Sort last-use times newest first; lru[limit] is then the
+       // newest time that must go. Evicting every block used at or
+       // before it leaves at most limit blocks.
+       lru := make([]time.Time, 0, len(c.cache))
+       for _, b := range c.cache {
+               lru = append(lru, b.lastUse)
+       }
+       sort.Sort(sort.Reverse(timeSlice(lru)))
+       threshold := lru[limit]
+       for loc, b := range c.cache {
+               if !b.lastUse.After(threshold) {
+                       delete(c.cache, loc)
+               }
+       }
+}
+
+// Get returns data from the cache, first retrieving it from Keep if
+// necessary. Concurrent callers asking for the same block wait for a
+// single fetch. A cached fetch error is not reused: the next Get for
+// that block starts a new fetch.
+func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
+       c.setupOnce.Do(c.setup)
+       // Key the cache on the first 32 characters of the locator
+       // (the hash in a well-formed locator such as
+       // "acbd18db4cc2f85cedef654fccc4a4d8+3"). A shorter, malformed
+       // locator is used whole instead of causing a slice-bounds
+       // panic; presumably kc.Get then reports an error for it.
+       cacheKey := locator
+       if len(cacheKey) > 32 {
+               cacheKey = cacheKey[:32]
+       }
+       c.mtx.Lock()
+       b, ok := c.cache[cacheKey]
+       if !ok || b.err != nil {
+               b = &cacheBlock{
+                       fetched: make(chan struct{}),
+                       lastUse: time.Now(),
+               }
+               c.cache[cacheKey] = b
+               go func() {
+                       rdr, _, _, err := kc.Get(locator)
+                       var data []byte
+                       if err == nil {
+                               data, err = ioutil.ReadAll(rdr)
+                               // Always release the reader; report a
+                               // close error only if reading succeeded.
+                               if errClose := rdr.Close(); err == nil {
+                                       err = errClose
+                               }
+                       }
+                       c.mtx.Lock()
+                       b.data, b.err = data, err
+                       c.mtx.Unlock()
+                       close(b.fetched)
+                       go c.Sweep()
+               }()
+       }
+       c.mtx.Unlock()
+
+       // Wait (with mtx unlocked) for the fetch goroutine to finish,
+       // in case it hasn't already.
+       <-b.fetched
+
+       c.mtx.Lock()
+       b.lastUse = time.Now()
+       c.mtx.Unlock()
+       return b.data, b.err
+}
+
+// setup allocates the block map. Get runs it exactly once, via
+// setupOnce, before touching the cache.
+func (c *BlockCache) setup() {
+       c.cache = map[string]*cacheBlock{}
+}
+
+// timeSlice implements sort.Interface for a slice of times, ordering
+// earlier times first.
+type timeSlice []time.Time
+
+func (s timeSlice) Len() int {
+       return len(s)
+}
+
+func (s timeSlice) Less(i, j int) bool {
+       return s[j].After(s[i])
+}
+
+func (s timeSlice) Swap(i, j int) {
+       s[i], s[j] = s[j], s[i]
+}
+
+// cacheBlock is a single BlockCache entry.
+type cacheBlock struct {
+       // fetched is closed after the fetching goroutine in Get has
+       // stored data and err.
+       fetched chan struct{}
+       data    []byte
+       err     error
+       // lastUse is what Sweep compares when choosing evictions.
+       lastUse time.Time
+}
index 33bb58710e0c94e1cfa562b8bd1c56afff62a4d7..61fabefacdd5ee937faab81c792370bd8af2c675 100644 (file)
@@ -2,16 +2,20 @@ package keepclient
 
 import (
        "errors"
+       "fmt"
        "io"
        "os"
 
        "git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
-// ReadCloserWithLen extends io.ReadCloser with a Len() method that
-// returns the total number of bytes available to read.
-type ReadCloserWithLen interface {
-       io.ReadCloser
+// A Reader implements io.Reader, io.Seeker, and io.Closer, and has a
+// Len() method that returns the total size of the file in bytes,
+// regardless of the current read/seek position.
+type Reader interface {
+       io.Reader
+       io.Seeker
+       io.Closer
        Len() uint64
 }
 
@@ -31,10 +35,10 @@ const (
 // parameter when retrieving the collection record).
 var ErrNoManifest = errors.New("Collection has no manifest")
 
-// CollectionFileReader returns a ReadCloserWithLen that reads file
-// content from a collection. The filename must be given relative to
-// the root of the collection, without a leading "./".
-func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (ReadCloserWithLen, error) {
+// CollectionFileReader returns a Reader that reads file content from
+// a collection. The filename must be given relative to the root of
+// the collection, without a leading "./".
+func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (Reader, error) {
        mText, ok := collection["manifest_text"].(string)
        if !ok {
                return nil, ErrNoManifest
@@ -43,218 +47,137 @@ func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, fi
        return kc.ManifestFileReader(m, filename)
 }
 
-func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (ReadCloserWithLen, error) {
-       rdrChan := make(chan *cfReader)
-       go kc.queueSegmentsToGet(m, filename, rdrChan)
-       r, ok := <-rdrChan
-       if !ok {
-               return nil, os.ErrNotExist
-       }
-       return r, nil
-}
-
-// Send segments for the specified file to r.toGet. Send a *cfReader
-// to rdrChan if the specified file is found (even if it's empty).
-// Then, close rdrChan.
-func (kc *KeepClient) queueSegmentsToGet(m manifest.Manifest, filename string, rdrChan chan *cfReader) {
-       defer close(rdrChan)
-
-       // q is a queue of FileSegments that we have received but
-       // haven't yet been able to send to toGet.
-       var q []*manifest.FileSegment
-       var r *cfReader
-       for seg := range m.FileSegmentIterByName(filename) {
-               if r == nil {
-                       // We've just discovered that the requested
-                       // filename does appear in the manifest, so we
-                       // can return a real reader (not nil) from
-                       // CollectionFileReader().
-                       r = newCFReader(kc)
-                       rdrChan <- r
-               }
-               q = append(q, seg)
-               r.totalSize += uint64(seg.Len)
-               // Send toGet as many segments as we can until it
-               // blocks.
-       Q:
-               for len(q) > 0 {
-                       select {
-                       case r.toGet <- q[0]:
-                               q = q[1:]
-                       default:
-                               break Q
-                       }
-               }
+// ManifestFileReader returns a Reader for the file named filename in
+// m (named as for CollectionFileReader). If m has no segments for
+// that file, it returns os.ErrNotExist.
+func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (Reader, error) {
+       f := &file{
+               kc: kc,
        }
-       if r == nil {
-               // File not found.
-               return
+       if err := f.load(m, filename); err != nil {
+               return nil, err
        }
-       close(r.countDone)
-       for _, seg := range q {
-               r.toGet <- seg
-       }
-       close(r.toGet)
+       return f, nil
 }
 
-type cfReader struct {
-       keepClient *KeepClient
-
-       // doGet() reads FileSegments from toGet, gets the data from
-       // Keep, and sends byte slices to toRead to be consumed by
-       // Read().
-       toGet chan *manifest.FileSegment
-
-       // toRead is a buffered channel, sized to fit one full Keep
-       // block. This lets us verify checksums without having a
-       // store-and-forward delay between blocks: by the time the
-       // caller starts receiving data from block N, cfReader is
-       // starting to fetch block N+1. A larger buffer would be
-       // useful for a caller whose read speed varies a lot.
-       toRead chan []byte
-
-       // bytes ready to send next time someone calls Read()
-       buf []byte
-
-       // Total size of the file being read. Not safe to read this
-       // until countDone is closed.
-       totalSize uint64
-       countDone chan struct{}
-
-       // First error encountered.
-       err error
-
-       // errNotNil is closed IFF err contains a non-nil error.
-       // Receiving from it will block until an error occurs.
-       errNotNil chan struct{}
+// file implements Reader for one file in a manifest; it is what
+// ManifestFileReader returns.
+type file struct {
+       kc       *KeepClient             // nil after Close
+       segments []*manifest.FileSegment // all of the file's segments, in order
+       size     int64                   // total file size
+       offset   int64                   // current read offset
+
+       // current/latest segment accessed -- might or might not
+       // cover offset (e.g., after Seek); Read re-locates it if not
+       seg           *manifest.FileSegment
+       segStart      int64                   // position of seg relative to start of file
+       segData       []byte                  // seg's data; nil until fetched
+       segNext       []*manifest.FileSegment // segments after seg, in order
+       readaheadDone bool                    // prefetch of segNext[0] already started
+}
 
-       // rdrClosed is closed IFF the reader's Close() method has
-       // been called. Any goroutines associated with the reader will
-       // stop and free up resources when they notice this channel is
-       // closed.
-       rdrClosed chan struct{}
+// Close implements io.Closer. It drops the file's references to the
+// KeepClient, the segment list, and any fetched segment data, and
+// always returns nil.
+func (f *file) Close() error {
+       f.kc, f.segments, f.segData = nil, nil, nil
+       return nil
 }
 
-func (r *cfReader) Read(outbuf []byte) (int, error) {
-       if r.Error() != nil {
-               // Short circuit: the caller might as well find out
-               // now that we hit an error, even if there's buffered
-               // data we could return.
-               return 0, r.Error()
+// Read implements io.Reader. It returns io.EOF once the offset
+// reaches the end of the file, and an error if the file has been
+// closed.
+func (f *file) Read(buf []byte) (int, error) {
+       if f.kc == nil {
+               // Close has released the KeepClient, so segment data
+               // can no longer be fetched.
+               return 0, errors.New("read from closed file")
+       }
+       if f.seg == nil || f.offset < f.segStart || f.offset >= f.segStart+int64(f.seg.Len) {
+               // f.seg does not cover the current read offset
+               // (f.offset). Iterate over f.segments to find the
+               // one that does.
+               f.seg = nil
+               f.segStart = 0
+               f.segData = nil
+               f.segNext = f.segments
+               for len(f.segNext) > 0 {
+                       seg := f.segNext[0]
+                       f.segNext = f.segNext[1:]
+                       segEnd := f.segStart + int64(seg.Len)
+                       if segEnd > f.offset {
+                               f.seg = seg
+                               break
+                       }
+                       f.segStart = segEnd
+               }
+               f.readaheadDone = false
+       }
+       if f.seg == nil {
+               return 0, io.EOF
        }
-       for len(r.buf) == 0 {
-               // Private buffer was emptied out by the last Read()
-               // (or this is the first Read() and r.buf is nil).
-               // Read from r.toRead until we get a non-empty slice
-               // or hit an error.
-               var ok bool
-               r.buf, ok = <-r.toRead
-               if r.Error() != nil {
-                       // Error encountered while waiting for bytes
-                       return 0, r.Error()
-               } else if !ok {
-                       // No more bytes to read, no error encountered
-                       return 0, io.EOF
+       if f.segData == nil {
+               data, err := f.kc.cache().Get(f.kc, f.seg.Locator)
+               if err != nil {
+                       return 0, err
+               }
+               if len(data) < f.seg.Offset+f.seg.Len {
+                       return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator)
                }
+               f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len]
        }
-       // Copy as much as possible from our private buffer to the
-       // caller's buffer
-       n := len(r.buf)
-       if len(r.buf) > len(outbuf) {
-               n = len(outbuf)
+       // dataOff and dataLen denote the portion of f.segData from
+       // f.offset to the end of this segment; n is how much of it
+       // this call will return.
+       dataOff := int(f.offset - f.segStart)
+       dataLen := f.seg.Len - dataOff
+       n := len(buf)
+       if n > dataLen {
+               n = dataLen
+       }
+
+       if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+n > len(f.segData)/16 {
+               // If we have already read more than just the first
+               // few bytes of this file, and by the end of this call
+               // more than 1/16 of this segment will have been
+               // consumed, and there's more data for this file in
+               // the next segment ... then there's a good chance we
+               // are going to need the data for that next segment
+               // soon. Start getting it into the cache now.
+               go f.kc.cache().Get(f.kc, f.segNext[0].Locator)
+               f.readaheadDone = true
        }
-       copy(outbuf[:n], r.buf[:n])
-
-       // Next call to Read() will continue where we left off
-       r.buf = r.buf[n:]
 
+       copy(buf[:n], f.segData[dataOff:dataOff+n])
+       f.offset += int64(n)
        return n, nil
 }
 
-// Close releases resources. It returns a non-nil error if an error
-// was encountered by the reader.
-func (r *cfReader) Close() error {
-       close(r.rdrClosed)
-       return r.Error()
-}
-
-// Error returns an error if one has been encountered, otherwise
-// nil. It is safe to call from any goroutine.
-func (r *cfReader) Error() error {
-       select {
-       case <-r.errNotNil:
-               return r.err
+// Seek implements io.Seeker. A resulting position past the end of
+// the file is clamped to the file size. A negative position, or an
+// unknown whence, is an error and leaves the offset unchanged.
+func (f *file) Seek(offset int64, whence int) (int64, error) {
+       var target int64
+       switch whence {
+       case io.SeekStart:
+               target = offset
+       case io.SeekCurrent:
+               target = f.offset + offset
+       case io.SeekEnd:
+               target = f.size + offset
        default:
-               return nil
+               return f.offset, fmt.Errorf("invalid whence %d", whence)
+       }
+       switch {
+       case target < 0:
+               return f.offset, fmt.Errorf("attempted seek to %d", target)
+       case target > f.size:
+               target = f.size
        }
+       f.offset = target
+       return f.offset, nil
 }
 
-// Len returns the total number of bytes in the file being read. If
-// necessary, it waits for manifest parsing to finish.
-func (r *cfReader) Len() uint64 {
-       // Wait for all segments to be counted
-       <-r.countDone
-       return r.totalSize
+// Len returns the file size in bytes: the sum of the segment lengths
+// recorded by load. It does not depend on the current read offset.
+func (f *file) Len() uint64 {
+       total := uint64(f.size)
+       return total
 }
 
-func (r *cfReader) doGet() {
-       defer close(r.toRead)
-GET:
-       for fs := range r.toGet {
-               rdr, _, _, err := r.keepClient.Get(fs.Locator)
-               if err != nil {
-                       r.err = err
-                       close(r.errNotNil)
-                       return
-               }
-               var buf = make([]byte, fs.Offset+fs.Len)
-               _, err = io.ReadFull(rdr, buf)
-               errClosing := rdr.Close()
-               if err == nil {
-                       err = errClosing
-               }
-               if err != nil {
-                       r.err = err
-                       close(r.errNotNil)
-                       return
-               }
-               for bOff, bLen := fs.Offset, dataSliceSize; bOff < fs.Offset+fs.Len && bLen > 0; bOff += bLen {
-                       if bOff+bLen > fs.Offset+fs.Len {
-                               bLen = fs.Offset + fs.Len - bOff
-                       }
-                       select {
-                       case r.toRead <- buf[bOff : bOff+bLen]:
-                       case <-r.rdrClosed:
-                               // Reader is closed: no point sending
-                               // anything more to toRead.
-                               break GET
-                       }
-               }
-               // It is possible that r.rdrClosed is closed but we
-               // never noticed because r.toRead was also ready in
-               // every select{} above. Here we check before wasting
-               // a keepclient.Get() call.
-               select {
-               case <-r.rdrClosed:
-                       break GET
-               default:
-               }
+// load resets f.segments and f.size, then fills them from the
+// manifest's segments for path. It returns os.ErrNotExist if the
+// manifest yields no segments for path.
+func (f *file) load(m manifest.Manifest, path string) error {
+       f.segments, f.size = nil, 0
+       for seg := range m.FileSegmentIterByName(path) {
+               f.size += int64(seg.Len)
+               f.segments = append(f.segments, seg)
        }
-       // In case we exited the above loop early: before returning,
-       // drain the toGet channel so its sender doesn't sit around
-       // blocking forever.
-       for range r.toGet {
+       if len(f.segments) == 0 {
+               return os.ErrNotExist
        }
-}
-
-func newCFReader(kc *KeepClient) (r *cfReader) {
-       r = new(cfReader)
-       r.keepClient = kc
-       r.rdrClosed = make(chan struct{})
-       r.errNotNil = make(chan struct{})
-       r.toGet = make(chan *manifest.FileSegment, 2)
-       r.toRead = make(chan []byte, (BLOCKSIZE+dataSliceSize-1)/dataSliceSize)
-       r.countDone = make(chan struct{})
-       go r.doGet()
-       return
+       return nil
 }
index be4f386ff229f7227ccdb03a722b9c3eeb63f8ff..6f49c590fdca4c1b1b4776562e3f5b78ee1bbe54 100644 (file)
@@ -121,6 +121,7 @@ func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) {
                {mt: mt, f: "segmented/frob", want: "frob"},
                {mt: mt, f: "segmented/oof", want: "oof"},
        } {
+               c.Logf("%#v", testCase)
                rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": testCase.mt}, testCase.f)
                switch want := testCase.want.(type) {
                case error:
@@ -136,6 +137,23 @@ func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) {
                                c.Check(n, check.Equals, 0)
                                c.Check(err, check.Equals, io.EOF)
                        }
+
+                       for a := len(want) - 2; a >= 0; a-- {
+                               for b := a + 1; b <= len(want); b++ {
+                                       offset, err := rdr.Seek(int64(a), io.SeekStart)
+                                       c.Logf("...a=%d, b=%d", a, b)
+                                       c.Check(err, check.IsNil)
+                                       c.Check(offset, check.Equals, int64(a))
+                                       buf := make([]byte, b-a)
+                                       n, err := io.ReadFull(rdr, buf)
+                                       c.Check(n, check.Equals, b-a)
+                                       c.Check(string(buf), check.Equals, want[a:b])
+                               }
+                       }
+                       offset, err := rdr.Seek(-1, io.SeekStart)
+                       c.Check(err, check.NotNil)
+                       c.Check(offset, check.Equals, int64(len(want)))
+
                        c.Check(rdr.Close(), check.Equals, nil)
                }
        }
@@ -168,13 +186,16 @@ func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
 }
 
 func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
+       s.kc.BlockCache = &BlockCache{}
        s.kc.PutB([]byte("foo"))
+       s.kc.PutB([]byte("bar"))
+       s.kc.PutB([]byte("baz"))
 
        mt := ". "
-       for i := 0; i < 1000; i++ {
-               mt += "acbd18db4cc2f85cedef654fccc4a4d8+3 "
+       for i := 0; i < 300; i++ {
+               mt += "acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 73feffa4b7f6bb68e44cf984c85f6e88+3 "
        }
-       mt += "0:3000:foo1000.txt\n"
+       mt += "0:2700:foo900.txt\n"
 
        // Grab the stub server's lock, ensuring our cfReader doesn't
        // get anything back from its first call to kc.Get() before we
@@ -182,17 +203,16 @@ func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
        s.handler.lock <- struct{}{}
        opsBeforeRead := *s.handler.ops
 
-       rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo1000.txt")
+       rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo900.txt")
        c.Assert(err, check.IsNil)
 
        firstReadDone := make(chan struct{})
        go func() {
-               rdr.Read(make([]byte, 6))
-               firstReadDone <- struct{}{}
+               n, err := rdr.Read(make([]byte, 3))
+               c.Check(n, check.Equals, 3)
+               c.Check(err, check.IsNil)
+               close(firstReadDone)
        }()
-       err = rdr.Close()
-       c.Assert(err, check.IsNil)
-       c.Assert(rdr.(*cfReader).Error(), check.IsNil)
 
        // Release the stub server's lock. The first GET operation will proceed.
        <-s.handler.lock
@@ -201,13 +221,11 @@ func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
        // received from the first GET.
        <-firstReadDone
 
-       // doGet() should close toRead before sending any more bufs to it.
-       if what, ok := <-rdr.(*cfReader).toRead; ok {
-               c.Errorf("Got %q, expected toRead to be closed", what)
-       }
+       err = rdr.Close()
+       c.Check(err, check.IsNil)
 
        // Stub should have handled exactly one GET request.
-       c.Assert(*s.handler.ops, check.Equals, opsBeforeRead+1)
+       c.Check(*s.handler.ops, check.Equals, opsBeforeRead+1)
 }
 
 func (s *CollectionReaderUnit) TestCollectionReaderDataError(c *check.C) {
@@ -220,5 +238,5 @@ func (s *CollectionReaderUnit) TestCollectionReaderDataError(c *check.C) {
                c.Check(err, check.NotNil)
                c.Check(err, check.Not(check.Equals), io.EOF)
        }
-       c.Check(rdr.Close(), check.NotNil)
+       c.Check(rdr.Close(), check.IsNil)
 }
index baf4bac02444170446c91a61c0b7469813bf308c..4f84afca61a413796fbb222eb5108473c449cfaf 100644 (file)
@@ -72,6 +72,7 @@ type KeepClient struct {
        lock               sync.RWMutex
        Client             *http.Client
        Retries            int
+       BlockCache         *BlockCache
 
        // set to 1 if all writable services are of disk type, otherwise 0
        replicasPerService int
@@ -406,6 +407,14 @@ func (kc *KeepClient) getSortedRoots(locator string) []string {
        return found
 }
 
+// cache returns kc.BlockCache if set, else DefaultBlockCache.
+func (kc *KeepClient) cache() *BlockCache {
+       if kc.BlockCache != nil {
+               return kc.BlockCache
+       }
+       return DefaultBlockCache
+}
+
 type Locator struct {
        Hash  string
        Size  int      // -1 if data size is not known
index 8446fb6a2981c9ee6d02404a4a846a3a2daf8d38..edeb910570ed80af8d1b45589cc252f7b58e718f 100644 (file)
@@ -1070,12 +1070,15 @@ class ArvadosFile(object):
             return 0
 
     @synchronized
-    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
+    def manifest_text(self, stream_name=".", portable_locators=False,
+                      normalize=False, only_committed=False):
         buf = ""
         filestream = []
         for segment in self.segments:
             loc = segment.locator
-            if loc.startswith("bufferblock"):
+            if self.parent._my_block_manager().is_bufferblock(loc):
+                if only_committed:
+                    continue
                 loc = self._bufferblocks[loc].calculate_locator()
             if portable_locators:
                 loc = KeepLocator(loc).stripped()
index d57b2b775dfcfddea60d2f312f4dfefad95fe540..5b46ba75b70d864589f681f1619500def41781d5 100644 (file)
@@ -520,7 +520,7 @@ class ArvPutUploadJob(object):
                     manifest = self._local_collection.manifest_text()
                 else:
                     # Get the manifest text without comitting pending blocks
-                    manifest = self._local_collection.manifest_text(".", strip=False,
+                    manifest = self._local_collection.manifest_text(strip=False,
                                                                     normalize=False,
                                                                     only_committed=True)
                 # Update cache
index 18550204669c7cc6353d87cfc863bcbf3c4d876a..e0cdda15628fb78514beaa32269f22ed8793b6e3 100644 (file)
@@ -139,23 +139,21 @@ class Node < ArvadosModel
   end
 
+  # Keeps DNS and other Node records consistent after this node's
+  # hostname and/or ip_address change. (Presumably registered as a
+  # save callback; the registration is not visible here -- confirm.)
+  #
+  # Steps, in order:
+  # 1. If ip_address changed to a non-nil value, clear ip_address on
+  #    every other Node that still has the same address.
+  # 2. If there was a previous hostname (hostname_was) and hostname
+  #    changed, point the old hostname at UNUSED_NODE_IP.
+  # 3. If hostname is set and either hostname or ip_address changed,
+  #    point hostname at ip_address (UNUSED_NODE_IP if nil).
   def dns_server_update
-    if hostname_changed? && hostname_was
+    if ip_address_changed? && ip_address
+      Node.where('id != ? and ip_address = ?',
+                 id, ip_address).each do |stale_node|
+        # One or more(!) stale node records have the same IP address
+        # as the new node. Clear the ip_address field on the stale
+        # nodes. Otherwise, we (via SLURM) might inadvertently connect
+        # to the new node using the old node's hostname.
+        # NOTE(review): saving via update_attributes! presumably runs
+        # the stale node's own callbacks (including this method),
+        # which is how its hostname gets pointed at UNUSED_NODE_IP in
+        # the 'newest ping wins IP address conflict' test.
+        stale_node.update_attributes!(ip_address: nil)
+      end
+    end
+    if hostname_was && hostname_changed?
       self.class.dns_server_update(hostname_was, UNUSED_NODE_IP)
     end
-    if hostname_changed? or ip_address_changed?
-      if ip_address
-        Node.where('id != ? and ip_address = ? and last_ping_at < ?',
-                   id, ip_address, 10.minutes.ago).each do |stale_node|
-          # One or more stale compute node records have the same IP
-          # address as the new node.  Clear the ip_address field on
-          # the stale nodes.
-          stale_node.ip_address = nil
-          stale_node.save!
-        end
-      end
-      if hostname
-        self.class.dns_server_update(hostname, ip_address || UNUSED_NODE_IP)
-      end
+    if hostname && (hostname_changed? || ip_address_changed?)
+      self.class.dns_server_update(hostname, ip_address || UNUSED_NODE_IP)
     end
   end
 
index df8c22baf4ad04a6a20b97c0f9c551a335a9fb96..c1e77f6a4d4cf78e6e3ac7ce77b4ffe5a2fd4c9e 100644 (file)
@@ -152,4 +152,31 @@ class NodeTest < ActiveSupport::TestCase
       node.update_attributes!(hostname: 'foo0', ip_address: '10.11.12.14')
     end
   end
+
+  test 'newest ping wins IP address conflict' do
+    act_as_system_user do
+      shared_ip = '10.5.5.5'
+      ping_from_shared_ip = lambda do |node|
+        node.ping(ip: shared_ip, ping_secret: node.info['ping_secret'])
+      end
+      first, second = Node.create!, Node.create!
+
+      # The first node claims the address with no competition.
+      ping_from_shared_ip.call(first)
+      first.reload
+
+      # The second node pings from the same address: the first
+      # node's hostname is pointed at UNUSED_NODE_IP and the second
+      # node's hostname at the shared address.
+      Node.expects(:dns_server_update).with(first.hostname, Node::UNUSED_NODE_IP)
+      Node.expects(:dns_server_update).with(Not(equals(first.hostname)), shared_ip)
+      ping_from_shared_ip.call(second)
+      [first, second].each(&:reload)
+      assert_nil first.ip_address
+      assert_equal shared_ip, second.ip_address
+
+      # And back again: now the first node wins.
+      Node.expects(:dns_server_update).with(second.hostname, Node::UNUSED_NODE_IP)
+      Node.expects(:dns_server_update).with(first.hostname, shared_ip)
+      ping_from_shared_ip.call(first)
+      [first, second].each(&:reload)
+      assert_nil second.ip_address
+      assert_equal shared_ip, first.ip_address
+    end
+  end
 end
index 75645ff47215c2b5f7f666057006ea2510f99811..5b6ae61b51e2423db9ebb0a94c861a89d417c7f2 100644 (file)
@@ -44,6 +44,7 @@ func main() {
                "Value for GITOLITE_HTTP_HOME environment variable. If not empty, GL_BYPASS_ACCESS_CHECKS=1 will also be set."+deprecated)
 
        cfgPath := flag.String("config", defaultCfgPath, "Configuration file `path`.")
+       dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
        flag.Usage = usage
        flag.Parse()
 
@@ -63,6 +64,10 @@ func main() {
                }
        }
 
+       if *dumpConfig {
+               log.Fatal(config.DumpAndExit(theConfig))
+       }
+
        srv := &server{}
        if err := srv.Start(); err != nil {
                log.Fatal(err)
index 60dc6071b40459ccbab6920ba80727917a69d9eb..1c080f36ac13133b12ad4308fb62d6f53549ded3 100644 (file)
@@ -6,17 +6,18 @@ import (
        "bytes"
        "flag"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/config"
-       "git.curoverse.com/arvados.git/sdk/go/dispatch"
-       "github.com/coreos/go-systemd/daemon"
        "log"
        "math"
        "os"
        "os/exec"
        "strings"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/config"
+       "git.curoverse.com/arvados.git/sdk/go/dispatch"
+       "github.com/coreos/go-systemd/daemon"
 )
 
 // Config used by crunch-dispatch-slurm
@@ -58,6 +59,10 @@ func doMain() error {
                "config",
                defaultConfigPath,
                "`path` to JSON or YAML configuration file")
+       // Register on the FlagSet that is actually parsed below
+       // (flags.Parse), not on the global flag.CommandLine; otherwise
+       // flags.Parse does not know -dump-config and *dumpConfig can
+       // never become true.
+       dumpConfig := flags.Bool(
+               "dump-config",
+               false,
+               "write current configuration to stdout and exit")
 
        // Parse args; omit the first arg which is the command name
        flags.Parse(os.Args[1:])
@@ -91,6 +96,10 @@ func doMain() error {
                log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
        }
 
+       if *dumpConfig {
+               log.Fatal(config.DumpAndExit(theConfig))
+       }
+
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
                log.Printf("Error making Arvados client: %v", err)
index 971cb3a27a246c9fbe1a325496a6da63e6e69ac4..0e979c1a4a35bf99bd78d327221843d8127e0f6e 100644 (file)
@@ -40,7 +40,7 @@ var ErrCancelled = errors.New("Cancelled")
 // IKeepClient is the minimal Keep API methods used by crunch-run.
 type IKeepClient interface {
        PutHB(hash string, buf []byte) (string, int, error)
-       ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
+       ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
 }
 
 // NewLogWriter is a factory function to create a new log writer.
index b9856aca2964755a7fab0f9baf527084adff8532..e8b45c978d92d6860672af22f61bcaa33071cfd0 100644 (file)
@@ -245,7 +245,11 @@ func (fw FileWrapper) Len() uint64 {
        return fw.len
 }
 
-func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+// Seek always fails with "not implemented". NOTE(review): presumably
+// added so FileWrapper satisfies keepclient.Reader, which
+// ManifestFileReader now returns — confirm Reader includes io.Seeker.
+func (fw FileWrapper) Seek(int64, int) (int64, error) {
+       return 0, errors.New("not implemented")
+}
+
+func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error) {
        if filename == hwImageId+".tar" {
                rdr := ioutil.NopCloser(&bytes.Buffer{})
                client.Called = true
@@ -324,7 +328,7 @@ func (KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
        return "", 0, errors.New("KeepError")
 }
 
-func (KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+// ManifestFileReader always fails with the error "KeepError" and
+// returns no reader.
+func (KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error) {
        return nil, errors.New("KeepError")
 }
 
@@ -348,7 +352,11 @@ func (ErrorReader) Len() uint64 {
        return 0
 }
 
-func (KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+// Seek always fails with the error "ErrorReader". NOTE(review):
+// presumably required so ErrorReader satisfies keepclient.Reader —
+// confirm.
+func (ErrorReader) Seek(int64, int) (int64, error) {
+       return 0, errors.New("ErrorReader")
+}
+
+// ManifestFileReader succeeds but returns an ErrorReader, so failures
+// show up only when the caller uses the reader (e.g. Seek fails).
+func (KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error) {
        return ErrorReader{}, nil
 }
 
index 310c77a21c228c12779de5ac39c686cc08d7b624..04d3e9992b21f74289919dfeb5a67bda509e94f6 100644 (file)
@@ -64,7 +64,7 @@ type RunOptions struct {
 var debugf = func(string, ...interface{}) {}
 
 func main() {
-       var config Config
+       var cfg Config
        var runOptions RunOptions
 
        configPath := flag.String("config", defaultConfigPath,
@@ -78,19 +78,24 @@ func main() {
                "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
        flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
                "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
+       dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit")
        dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
        debugFlag := flag.Bool("debug", false, "enable debug messages")
        flag.Usage = usage
        flag.Parse()
 
-       mustReadConfig(&config, *configPath)
+       mustReadConfig(&cfg, *configPath)
        if *serviceListPath != "" {
-               mustReadConfig(&config.KeepServiceList, *serviceListPath)
+               mustReadConfig(&cfg.KeepServiceList, *serviceListPath)
+       }
+
+       if *dumpConfig {
+               log.Fatal(config.DumpAndExit(cfg))
        }
 
        if *debugFlag {
                debugf = log.Printf
-               if j, err := json.Marshal(config); err != nil {
+               if j, err := json.Marshal(cfg); err != nil {
                        log.Fatal(err)
                } else {
                        log.Printf("config is %s", j)
@@ -99,13 +104,13 @@ func main() {
        if *dumpFlag {
                runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
        }
-       err := CheckConfig(config, runOptions)
+       err := CheckConfig(cfg, runOptions)
        if err != nil {
                // (don't run)
        } else if runOptions.Once {
-               _, err = (&Balancer{}).Run(config, runOptions)
+               _, err = (&Balancer{}).Run(cfg, runOptions)
        } else {
-               err = RunForever(config, runOptions, nil)
+               err = RunForever(cfg, runOptions, nil)
        }
        if err != nil {
                log.Fatal(err)
index 11d0d96b298de5e4369474418f9f78583634510e..db7517adc68ee39ec2300e7763caad7cd5ab07c3 100644 (file)
@@ -4,14 +4,14 @@ import (
        "fmt"
        "html"
        "io"
-       "mime"
        "net/http"
        "net/url"
        "os"
-       "regexp"
+       "path"
        "strconv"
        "strings"
        "sync"
+       "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/auth"
@@ -335,51 +335,15 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
        }
        defer rdr.Close()
 
-       basenamePos := strings.LastIndex(filename, "/")
-       if basenamePos < 0 {
-               basenamePos = 0
-       }
-       extPos := strings.LastIndex(filename, ".")
-       if extPos > basenamePos {
-               // Now extPos is safely >= 0.
-               if t := mime.TypeByExtension(filename[extPos:]); t != "" {
-                       w.Header().Set("Content-Type", t)
-               }
-       }
-       if rdr, ok := rdr.(keepclient.ReadCloserWithLen); ok {
-               w.Header().Set("Content-Length", fmt.Sprintf("%d", rdr.Len()))
-       }
+       basename := path.Base(filename)
+       applyContentDispositionHdr(w, r, basename, attachment)
 
-       applyContentDispositionHdr(w, r, filename[basenamePos:], attachment)
-       rangeRdr, statusCode := applyRangeHdr(w, r, rdr)
-
-       w.WriteHeader(statusCode)
-       _, err = io.Copy(w, rangeRdr)
+       modstr, _ := collection["modified_at"].(string)
+       modtime, err := time.Parse(time.RFC3339Nano, modstr)
        if err != nil {
-               statusCode, statusText = http.StatusBadGateway, err.Error()
-       }
-}
-
-var rangeRe = regexp.MustCompile(`^bytes=0-([0-9]*)$`)
-
-func applyRangeHdr(w http.ResponseWriter, r *http.Request, rdr keepclient.ReadCloserWithLen) (io.Reader, int) {
-       w.Header().Set("Accept-Ranges", "bytes")
-       hdr := r.Header.Get("Range")
-       fields := rangeRe.FindStringSubmatch(hdr)
-       if fields == nil {
-               return rdr, http.StatusOK
-       }
-       rangeEnd, err := strconv.ParseInt(fields[1], 10, 64)
-       if err != nil {
-               // Empty or too big for int64 == send entire content
-               return rdr, http.StatusOK
-       }
-       if uint64(rangeEnd) >= rdr.Len() {
-               return rdr, http.StatusOK
+               modtime = time.Now()
        }
-       w.Header().Set("Content-Length", fmt.Sprintf("%d", rangeEnd+1))
-       w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", 0, rangeEnd, rdr.Len()))
-       return &io.LimitedReader{R: rdr, N: rangeEnd + 1}, http.StatusPartialContent
+       http.ServeContent(w, r, basename, modtime, rdr)
 }
 
 func applyContentDispositionHdr(w http.ResponseWriter, r *http.Request, filename string, isAttachment bool) {
index b3e17e8b61db0a4f3dd88b48ae82e3c6f6579673..0c960b8c0e323e860b4f79ec032d879de9a92944 100644 (file)
@@ -350,49 +350,6 @@ func (s *IntegrationSuite) TestAnonymousTokenError(c *check.C) {
        )
 }
 
-func (s *IntegrationSuite) TestRange(c *check.C) {
-       s.testServer.Config.AnonymousTokens = []string{arvadostest.AnonymousToken}
-       u, _ := url.Parse("http://example.com/c=" + arvadostest.HelloWorldCollection + "/Hello%20world.txt")
-       req := &http.Request{
-               Method:     "GET",
-               Host:       u.Host,
-               URL:        u,
-               RequestURI: u.RequestURI(),
-               Header:     http.Header{"Range": {"bytes=0-4"}},
-       }
-       resp := httptest.NewRecorder()
-       s.testServer.Handler.ServeHTTP(resp, req)
-       c.Check(resp.Code, check.Equals, http.StatusPartialContent)
-       c.Check(resp.Body.String(), check.Equals, "Hello")
-       c.Check(resp.Header().Get("Content-Length"), check.Equals, "5")
-       c.Check(resp.Header().Get("Content-Range"), check.Equals, "bytes 0-4/12")
-
-       req.Header.Set("Range", "bytes=0-")
-       resp = httptest.NewRecorder()
-       s.testServer.Handler.ServeHTTP(resp, req)
-       // 200 and 206 are both correct:
-       c.Check(resp.Code, check.Equals, http.StatusOK)
-       c.Check(resp.Body.String(), check.Equals, "Hello world\n")
-       c.Check(resp.Header().Get("Content-Length"), check.Equals, "12")
-
-       // Unsupported ranges are ignored
-       for _, hdr := range []string{
-               "bytes=5-5",  // non-zero start byte
-               "bytes=-5",   // last 5 bytes
-               "cubits=0-5", // unsupported unit
-               "bytes=0-340282366920938463463374607431768211456", // 2^128
-       } {
-               req.Header.Set("Range", hdr)
-               resp = httptest.NewRecorder()
-               s.testServer.Handler.ServeHTTP(resp, req)
-               c.Check(resp.Code, check.Equals, http.StatusOK)
-               c.Check(resp.Body.String(), check.Equals, "Hello world\n")
-               c.Check(resp.Header().Get("Content-Length"), check.Equals, "12")
-               c.Check(resp.Header().Get("Content-Range"), check.Equals, "")
-               c.Check(resp.Header().Get("Accept-Ranges"), check.Equals, "bytes")
-       }
-}
-
 // XHRs can't follow redirect-with-cookie so they rely on method=POST
 // and disposition=attachment (telling us it's acceptable to respond
 // with content instead of a redirect) and an Origin header that gets
index df8a0b5c078b31d3a42b51786e3d4f6d0f15477f..5f4cb5090468708ce02d34ec5f74d9baf80720a5 100644 (file)
@@ -62,6 +62,8 @@ func main() {
                "Only serve attachments at the given `host:port`"+deprecated)
        flag.BoolVar(&cfg.TrustAllContent, "trust-all-content", false,
                "Serve non-public content from a single origin. Dangerous: read docs before using!"+deprecated)
+       dumpConfig := flag.Bool("dump-config", false,
+               "write current configuration to stdout and exit")
        flag.Usage = usage
        flag.Parse()
 
@@ -78,6 +80,10 @@ func main() {
                cfg.AnonymousTokens = []string{os.Getenv("ARVADOS_API_TOKEN")}
        }
 
+       if *dumpConfig {
+               log.Fatal(config.DumpAndExit(cfg))
+       }
+
        os.Setenv("ARVADOS_API_HOST", cfg.Client.APIHost)
        srv := &server{Config: cfg}
        if err := srv.Start(); err != nil {
diff --git a/services/keep-web/ranges_test.go b/services/keep-web/ranges_test.go
new file mode 100644 (file)
index 0000000..186306d
--- /dev/null
@@ -0,0 +1,90 @@
+package main
+
+import (
+       "fmt"
+       "net/http"
+       "net/http/httptest"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       check "gopkg.in/check.v1"
+)
+
+// TestRanges checks that keep-web honors "Range: bytes=..." request
+// headers on a file that spans several Keep blocks. It covers ranges
+// that cross block boundaries, suffix and open-ended ranges, a range
+// that runs past the end of the file, and a malformed range.
+func (s *IntegrationSuite) TestRanges(c *check.C) {
+       blocksize := 1000000
+       var uuid string
+       {
+               // Store one block of blocksize spaces with "foo" at offsets
+               // 1-3, plus a one-byte block "Z". Then create a collection
+               // whose stream is four copies of the big block followed by
+               // the "Z" block.
+               testdata := make([]byte, blocksize)
+               for i := 0; i < blocksize; i++ {
+                       testdata[i] = byte(' ')
+               }
+               copy(testdata[1:4], []byte("foo"))
+               arv, err := arvadosclient.MakeArvadosClient()
+               c.Assert(err, check.Equals, nil)
+               arv.ApiToken = arvadostest.ActiveToken
+               kc, err := keepclient.MakeKeepClient(arv)
+               c.Assert(err, check.Equals, nil)
+               loc, _, err := kc.PutB(testdata[:])
+               c.Assert(err, check.Equals, nil)
+               loc2, _, err := kc.PutB([]byte{'Z'})
+               c.Assert(err, check.Equals, nil)
+
+               // testdata.bin covers stream bytes 1 through blocksize*4, so
+               // it starts with "foo" and ends with " Z". space.txt is the
+               // single space at stream offset 0.
+               mtext := fmt.Sprintf(". %s %s %s %s %s 1:%d:testdata.bin 0:1:space.txt\n", loc, loc, loc, loc, loc2, blocksize*4)
+               coll := map[string]interface{}{}
+               err = arv.Create("collections",
+                       map[string]interface{}{
+                               "collection": map[string]interface{}{
+                                       "name":          "test data for keep-web TestRanges",
+                                       "manifest_text": mtext,
+                               },
+                       }, &coll)
+               c.Assert(err, check.Equals, nil)
+               uuid = coll["uuid"].(string)
+               // Deferred: the collection is deleted when TestRanges
+               // returns, after all trials have run (not at the end of
+               // this block).
+               defer arv.Delete("collections", uuid, nil, nil)
+       }
+
+       url := mustParseURL("http://" + uuid + ".collections.example.com/testdata.bin")
+       // Each trial sends "Range: bytes=<header>". When expectObey is
+       // true the response must be 206 with exactly expectBody;
+       // otherwise it must be 416.
+       for _, trial := range []struct {
+               header     string
+               expectObey bool
+               expectBody string
+       }{
+               {"0-2", true, "foo"},
+               {"-2", true, " Z"},
+               {"1-4", true, "oo  "},
+               {"z-y", false, ""},
+               {"1000000-1000003", true, "foo "},
+               {"999999-1000003", true, " foo "},
+               {"2000000-2000003", true, "foo "},
+               {"1999999-2000002", true, " foo"},
+               {"3999998-3999999", true, " Z"},
+               {"3999998-4000004", true, " Z"},
+               {"3999998-", true, " Z"},
+       } {
+               c.Logf("trial: %#v", trial)
+               resp := httptest.NewRecorder()
+               req := &http.Request{
+                       Method:     "GET",
+                       URL:        url,
+                       Host:       url.Host,
+                       RequestURI: url.RequestURI(),
+                       Header: http.Header{
+                               "Authorization": {"OAuth2 " + arvadostest.ActiveToken},
+                               "Range":         {"bytes=" + trial.header},
+                       },
+               }
+               s.testServer.Handler.ServeHTTP(resp, req)
+               if trial.expectObey {
+                       c.Check(resp.Code, check.Equals, http.StatusPartialContent)
+                       c.Check(resp.Body.Len(), check.Equals, len(trial.expectBody))
+                       // NOTE(review): for bodies over 1000 bytes only a
+                       // prefix is compared (with a "[...]" marker),
+                       // presumably to keep failure output short.
+                       if resp.Body.Len() > 1000 {
+                               c.Check(resp.Body.String()[:1000]+"[...]", check.Equals, trial.expectBody)
+                       } else {
+                               c.Check(resp.Body.String(), check.Equals, trial.expectBody)
+                       }
+               } else {
+                       c.Check(resp.Code, check.Equals, http.StatusRequestedRangeNotSatisfiable)
+               }
+       }
+}
index 24df531fa4ab434fea7652503db1d44f1ba04a9b..76a8a1551fb867f5258221f1f22692656039d97a 100644 (file)
@@ -1,7 +1,6 @@
 package main
 
 import (
-       "encoding/json"
        "errors"
        "flag"
        "fmt"
@@ -22,6 +21,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/config"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "github.com/coreos/go-systemd/daemon"
+       "github.com/ghodss/yaml"
        "github.com/gorilla/mux"
 )
 
@@ -62,6 +62,7 @@ func main() {
        var cfgPath string
        const defaultCfgPath = "/etc/arvados/keepproxy/keepproxy.yml"
        flagset.StringVar(&cfgPath, "config", defaultCfgPath, "Configuration file `path`")
+       dumpConfig := flagset.Bool("dump-config", false, "write current configuration to stdout and exit")
        flagset.Parse(os.Args[1:])
 
        err := config.LoadFile(cfg, cfgPath)
@@ -77,12 +78,16 @@ func main() {
                if regexp.MustCompile("^(?i:1|yes|true)$").MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")) {
                        cfg.Client.Insecure = true
                }
-               if j, err := json.MarshalIndent(cfg, "", "    "); err == nil {
-                       log.Print("Current configuration:\n", string(j))
+               if y, err := yaml.Marshal(cfg); err == nil && !*dumpConfig {
+                       log.Print("Current configuration:\n", string(y))
                }
                cfg.Timeout = arvados.Duration(time.Duration(*timeoutSeconds) * time.Second)
        }
 
+       if *dumpConfig {
+               log.Fatal(config.DumpAndExit(cfg))
+       }
+
        arv, err := arvadosclient.New(&cfg.Client)
        if err != nil {
                log.Fatalf("Error setting up arvados client %s", err.Error())
index 54147959719183141a8e3137d5d1363ec9667e6b..9033de811775f776499b61f5347545dd42775cc0 100644 (file)
@@ -16,7 +16,6 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        log "github.com/Sirupsen/logrus"
        "github.com/coreos/go-systemd/daemon"
-       "github.com/ghodss/yaml"
 )
 
 // A Keep "block" is 64MB.
@@ -105,12 +104,7 @@ func main() {
        }
 
        if *dumpConfig {
-               y, err := yaml.Marshal(theConfig)
-               if err != nil {
-                       log.Fatal(err)
-               }
-               os.Stdout.Write(y)
-               os.Exit(0)
+               log.Fatal(config.DumpAndExit(theConfig))
        }
 
        err = theConfig.Start()