refs #10227
author radhika <radhika@curoverse.com>
Fri, 14 Oct 2016 18:18:46 +0000 (14:18 -0400)
committer radhika <radhika@curoverse.com>
Fri, 14 Oct 2016 18:18:46 +0000 (14:18 -0400)
Merge branch '10227-make-diagnostics-tests-reliable'

28 files changed:
doc/install/install-arv-git-httpd.html.textile.liquid
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/tests/hw.py [new file with mode: 0644]
sdk/cwl/tests/test_pathmapper.py [new file with mode: 0644]
sdk/cwl/tests/test_submit.py
sdk/python/arvados/arvfile.py
sdk/python/arvados/commands/put.py
sdk/python/tests/test_arv_put.py
sdk/python/tests/test_collections.py
services/api/app/models/container.rb
services/api/app/models/container_request.rb
services/api/test/unit/container_request_test.rb
services/arv-git-httpd/arvados-git-httpd.service
services/arv-git-httpd/doc.go [deleted file]
services/arv-git-httpd/git_handler.go
services/arv-git-httpd/git_handler_test.go
services/arv-git-httpd/gitolite_test.go
services/arv-git-httpd/integration_test.go
services/arv-git-httpd/main.go
services/arv-git-httpd/usage.go
services/fuse/arvados_fuse/command.py
services/fuse/tests/test_command_args.py
services/fuse/tests/test_mount.py
services/keepstore/keepstore.go
tools/arvbox/lib/arvbox/docker/service/arv-git-httpd/run-service

diff --git a/doc/install/install-arv-git-httpd.html.textile.liquid b/doc/install/install-arv-git-httpd.html.textile.liquid
index 5e373c38b855bb3b2f27410d94b0a6835da07c18..b28674de03cc8364d657ba957a9700e119da6732 100644
@@ -232,6 +232,7 @@ On Red Hat-based systems:
 
 <notextile>
 <pre><code>~$ <span class="userinput">sudo yum install git arvados-git-httpd</span>
+~$ <span class="userinput">sudo systemctl enable arvados-git-httpd</span>
 </code></pre>
 </notextile>
 
@@ -239,10 +240,9 @@ Verify that @arvados-git-httpd@ and @git-http-backend@ can be run:
 
 <notextile>
 <pre><code>~$ <span class="userinput">arvados-git-httpd -h</span>
-Usage of arvados-git-httpd:
-  -address="0.0.0.0:80": Address to listen on, "host:port".
-  -git-command="/usr/bin/git": Path to git executable. Each authenticated request will execute this program with a single argument, "http-backend".
-  -repo-root="/path/to/cwd": Path to git repositories.
+[...]
+Usage: arvados-git-httpd [-config path/to/arvados/git-httpd.yml]
+[...]
 ~$ <span class="userinput">git http-backend</span>
 Status: 500 Internal Server Error
 Expires: Fri, 01 Jan 1980 00:00:00 GMT
@@ -255,43 +255,29 @@ fatal: No REQUEST_METHOD from server
 
 h3. Enable arvados-git-httpd
 
-Install runit to supervise the arvados-git-httpd daemon.  {% include 'install_runit' %}
+{% include 'notebox_begin' %}
+
+The arvados-git-httpd package includes configuration files for systemd.  If you're using a different init system, you'll need to configure a service to start and stop an @arvados-git-httpd@ process as desired.
+
+{% include 'notebox_end' %}
 
-Configure runit to run arvados-git-httpd, making sure to update the API host to match your site:
+Create the configuration file @/etc/arvados/git-httpd/git-httpd.yml@. Run @arvados-git-httpd -h@ to learn more about configuration entries.
 
 <notextile>
-<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/sv</span>
-~$ <span class="userinput">cd /etc/sv</span>
-/etc/sv$ <span class="userinput">sudo mkdir arvados-git-httpd; cd arvados-git-httpd</span>
-/etc/sv/arvados-git-httpd$ <span class="userinput">sudo mkdir log</span>
-/etc/sv/arvados-git-httpd$ <span class="userinput">sudo sh -c 'cat &gt;log/run' &lt;&lt;'EOF'
-#!/bin/sh
-mkdir -p main
-chown git:git main
-exec chpst -u git:git svlogd -tt main
-EOF</span>
-/etc/sv/arvados-git-httpd$ <span class="userinput">sudo sh -c 'cat &gt;run' &lt;&lt;'EOF'
-#!/bin/sh
-export ARVADOS_API_HOST=<b>uuid_prefix.your.domain</b>
-export GITOLITE_HTTP_HOME=/var/lib/arvados/git
-export GL_BYPASS_ACCESS_CHECKS=1
-export PATH="$PATH:/var/lib/arvados/git/bin"
-exec chpst -u git:git arvados-git-httpd -address=:9001 -git-command=/var/lib/arvados/git/gitolite/src/gitolite-shell -repo-root=<b>/var/lib/arvados/git</b>/repositories 2&gt;&1
-EOF</span>
-/etc/sv/arvados-git-httpd$ <span class="userinput">sudo chmod +x run log/run</span>
-/etc/sv/arvados-git-httpd$ <span class="userinput">sudo ln -s "$(pwd)" /etc/service/</span>
+<pre><code>Client:
+  APIHost: <b>uuid_prefix.your.domain</b>
+  Insecure: false
+GitCommand: /var/lib/arvados/git/gitolite/src/gitolite-shell
+GitoliteHome: /var/lib/arvados/git
+Listen: :9001
+RepoRoot: /var/lib/arvados/git/repositories
 </code></pre>
 </notextile>
 
-If you are using a different daemon supervisor, or if you want to test the daemon in a terminal window, an equivalent shell command to run arvados-git-httpd is:
+Restart the systemd service to ensure the new configuration is used.
 
 <notextile>
-<pre><code>sudo -u git \
-  ARVADOS_API_HOST=<span class="userinput">uuid_prefix.your.domain</span> \
-  GITOLITE_HTTP_HOME=/var/lib/arvados/git \
-  GL_BYPASS_ACCESS_CHECKS=1 \
-  PATH="$PATH:/var/lib/arvados/git/bin" \
-  arvados-git-httpd -address=:9001 -git-command=/var/lib/arvados/git/gitolite/src/gitolite-shell -repo-root=/var/lib/arvados/git/repositories 2&gt;&1
+<pre><code>~$ <span class="userinput">sudo systemctl restart arvados-git-httpd</span>
 </code></pre>
 </notextile>
 
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index c90f8902684304b400cc7ece97068a8e6b094000..7ebb13f1bb48af456ce50f2c7d8629e03c975cb0 100644
@@ -24,7 +24,7 @@ import arvados.config
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner
+from. runner import Runner, upload_instance
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess
@@ -64,6 +64,8 @@ class ArvCwlRunner(object):
         self.pipeline = None
         self.final_output_collection = None
         self.output_name = output_name
+        self.project_uuid = None
+
         if keep_client is not None:
             self.keep_client = keep_client
         else:
@@ -266,6 +268,8 @@ class ArvCwlRunner(object):
             kwargs["docker_outdir"] = "$(task.outdir)"
             kwargs["tmpdir"] = "$(task.tmpdir)"
 
+        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
+
         runnerjob = None
         if kwargs.get("submit"):
             if self.work_api == "containers":
@@ -289,7 +293,7 @@ class ArvCwlRunner(object):
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
         if runnerjob and not kwargs.get("wait"):
-            runnerjob.run()
+            runnerjob.run(wait=kwargs.get("wait"))
             return runnerjob.uuid
 
         self.poll_api = arvados.api('v1')
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 8b1a9346830b349e145fcb0fe5ec3f3cf1acf823..8269eeebdbd417ba908c8e6ca56e8535fd1ffcd9 100644
@@ -2,6 +2,7 @@ import logging
 import re
 import copy
 import json
+import time
 
 from cwltool.process import get_feature, shortname
 from cwltool.errors import WorkflowException
@@ -233,6 +234,12 @@ class RunnerJob(Runner):
 
         workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
 
+        # Filter this key out: cwltool adds it when parameters are
+        # provided on the command line, and arv-run-pipeline-instance
+        # rejects it.
+        if "job_order" in self.job_order:
+            del self.job_order["job_order"]
+
         self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"]).target[5:]
         if self.output_name:
             self.job_order["arv:output_name"] = self.output_name
@@ -248,28 +255,38 @@ class RunnerJob(Runner):
 
     def run(self, *args, **kwargs):
         job_spec = self.arvados_job_spec(*args, **kwargs)
-        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
-
-        response = self.arvrunner.api.jobs().create(
-            body=job_spec,
-            find_or_create=self.enable_reuse
-        ).execute(num_retries=self.arvrunner.num_retries)
 
-        self.uuid = response["uuid"]
+        for k,v in job_spec["script_parameters"].items():
+            if isinstance(v, dict):
+                job_spec["script_parameters"][k] = {"value": v}
+
+        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
+            body={
+                "owner_uuid": self.arvrunner.project_uuid,
+                "name": shortname(self.tool.tool["id"]),
+                "components": {"cwl-runner": job_spec },
+                "state": "RunningOnServer"}).execute(num_retries=self.arvrunner.num_retries)
+        logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"])
+
+        if kwargs.get("wait") is False:
+            self.uuid = self.arvrunner.pipeline["uuid"]
+            return
+
+        job = None
+        while not job:
+            time.sleep(2)
+            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().get(
+                uuid=self.arvrunner.pipeline["uuid"]).execute(
+                    num_retries=self.arvrunner.num_retries)
+            job = self.arvrunner.pipeline["components"]["cwl-runner"].get("job")
+            if not job and self.arvrunner.pipeline["state"] != "RunningOnServer":
+                raise WorkflowException("Submitted pipeline is %s" % (self.arvrunner.pipeline["state"]))
+
+        self.uuid = job["uuid"]
         self.arvrunner.processes[self.uuid] = self
 
-        logger.info("Submitted job %s", response["uuid"])
-
-        if kwargs.get("submit"):
-            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
-                body={
-                    "owner_uuid": self.arvrunner.project_uuid,
-                    "name": shortname(self.tool.tool["id"]),
-                    "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
-                    "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
-
-        if response["state"] in ("Complete", "Failed", "Cancelled"):
-            self.done(response)
+        if job["state"] in ("Complete", "Failed", "Cancelled"):
+            self.done(job)
 
 
 class RunnerTemplate(object):
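A minimal sketch (not part of this commit; values borrowed from the test
stubs added in test_submit.py below) of why run() wraps dict-valued script
parameters: in a pipeline component, a dict-valued script parameter is read
as parameter metadata ({"value": ..., "default": ...}), so CWL
File/Directory objects have to be wrapped in a {"value": ...} envelope to
pass through as literal values.

    job_spec = {"script_parameters": {
        "x": {"class": "File",
              "location": "keep:99999999999999999999999999999994+99/blorp.txt"},
        "cwl:tool": "99999999999999999999999999999991+99/wf/submit_wf.cwl",
    }}
    for k, v in job_spec["script_parameters"].items():
        if isinstance(v, dict):
            job_spec["script_parameters"][k] = {"value": v}
    # "x" becomes {"value": {"class": "File", ...}}; plain strings such
    # as "cwl:tool" are left untouched.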
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 228d43304af9e4345b1800043b175cb0ec13f594..73c81ceb0fcdb033203c1b7e5425b3875ea121d6 100644
@@ -37,11 +37,11 @@ class ArvPathMapper(PathMapper):
                 # Local FS ref, may need to be uploaded or may be on keep
                 # mount.
                 ab = abspath(src, self.input_basedir)
-                st = arvados.commands.run.statfile("", ab, fnPattern=self.file_pattern)
+                st = arvados.commands.run.statfile("", ab, fnPattern="keep:%s/%s")
                 if isinstance(st, arvados.commands.run.UploadFile):
                     uploadfiles.add((src, ab, st))
                 elif isinstance(st, arvados.commands.run.ArvFile):
-                    self._pathmap[src] = MapperEnt(ab, st.fn, "File")
+                    self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % st.fn[5:], "File")
                 elif src.startswith("_:"):
                     if "contents" in srcobj:
                         pass
@@ -78,9 +78,11 @@ class ArvPathMapper(PathMapper):
 
     def setup(self, referenced_files, basedir):
         # type: (List[Any], unicode) -> None
-        self._pathmap = self.arvrunner.get_uploaded()
         uploadfiles = set()
 
+        for k,v in self.arvrunner.get_uploaded().iteritems():
+            self._pathmap[k] = MapperEnt(v.resolved, self.collection_pattern % v.resolved[5:], "File")
+
         for srcobj in referenced_files:
             self.visit(srcobj, uploadfiles)
 
@@ -89,12 +91,12 @@ class ArvPathMapper(PathMapper):
                                              self.arvrunner.api,
                                              dry_run=False,
                                              num_retries=self.arvrunner.num_retries,
-                                             fnPattern=self.file_pattern,
+                                             fnPattern="keep:%s/%s",
                                              name=self.name,
                                              project=self.arvrunner.project_uuid)
 
         for src, ab, st in uploadfiles:
-            self._pathmap[src] = MapperEnt("keep:" + st.keepref, st.fn, "File")
+            self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % st.fn[5:], "File")
             self.arvrunner.add_uploaded(src, self._pathmap[src])
 
         for srcobj in referenced_files:
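A note on the st.fn[5:] idiom used throughout this file (illustrative
values, matching the tests added in this commit): a resolved reference has
the form "keep:<pdh>/<name>", and the container-side target is built by
substituting everything after the "keep:" prefix into collection_pattern.

    resolved = "keep:99999999999999999999999999999991+99/hw.py"
    collection_pattern = "/test/%s"             # as passed to ArvPathMapper
    target = collection_pattern % resolved[5:]  # [5:] strips "keep:"
    assert target == "/test/99999999999999999999999999999991+99/hw.py"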
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index e5b4e006e8cce7cac780436fc06c6dcd79882730..054d3530cfe174b9a5da3025d248097ca1062d7a 100644
@@ -112,6 +112,30 @@ def upload_docker(arvrunner, tool):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool)
 
+def upload_instance(arvrunner, name, tool, job_order):
+        upload_docker(arvrunner, tool)
+
+        workflowmapper = upload_dependencies(arvrunner,
+                                             name,
+                                             tool.doc_loader,
+                                             tool.tool,
+                                             tool.tool["id"],
+                                             True)
+
+        jobmapper = upload_dependencies(arvrunner,
+                                        os.path.basename(job_order.get("id", "#")),
+                                        tool.doc_loader,
+                                        job_order,
+                                        job_order.get("id", "#"),
+                                        False)
+
+        adjustDirObjs(job_order, trim_listing)
+
+        if "id" in job_order:
+            del job_order["id"]
+
+        return workflowmapper
+
 
 class Runner(object):
     def __init__(self, runner, tool, job_order, enable_reuse, output_name):
@@ -128,31 +152,8 @@ class Runner(object):
         pass
 
     def arvados_job_spec(self, *args, **kwargs):
-        upload_docker(self.arvrunner, self.tool)
-
         self.name = os.path.basename(self.tool.tool["id"])
-
-        workflowmapper = upload_dependencies(self.arvrunner,
-                                             self.name,
-                                             self.tool.doc_loader,
-                                             self.tool.tool,
-                                             self.tool.tool["id"],
-                                             True)
-
-        jobmapper = upload_dependencies(self.arvrunner,
-                                        os.path.basename(self.job_order.get("id", "#")),
-                                        self.tool.doc_loader,
-                                        self.job_order,
-                                        self.job_order.get("id", "#"),
-                                        False)
-
-        adjustDirObjs(self.job_order, trim_listing)
-
-        if "id" in self.job_order:
-            del self.job_order["id"]
-
-        return workflowmapper
-
+        return upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
 
     def done(self, record):
         if record["state"] == "Complete":
diff --git a/sdk/cwl/tests/hw.py b/sdk/cwl/tests/hw.py
new file mode 100644
index 0000000..62c813a
--- /dev/null
+++ b/sdk/cwl/tests/hw.py
@@ -0,0 +1 @@
+print "Hello world"
diff --git a/sdk/cwl/tests/test_pathmapper.py b/sdk/cwl/tests/test_pathmapper.py
new file mode 100644
index 0000000..7e13066
--- /dev/null
+++ b/sdk/cwl/tests/test_pathmapper.py
@@ -0,0 +1,91 @@
+import functools
+import mock
+import sys
+import unittest
+import json
+import logging
+import os
+
+import arvados
+import arvados.keep
+import arvados.collection
+import arvados_cwl
+
+from cwltool.pathmapper import MapperEnt
+
+from arvados_cwl.pathmapper import ArvPathMapper
+
+def upload_mock(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)", name=None):
+    pdh = "99999999999999999999999999999991+99"
+    for c in files:
+        c.fn = fnPattern % (pdh, os.path.basename(c.fn))
+
+class TestPathmap(unittest.TestCase):
+    def test_keepref(self):
+        """Test direct keep references."""
+
+        arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+
+        p = ArvPathMapper(arvrunner, [{
+            "class": "File",
+            "location": "keep:99999999999999999999999999999991+99/hw.py"
+        }], "", "/test/%s", "/test/%s/%s")
+
+        self.assertEqual({'keep:99999999999999999999999999999991+99/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+                         p._pathmap)
+
+    @mock.patch("arvados.commands.run.uploadfiles")
+    def test_upload(self, upl):
+        """Test pathmapper uploading files."""
+
+        arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+
+        upl.side_effect = upload_mock
+
+        p = ArvPathMapper(arvrunner, [{
+            "class": "File",
+            "location": "tests/hw.py"
+        }], "", "/test/%s", "/test/%s/%s")
+
+        self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+                         p._pathmap)
+
+    @mock.patch("arvados.commands.run.uploadfiles")
+    def test_prev_uploaded(self, upl):
+        """Test pathmapper handling previously uploaded files."""
+
+        arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+        arvrunner.add_uploaded('tests/hw.py', MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='', type='File'))
+
+        upl.side_effect = upload_mock
+
+        p = ArvPathMapper(arvrunner, [{
+            "class": "File",
+            "location": "tests/hw.py"
+        }], "", "/test/%s", "/test/%s/%s")
+
+        self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+                         p._pathmap)
+
+    @mock.patch("arvados.commands.run.uploadfiles")
+    @mock.patch("arvados.commands.run.statfile")
+    def test_statfile(self, statfile, upl):
+        """Test pathmapper handling ArvFile references."""
+        arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+
+        # An ArvFile object returned from arvados.commands.run.statfile means the file is located on a
+        # keep mount, so we can construct a keep reference directly without uploading.
+        def statfile_mock(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)"):
+            st = arvados.commands.run.ArvFile("", fnPattern % ("99999999999999999999999999999991+99", "hw.py"))
+            return st
+
+        upl.side_effect = upload_mock
+        statfile.side_effect = statfile_mock
+
+        p = ArvPathMapper(arvrunner, [{
+            "class": "File",
+            "location": "tests/hw.py"
+        }], "", "/test/%s", "/test/%s/%s")
+
+        self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+                         p._pathmap)
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 6674efb8c4f75c2391160a702588ea2354e377a3..d3bdf8fedc30d897b323cd23fce5e74a017f3da9 100644
@@ -60,7 +60,13 @@ def stubs(func):
             "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz5",
             "portable_data_hash": "99999999999999999999999999999995+99",
             "manifest_text": ""
-        }        )
+        },
+        {
+            "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz6",
+            "portable_data_hash": "99999999999999999999999999999996+99",
+            "manifest_text": ""
+        }
+        )
         stubs.api.collections().get().execute.return_value = {
             "portable_data_hash": "99999999999999999999999999999993+99", "manifest_text": "./tool 00000000000000000000000000000000+0 0:0:submit_tool.cwl 0:0:blub.txt"}
 
@@ -112,6 +118,38 @@ def stubs(func):
             'script_version': 'master',
             'script': 'cwl-runner'
         }
+        stubs.pipeline_component = stubs.expect_job_spec.copy()
+        stubs.expect_pipeline_instance = {
+            'name': 'submit_wf.cwl',
+            'state': 'RunningOnServer',
+            "components": {
+                "cwl-runner": {
+                    'runtime_constraints': {'docker_image': 'arvados/jobs'},
+                    'script_parameters': {
+                        'y': {"value": {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}},
+                        'x': {"value": {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999994+99/blorp.txt'}},
+                        'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
+                              'listing': [
+                                  {'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
+                              ]}},
+                        'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+                    },
+                    'repository': 'arvados',
+                    'script_version': 'master',
+                    'script': 'cwl-runner'
+                }
+            }
+        }
+        stubs.pipeline_create = copy.deepcopy(stubs.expect_pipeline_instance)
+        stubs.expect_pipeline_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
+        stubs.pipeline_create["uuid"] = stubs.expect_pipeline_uuid
+        stubs.pipeline_with_job = copy.deepcopy(stubs.pipeline_create)
+        stubs.pipeline_with_job["components"]["cwl-runner"]["job"] = {
+            "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
+            "state": "Queued"
+        }
+        stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
+        stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
 
         stubs.expect_container_spec = {
             'priority': 1,
@@ -157,11 +195,12 @@ def stubs(func):
 
 
 class TestSubmit(unittest.TestCase):
+    @mock.patch("time.sleep")
     @stubs
-    def test_submit(self, stubs):
+    def test_submit(self, stubs, tm):
         capture_stdout = cStringIO.StringIO()
         exited = arvados_cwl.main(
-            ["--submit", "--no-wait",
+            ["--submit", "--no-wait", "--debug",
              "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
             capture_stdout, sys.stderr, api_client=stubs.api)
         self.assertEqual(exited, 0)
@@ -192,16 +231,16 @@ class TestSubmit(unittest.TestCase):
             }, ensure_unique_name=True),
             mock.call().execute()])
 
-        expect_job = copy.deepcopy(stubs.expect_job_spec)
-        expect_job["owner_uuid"] = stubs.fake_user_uuid
-        stubs.api.jobs().create.assert_called_with(
-            body=expect_job,
-            find_or_create=True)
+        expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+        expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
+        stubs.api.pipeline_instances().create.assert_called_with(
+            body=expect_pipeline)
         self.assertEqual(capture_stdout.getvalue(),
-                         stubs.expect_job_uuid + '\n')
+                         stubs.expect_pipeline_uuid + '\n')
 
+    @mock.patch("time.sleep")
     @stubs
-    def test_submit_with_project_uuid(self, stubs):
+    def test_submit_with_project_uuid(self, stubs, tm):
         project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
 
         exited = arvados_cwl.main(
@@ -211,11 +250,10 @@ class TestSubmit(unittest.TestCase):
             sys.stdout, sys.stderr, api_client=stubs.api)
         self.assertEqual(exited, 0)
 
-        expect_body = copy.deepcopy(stubs.expect_job_spec)
-        expect_body["owner_uuid"] = project_uuid
-        stubs.api.jobs().create.assert_called_with(
-            body=expect_body,
-            find_or_create=True)
+        expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+        expect_pipeline["owner_uuid"] = project_uuid
+        stubs.api.pipeline_instances().create.assert_called_with(
+            body=expect_pipeline)
 
     @stubs
     def test_submit_container(self, stubs):
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 1ca7ad82ed845844a2e18cc7bf4b7a9d24c3d652..c394dab810715c2659b6f72f8f5f1e173d711ead 100644
@@ -834,14 +834,14 @@ class ArvadosFile(object):
             self._writers.add(writer)
 
     @synchronized
-    def remove_writer(self, writer):
+    def remove_writer(self, writer, flush):
         """
         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
         and do some block maintenance tasks.
         """
         self._writers.remove(writer)
 
-        if self.size() > config.KEEP_BLOCK_SIZE / 2:
+        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
             # File writer closed, not small enough for repacking
             self.flush()
         elif self.closed():
@@ -1166,7 +1166,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def flush(self):
         self.arvadosfile.flush()
 
-    def close(self):
+    def close(self, flush=True):
         if not self.closed:
-            self.arvadosfile.remove_writer(self)
+            self.arvadosfile.remove_writer(self, flush)
             super(ArvadosFileWriter, self).close()
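A usage sketch (not part of the diff; assumes a configured Arvados client)
of the new flush parameter: closing with flush=False skips the immediate
block commit, so a small file can still be repacked together with later
writes instead of occupying its own Keep block.

    import arvados.collection

    coll = arvados.collection.Collection()
    f = coll.open("small.txt", "w")
    f.write("a few bytes")
    f.close(flush=False)   # defer the block commit; allow repacking

    g = coll.open("big.bin", "w")
    g.write("x" * (64 * 1024 * 1024))
    g.close()              # default flush=True commits the data now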
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 89753a22863808c3fb89686cbf9948dae8760171..5cb699f49f7a21eb8d7789a52bd0aea7bdd056f0 100644
@@ -14,14 +14,10 @@ import hashlib
 import json
 import os
 import pwd
-import time
 import signal
 import socket
 import sys
 import tempfile
-import threading
-import copy
-import logging
 from apiclient import errors as apiclient_errors
 
 import arvados.commands._util as arv_cmd
@@ -280,344 +276,79 @@ class ResumeCache(object):
         self.__init__(self.filename)
 
 
-class ArvPutUploadJob(object):
-    CACHE_DIR = '.cache/arvados/arv-put'
-    EMPTY_STATE = {
-        'manifest' : None, # Last saved manifest checkpoint
-        'files' : {} # Previous run file list: {path : {size, mtime}}
-    }
-
-    def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
-                 name=None, owner_uuid=None, ensure_unique_name=False,
-                 num_retries=None, replication_desired=None,
-                 filename=None, update_time=60.0):
-        self.paths = paths
-        self.resume = resume
+class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
+    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
+                   ['bytes_written', '_seen_inputs'])
+
+    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
+        self.bytes_written = 0
+        self._seen_inputs = []
+        self.cache = cache
         self.reporter = reporter
         self.bytes_expected = bytes_expected
-        self.bytes_written = 0
-        self.bytes_skipped = 0
-        self.name = name
-        self.owner_uuid = owner_uuid
-        self.ensure_unique_name = ensure_unique_name
-        self.num_retries = num_retries
-        self.replication_desired = replication_desired
-        self.filename = filename
-        self._state_lock = threading.Lock()
-        self._state = None # Previous run state (file list & manifest)
-        self._current_files = [] # Current run file list
-        self._cache_file = None
-        self._collection = None
-        self._collection_lock = threading.Lock()
-        self._stop_checkpointer = threading.Event()
-        self._checkpointer = threading.Thread(target=self._update_task)
-        self._update_task_time = update_time  # How many seconds wait between update runs
-        self.logger = logging.getLogger('arvados.arv_put')
-        # Load cached data if any and if needed
-        self._setup_state()
-
-    def start(self):
-        """
-        Start supporting thread & file uploading
-        """
-        self._checkpointer.daemon = True
-        self._checkpointer.start()
+        super(ArvPutCollectionWriter, self).__init__(**kwargs)
+
+    @classmethod
+    def from_cache(cls, cache, reporter=None, bytes_expected=None,
+                   num_retries=0, replication=0):
         try:
-            for path in self.paths:
-                # Test for stdin first, in case some file named '-' exist
-                if path == '-':
-                    self._write_stdin(self.filename or 'stdin')
-                elif os.path.isdir(path):
-                    self._write_directory_tree(path)
-                else:
-                    self._write_file(path, self.filename or os.path.basename(path))
-        finally:
-            # Stop the thread before doing anything else
-            self._stop_checkpointer.set()
-            self._checkpointer.join()
-            # Commit all & one last _update()
-            self.manifest_text()
-            self._update()
-            if self.resume:
-                self._cache_file.close()
-                # Correct the final written bytes count
-                self.bytes_written -= self.bytes_skipped
-
-    def save_collection(self):
-        with self._collection_lock:
-            self._my_collection().save_new(
-                name=self.name, owner_uuid=self.owner_uuid,
-                ensure_unique_name=self.ensure_unique_name,
-                num_retries=self.num_retries)
-
-    def destroy_cache(self):
-        if self.resume:
-            try:
-                os.unlink(self._cache_filename)
-            except OSError as error:
-                # That's what we wanted anyway.
-                if error.errno != errno.ENOENT:
-                    raise
-            self._cache_file.close()
-
-    def _collection_size(self, collection):
-        """
-        Recursively get the total size of the collection
-        """
-        size = 0
-        for item in collection.values():
-            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
-                size += self._collection_size(item)
-            else:
-                size += item.size()
-        return size
-
-    def _update_task(self):
-        """
-        Periodically called support task. File uploading is
-        asynchronous so we poll status from the collection.
-        """
-        while not self._stop_checkpointer.wait(self._update_task_time):
-            self._update()
-
-    def _update(self):
-        """
-        Update cached manifest text and report progress.
-        """
-        with self._collection_lock:
-            self.bytes_written = self._collection_size(self._my_collection())
-            # Update cache, if resume enabled
-            if self.resume:
-                with self._state_lock:
-                    # Get the manifest text without comitting pending blocks
-                    self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
-        if self.resume:
-            self._save_state()
-        # Call the reporter, if any
-        self.report_progress()
+            state = cache.load()
+            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
+            writer = cls.from_state(state, cache, reporter, bytes_expected,
+                                    num_retries=num_retries,
+                                    replication=replication)
+        except (TypeError, ValueError,
+                arvados.errors.StaleWriterStateError) as error:
+            return cls(cache, reporter, bytes_expected,
+                       num_retries=num_retries,
+                       replication=replication)
+        else:
+            return writer
+
+    def cache_state(self):
+        if self.cache is None:
+            return
+        state = self.dump_state()
+        # Transform attributes for serialization.
+        for attr, value in state.items():
+            if attr == '_data_buffer':
+                state[attr] = base64.encodestring(''.join(value))
+            elif hasattr(value, 'popleft'):
+                state[attr] = list(value)
+        self.cache.save(state)
 
     def report_progress(self):
         if self.reporter is not None:
             self.reporter(self.bytes_written, self.bytes_expected)
 
-    def _write_directory_tree(self, path, stream_name="."):
-        # TODO: Check what happens when multiple directories are passed as
-        # arguments.
-        # If the code below is uncommented, integration test
-        # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
-        # fails, I suppose it is because the manifest_uuid changes because
-        # of the dir addition to stream_name.
-
-        # if stream_name == '.':
-        #     stream_name = os.path.join('.', os.path.basename(path))
-        for item in os.listdir(path):
-            if os.path.isdir(os.path.join(path, item)):
-                self._write_directory_tree(os.path.join(path, item),
-                                os.path.join(stream_name, item))
-            else:
-                self._write_file(os.path.join(path, item),
-                                os.path.join(stream_name, item))
-
-    def _write_stdin(self, filename):
-        with self._collection_lock:
-            output = self._my_collection().open(filename, 'w')
-        self._write(sys.stdin, output)
-        output.close()
-
-    def _write_file(self, source, filename):
-        resume_offset = 0
-        if self.resume:
-            # Check if file was already uploaded (at least partially)
-            with self._collection_lock:
-                try:
-                    file_in_collection = self._my_collection().find(filename)
-                except IOError:
-                    # Not found
-                    file_in_collection = None
-            # If no previous cached data on this file, store it for an eventual
-            # repeated run.
-            if source not in self._state['files']:
-                with self._state_lock:
-                    self._state['files'][source] = {
-                        'mtime': os.path.getmtime(source),
-                        'size' : os.path.getsize(source)
-                    }
-            with self._state_lock:
-                cached_file_data = self._state['files'][source]
-            # See if this file was already uploaded at least partially
-            if file_in_collection:
-                if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
-                    if cached_file_data['size'] == file_in_collection.size():
-                        # File already there, skip it.
-                        self.bytes_skipped += cached_file_data['size']
-                        return
-                    elif cached_file_data['size'] > file_in_collection.size():
-                        # File partially uploaded, resume!
-                        resume_offset = file_in_collection.size()
-                    else:
-                        # Inconsistent cache, re-upload the file
-                        self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
-                else:
-                    # Local file differs from cached data, re-upload it
-                    pass
-        with open(source, 'r') as source_fd:
-            if resume_offset > 0:
-                # Start upload where we left off
-                with self._collection_lock:
-                    output = self._my_collection().open(filename, 'a')
-                source_fd.seek(resume_offset)
-                self.bytes_skipped += resume_offset
-            else:
-                # Start from scratch
-                with self._collection_lock:
-                    output = self._my_collection().open(filename, 'w')
-            self._write(source_fd, output)
-            output.close()
-
-    def _write(self, source_fd, output):
-        first_read = True
-        while True:
-            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
-            # Allow an empty file to be written
-            if not data and not first_read:
-                break
-            if first_read:
-                first_read = False
-            output.write(data)
-
-    def _my_collection(self):
-        """
-        Create a new collection if none cached. Load it from cache otherwise.
-        """
-        if self._collection is None:
-            with self._state_lock:
-                manifest = self._state['manifest']
-            if self.resume and manifest is not None:
-                # Create collection from saved state
-                self._collection = arvados.collection.Collection(
-                    manifest,
-                    replication_desired=self.replication_desired)
-            else:
-                # Create new collection
-                self._collection = arvados.collection.Collection(
-                    replication_desired=self.replication_desired)
-        return self._collection
-
-    def _setup_state(self):
-        """
-        Create a new cache file or load a previously existing one.
-        """
-        if self.resume:
-            md5 = hashlib.md5()
-            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
-            realpaths = sorted(os.path.realpath(path) for path in self.paths)
-            md5.update('\0'.join(realpaths))
-            if self.filename:
-                md5.update(self.filename)
-            cache_filename = md5.hexdigest()
-            self._cache_file = open(os.path.join(
-                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
-                cache_filename), 'a+')
-            self._cache_filename = self._cache_file.name
-            self._lock_file(self._cache_file)
-            self._cache_file.seek(0)
-            with self._state_lock:
-                try:
-                    self._state = json.load(self._cache_file)
-                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
-                        # Cache at least partially incomplete, set up new cache
-                        self._state = copy.deepcopy(self.EMPTY_STATE)
-                except ValueError:
-                    # Cache file empty, set up new cache
-                    self._state = copy.deepcopy(self.EMPTY_STATE)
-            # Load how many bytes were uploaded on previous run
-            with self._collection_lock:
-                self.bytes_written = self._collection_size(self._my_collection())
-        # No resume required
-        else:
-            with self._state_lock:
-                self._state = copy.deepcopy(self.EMPTY_STATE)
-
-    def _lock_file(self, fileobj):
-        try:
-            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
-        except IOError:
-            raise ResumeCacheConflict("{} locked".format(fileobj.name))
-
-    def _save_state(self):
-        """
-        Atomically save current state into cache.
-        """
-        try:
-            with self._state_lock:
-                state = self._state
-            new_cache_fd, new_cache_name = tempfile.mkstemp(
-                dir=os.path.dirname(self._cache_filename))
-            self._lock_file(new_cache_fd)
-            new_cache = os.fdopen(new_cache_fd, 'r+')
-            json.dump(state, new_cache)
-            new_cache.flush()
-            os.fsync(new_cache)
-            os.rename(new_cache_name, self._cache_filename)
-        except (IOError, OSError, ResumeCacheConflict) as error:
-            self.logger.error("There was a problem while saving the cache file: {}".format(error))
-            try:
-                os.unlink(new_cache_name)
-            except NameError:  # mkstemp failed.
-                pass
-        else:
-            self._cache_file.close()
-            self._cache_file = new_cache
-
-    def collection_name(self):
-        with self._collection_lock:
-            name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
-        return name
-
-    def manifest_locator(self):
-        with self._collection_lock:
-            locator = self._my_collection().manifest_locator()
-        return locator
-
-    def portable_data_hash(self):
-        with self._collection_lock:
-            datahash = self._my_collection().portable_data_hash()
-        return datahash
-
-    def manifest_text(self, stream_name=".", strip=False, normalize=False):
-        with self._collection_lock:
-            manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
-        return manifest
-
-    def _datablocks_on_item(self, item):
-        """
-        Return a list of datablock locators, recursively navigating
-        through subcollections
-        """
-        if isinstance(item, arvados.arvfile.ArvadosFile):
-            if item.size() == 0:
-                # Empty file locator
-                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
-            else:
-                locators = []
-                for segment in item.segments():
-                    loc = segment.locator
-                    locators.append(loc)
-                return locators
-        elif isinstance(item, arvados.collection.Collection):
-            l = [self._datablocks_on_item(x) for x in item.values()]
-            # Fast list flattener method taken from:
-            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
-            return [loc for sublist in l for loc in sublist]
-        else:
-            return None
-
-    def data_locators(self):
-        with self._collection_lock:
-            # Make sure all datablocks are flushed before getting the locators
-            self._my_collection().manifest_text()
-            datablocks = self._datablocks_on_item(self._my_collection())
-        return datablocks
+    def flush_data(self):
+        start_buffer_len = self._data_buffer_len
+        start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
+        super(ArvPutCollectionWriter, self).flush_data()
+        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
+            self.bytes_written += (start_buffer_len - self._data_buffer_len)
+            self.report_progress()
+            if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
+                self.cache_state()
+
+    def _record_new_input(self, input_type, source_name, dest_name):
+        # The key needs to be a list because that's what we'll get back
+        # from JSON deserialization.
+        key = [input_type, source_name, dest_name]
+        if key in self._seen_inputs:
+            return False
+        self._seen_inputs.append(key)
+        return True
+
+    def write_file(self, source, filename=None):
+        if self._record_new_input('file', source, filename):
+            super(ArvPutCollectionWriter, self).write_file(source, filename)
+
+    def write_directory_tree(self,
+                             path, stream_name='.', max_manifest_depth=-1):
+        if self._record_new_input('directory', path, stream_name):
+            super(ArvPutCollectionWriter, self).write_directory_tree(
+                path, stream_name, max_manifest_depth)
 
 
 def expected_bytes_for(pathlist):
@@ -699,62 +430,118 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         print >>stderr, error
         sys.exit(1)
 
+    # write_copies diverges from args.replication here.
+    # args.replication is how many copies we will instruct Arvados to
+    # maintain (by passing it in collections().create()) after all
+    # data is written -- and if None was given, we'll use None there.
+    # Meanwhile, write_copies is how many copies of each data block we
+    # write to Keep, which has to be a number.
+    #
+    # If we simply changed args.replication from None to a default
+    # here, we'd end up erroneously passing the default replication
+    # level (instead of None) to collections().create().
+    write_copies = (args.replication or
+                    api_client._rootDesc.get('defaultCollectionReplication', 2))
+
     if args.progress:
         reporter = progress_writer(human_progress)
     elif args.batch_progress:
         reporter = progress_writer(machine_progress)
     else:
         reporter = None
-
     bytes_expected = expected_bytes_for(args.paths)
-    try:
-        writer = ArvPutUploadJob(paths = args.paths,
-                                 resume = args.resume,
-                                 filename = args.filename,
-                                 reporter = reporter,
-                                 bytes_expected = bytes_expected,
-                                 num_retries = args.retries,
-                                 replication_desired = args.replication,
-                                 name = collection_name,
-                                 owner_uuid = project_uuid,
-                                 ensure_unique_name = True)
-    except ResumeCacheConflict:
-        print >>stderr, "\n".join([
-            "arv-put: Another process is already uploading this data.",
-            "         Use --no-resume if this is really what you want."])
-        sys.exit(1)
+
+    resume_cache = None
+    if args.resume:
+        try:
+            resume_cache = ResumeCache(ResumeCache.make_path(args))
+            resume_cache.check_cache(api_client=api_client, num_retries=args.retries)
+        except (IOError, OSError, ValueError):
+            pass  # Couldn't open cache directory/file.  Continue without it.
+        except ResumeCacheConflict:
+            print >>stderr, "\n".join([
+                "arv-put: Another process is already uploading this data.",
+                "         Use --no-resume if this is really what you want."])
+            sys.exit(1)
+
+    if resume_cache is None:
+        writer = ArvPutCollectionWriter(
+            resume_cache, reporter, bytes_expected,
+            num_retries=args.retries,
+            replication=write_copies)
+    else:
+        writer = ArvPutCollectionWriter.from_cache(
+            resume_cache, reporter, bytes_expected,
+            num_retries=args.retries,
+            replication=write_copies)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                             for sigcode in CAUGHT_SIGNALS}
 
-    if args.resume and writer.bytes_written > 0:
+    if writer.bytes_written > 0:  # We're resuming a previous upload.
         print >>stderr, "\n".join([
                 "arv-put: Resuming previous upload from last checkpoint.",
                 "         Use the --no-resume option to start over."])
 
     writer.report_progress()
-    output = None
-    writer.start()
+    writer.do_queued_work()  # Do work resumed from cache.
+    for path in args.paths:  # Copy file data to Keep.
+        if path == '-':
+            writer.start_new_stream()
+            writer.start_new_file(args.filename)
+            r = sys.stdin.read(64*1024)
+            while r:
+                # Bypass the _queued_file check in ResumableCollectionWriter.write()
+                # so the data goes straight to CollectionWriter.write().
+                super(arvados.collection.ResumableCollectionWriter, writer).write(r)
+                r = sys.stdin.read(64*1024)
+        elif os.path.isdir(path):
+            writer.write_directory_tree(
+                path, max_manifest_depth=args.max_manifest_depth)
+        else:
+            writer.start_new_stream()
+            writer.write_file(path, args.filename or os.path.basename(path))
+    writer.finish_current_stream()
+
     if args.progress:  # Print newline to split stderr from stdout for humans.
         print >>stderr
 
+    output = None
     if args.stream:
+        output = writer.manifest_text()
         if args.normalize:
-            output = writer.manifest_text(normalize=True)
-        else:
-            output = writer.manifest_text()
+            output = arvados.collection.CollectionReader(output).manifest_text(normalize=True)
     elif args.raw:
         output = ','.join(writer.data_locators())
     else:
         try:
-            writer.save_collection()
-            print >>stderr, "Collection saved as '%s'" % writer.collection_name()
-            if args.portable_data_hash:
-                output = writer.portable_data_hash()
+            manifest_text = writer.manifest_text()
+            if args.normalize:
+                manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
+            replication_attr = 'replication_desired'
+            if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
+                # API called it 'redundancy' before #3410.
+                replication_attr = 'redundancy'
+            # Register the resulting collection in Arvados.
+            collection = api_client.collections().create(
+                body={
+                    'owner_uuid': project_uuid,
+                    'name': collection_name,
+                    'manifest_text': manifest_text,
+                    replication_attr: args.replication,
+                    },
+                ensure_unique_name=True
+                ).execute(num_retries=args.retries)
+
+            print >>stderr, "Collection saved as '%s'" % collection['name']
+
+            if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
+                output = collection['portable_data_hash']
             else:
-                output = writer.manifest_locator()
+                output = collection['uuid']
+
         except apiclient_errors.Error as error:
             print >>stderr, (
                 "arv-put: Error creating Collection on project: {}.".format(
@@ -775,10 +562,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     if status != 0:
         sys.exit(status)
 
-    # Success!
-    writer.destroy_cache()
-    return output
+    if resume_cache is not None:
+        resume_cache.destroy()
 
+    return output
 
 if __name__ == '__main__':
     main()
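A sketch (hypothetical input path; class names from the module above) of
the resume flow this change restores: look for a ResumeCache keyed on the
command-line arguments, and fall back to a fresh, uncached writer when no
usable cache exists.

    import arvados.commands.put as arv_put

    args = arv_put.parse_arguments(['/tmp/example.txt'])  # hypothetical path
    try:
        cache = arv_put.ResumeCache(arv_put.ResumeCache.make_path(args))
        writer = arv_put.ArvPutCollectionWriter.from_cache(cache)
    except (IOError, OSError, ValueError):
        writer = arv_put.ArvPutCollectionWriter()  # no usable cache
    writer.write_file('/tmp/example.txt', 'example.txt')
    writer.finish_current_stream()
    print writer.manifest_text()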
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index 7a0120c02814d00b27e81dd41fbb50e51ef2855c..e64d91474170ce688780c3ab94ea3ae6bb69bbfb 100644
@@ -13,15 +13,11 @@ import tempfile
 import time
 import unittest
 import yaml
-import threading
-import hashlib
-import random
 
 from cStringIO import StringIO
 
 import arvados
 import arvados.commands.put as arv_put
-import arvados_testutil as tutil
 
 from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
 import run_test_server
@@ -238,53 +234,66 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
                           arv_put.ResumeCache, path)
 
 
-class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
-                          ArvadosBaseTestCase):
+class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
+                                     ArvadosBaseTestCase):
     def setUp(self):
-        super(ArvPutUploadJobTest, self).setUp()
+        super(ArvadosPutCollectionWriterTest, self).setUp()
         run_test_server.authorize_with('active')
-        # Temp files creation
-        self.tempdir = tempfile.mkdtemp()
-        subdir = os.path.join(self.tempdir, 'subdir')
-        os.mkdir(subdir)
-        data = "x" * 1024 # 1 KB
-        for i in range(1, 5):
-            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
-                f.write(data * i)
-        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
-            f.write(data * 5)
-        # Large temp file for resume test
-        _, self.large_file_name = tempfile.mkstemp()
-        fileobj = open(self.large_file_name, 'w')
-        # Make sure to write just a little more than one block
-        for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
-            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
-            fileobj.write(data)
-        fileobj.close()
-        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
+        with tempfile.NamedTemporaryFile(delete=False) as cachefile:
+            self.cache = arv_put.ResumeCache(cachefile.name)
+            self.cache_filename = cachefile.name
 
     def tearDown(self):
-        super(ArvPutUploadJobTest, self).tearDown()
-        shutil.rmtree(self.tempdir)
-        os.unlink(self.large_file_name)
+        super(ArvadosPutCollectionWriterTest, self).tearDown()
+        if os.path.exists(self.cache_filename):
+            self.cache.destroy()
+        self.cache.close()
+
+    def test_writer_caches(self):
+        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
+        cwriter.write_file('/dev/null')
+        cwriter.cache_state()
+        self.assertTrue(self.cache.load())
+        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
 
     def test_writer_works_without_cache(self):
-        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
-        cwriter.start()
+        cwriter = arv_put.ArvPutCollectionWriter()
+        cwriter.write_file('/dev/null')
+        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
+
+    def test_writer_resumes_from_cache(self):
+        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
+        with self.make_test_file() as testfile:
+            cwriter.write_file(testfile.name, 'test')
+            cwriter.cache_state()
+            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
+                self.cache)
+            self.assertEqual(
+                ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
+                new_writer.manifest_text())
+
+    def test_new_writer_from_stale_cache(self):
+        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
+        with self.make_test_file() as testfile:
+            cwriter.write_file(testfile.name, 'test')
+        new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
+        new_writer.write_file('/dev/null')
+        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text())
+
+    def test_new_writer_from_empty_cache(self):
+        cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
+        cwriter.write_file('/dev/null')
         self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
 
-    def test_writer_works_with_cache(self):
-        with tempfile.NamedTemporaryFile() as f:
-            f.write('foo')
-            f.flush()
-            cwriter = arv_put.ArvPutUploadJob([f.name])
-            cwriter.start()
-            self.assertEqual(3, cwriter.bytes_written)
-            # Don't destroy the cache, and start another upload
-            cwriter_new = arv_put.ArvPutUploadJob([f.name])
-            cwriter_new.start()
-            cwriter_new.destroy_cache()
-            self.assertEqual(0, cwriter_new.bytes_written)
+    def test_writer_resumable_after_arbitrary_bytes(self):
+        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
+        # These bytes are intentionally not valid UTF-8.
+        with self.make_test_file('\x00\x07\xe2') as testfile:
+            cwriter.write_file(testfile.name, 'test')
+            cwriter.cache_state()
+            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
+                self.cache)
+        self.assertEqual(cwriter.manifest_text(), new_writer.manifest_text())
 
     def make_progress_tester(self):
         progression = []
@@ -293,47 +302,24 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
         return progression, record_func
 
     def test_progress_reporting(self):
-        with tempfile.NamedTemporaryFile() as f:
-            f.write('foo')
-            f.flush()
-            for expect_count in (None, 8):
-                progression, reporter = self.make_progress_tester()
-                cwriter = arv_put.ArvPutUploadJob([f.name],
-                    reporter=reporter, bytes_expected=expect_count)
-                cwriter.start()
-                cwriter.destroy_cache()
-                self.assertIn((3, expect_count), progression)
-
-    def test_writer_upload_directory(self):
-        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
-        cwriter.start()
-        cwriter.destroy_cache()
-        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
-
-    def test_resume_large_file_upload(self):
-        def wrapped_write(*args, **kwargs):
-            data = args[1]
-            # Exit only on last block
-            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
-                raise SystemExit("Simulated error")
-            return self.arvfile_write(*args, **kwargs)
-
-        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
-                        autospec=True) as mocked_write:
-            mocked_write.side_effect = wrapped_write
-            writer = arv_put.ArvPutUploadJob([self.large_file_name],
-                                             replication_desired=1)
-            with self.assertRaises(SystemExit):
-                writer.start()
-                self.assertLess(writer.bytes_written,
-                                os.path.getsize(self.large_file_name))
-        # Retry the upload
-        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
-                                          replication_desired=1)
-        writer2.start()
-        self.assertEqual(writer.bytes_written + writer2.bytes_written,
-                         os.path.getsize(self.large_file_name))
-        writer2.destroy_cache()
+        for expect_count in (None, 8):
+            progression, reporter = self.make_progress_tester()
+            cwriter = arv_put.ArvPutCollectionWriter(
+                reporter=reporter, bytes_expected=expect_count)
+            with self.make_test_file() as testfile:
+                cwriter.write_file(testfile.name, 'test')
+            cwriter.finish_current_stream()
+            self.assertIn((4, expect_count), progression)
+
+    def test_resume_progress(self):
+        cwriter = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4)
+        with self.make_test_file() as testfile:
+            # Set up a writer with some flushed bytes.
+            cwriter.write_file(testfile.name, 'test')
+            cwriter.finish_current_stream()
+            cwriter.cache_state()
+            new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
+            self.assertEqual(new_writer.bytes_written, 4)
 
 
 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
@@ -434,8 +420,9 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
             os.chmod(cachedir, 0o700)
 
     def test_put_block_replication(self):
-        self.call_main_on_test_file()
-        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
+        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \
+             mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock:
+            cache_mock.side_effect = ValueError
             put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
             self.call_main_on_test_file(['--replication', '1'])
             self.call_main_on_test_file(['--replication', '4'])
@@ -474,16 +461,17 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
                           ['--project-uuid', self.Z_UUID, '--stream'])
 
     def test_api_error_handling(self):
-        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
-        coll_save_mock.side_effect = arvados.errors.ApiError(
+        collections_mock = mock.Mock(name='arv.collections()')
+        coll_create_mock = collections_mock().create().execute
+        coll_create_mock.side_effect = arvados.errors.ApiError(
             fake_httplib2_response(403), '{}')
-        with mock.patch('arvados.collection.Collection.save_new',
-                        new=coll_save_mock):
-            with self.assertRaises(SystemExit) as exc_test:
-                self.call_main_with_args(['/dev/null'])
-            self.assertLess(0, exc_test.exception.args[0])
-            self.assertLess(0, coll_save_mock.call_count)
-            self.assertEqual("", self.main_stdout.getvalue())
+        arv_put.api_client = arvados.api('v1')
+        arv_put.api_client.collections = collections_mock
+        with self.assertRaises(SystemExit) as exc_test:
+            self.call_main_with_args(['/dev/null'])
+        self.assertLess(0, exc_test.exception.args[0])
+        self.assertLess(0, coll_create_mock.call_count)
+        self.assertEqual("", self.main_stdout.getvalue())
 
 
 class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
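
Taken together, the tests above pin down the resumable upload flow: write a file, checkpoint with cache_state(), then rebuild the writer with from_cache(). A minimal sketch of that flow, assuming a ResumeCache can be opened directly from a cache file path (the tests get theirs from a fixture, so that constructor call is a guess):

    import arvados.commands.put as arv_put

    # Hypothetical cache path; the tests use a fixture-provided cache object.
    cache = arv_put.ResumeCache('/tmp/arv-put-demo.cache')

    writer = arv_put.ArvPutCollectionWriter(cache)
    writer.write_file('/etc/hostname', 'hostname')  # (source path, name in collection)
    writer.cache_state()  # checkpoint progress into the cache

    # After an interruption, resume from the same checkpoint:
    resumed = arv_put.ArvPutCollectionWriter.from_cache(cache)
    print(resumed.manifest_text())
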
index 8d3e9d62a1c32cf15c17fa3e264c58d19cd6f9ee..fc30a242eba1bfc665a05747de66f999869ef8a4 100644 (file)
@@ -1110,13 +1110,16 @@ class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
     def test_only_small_blocks_are_packed_together(self):
         c = Collection()
         # Write a couple of small files.
-        with c.open("count.txt", "w") as f:
-            f.write("0123456789")
-        with c.open("foo.txt", "w") as foo:
-            foo.write("foo")
+        f = c.open("count.txt", "w")
+        f.write("0123456789")
+        f.close(flush=False)
+        foo = c.open("foo.txt", "w")
+        foo.write("foo")
+        foo.close(flush=False)
         # Then write a big file; it shouldn't be packed with the ones above.
-        with c.open("bigfile.txt", "w") as big:
-            big.write("x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2
+        big = c.open("bigfile.txt", "w")
+        big.write("x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2
+        big.close(flush=False)
         self.assertEqual(
             c.manifest_text("."),
             '. 2d303c138c118af809f39319e5d507e9+34603008 a8430a058b8fbf408e1931b794dbd6fb+13 0:34603008:bigfile.txt 34603008:10:count.txt 34603018:3:foo.txt\n')
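
The close(flush=False) calls are what enable the packing behavior this test asserts: small buffers are held back and combined into a single block, while the 33 MiB write is big enough to get a block of its own. The same pattern in condensed form, assuming a configured Arvados client environment (the test itself runs against a local test server):

    from arvados.collection import Collection

    c = Collection()
    for name, data in [('count.txt', '0123456789'), ('foo.txt', 'foo')]:
        f = c.open(name, 'w')
        f.write(data)
        f.close(flush=False)  # defer flushing so small buffers can be packed together
    # Per the assertion above, both small files end up sharing one 13-byte block.
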
index 43c5b30a1f603fd94b828b3449f71c01b5ba716a..3a16e30e9ec545840b3be592138a0b2aacf34694 100644 (file)
@@ -166,6 +166,10 @@ class Container < ArvadosModel
             uuids: uuid_list)
   end
 
+  def final?
+    [Complete, Cancelled].include?(self.state)
+  end
+
   protected
 
   def fill_field_defaults
@@ -305,7 +309,7 @@ class Container < ArvadosModel
   def handle_completed
     # This container is finished, so finalize any container requests
     # associated with it.
-    if self.state_changed? and [Complete, Cancelled].include? self.state
+    if self.state_changed? and self.final?
       act_as_system_user do
 
         if self.state == Cancelled
@@ -337,7 +341,7 @@ class Container < ArvadosModel
         # Notify container requests associated with this container
         ContainerRequest.where(container_uuid: uuid,
                                state: ContainerRequest::Committed).each do |cr|
-          cr.container_completed!
+          cr.finalize!
         end
 
         # Try to cancel any outstanding container requests made by this container.
index a588c86451a88c2205472ba2dcc19eaff3dd15d3..696b873bde383ade05993a7d7c33800b70dffe04 100644 (file)
@@ -19,6 +19,7 @@ class ContainerRequest < ArvadosModel
   validate :validate_change
   validate :validate_runtime_constraints
   after_save :update_priority
+  after_save :finalize_if_needed
   before_create :set_requesting_container_uuid
 
   api_accessible :user, extend: :common do |t|
@@ -65,10 +66,19 @@ class ContainerRequest < ArvadosModel
     %w(modified_by_client_uuid container_uuid requesting_container_uuid)
   end
 
+  def finalize_if_needed
+    if state == Committed && Container.find_by_uuid(container_uuid).final?
+      reload
+      act_as_system_user do
+        finalize!
+      end
+    end
+  end
+
   # Finalize the container request after the container has
   # finished/cancelled.
-  def container_completed!
-    update_attributes!(state: ContainerRequest::Final)
+  def finalize!
+    update_attributes!(state: Final)
     c = Container.find_by_uuid(container_uuid)
     ['output', 'log'].each do |out_type|
       pdh = c.send(out_type)
index 3b175742370b93bf983754d06e459115a7180d84..1c5c7ae5cea5a55da5c2af9500e42321cd5811f8 100644 (file)
@@ -481,4 +481,31 @@ class ContainerRequestTest < ActiveSupport::TestCase
     assert_equal prev_container_uuid, cr.container_uuid
   end
 
+  test "Finalize committed request when reusing a finished container" do
+    set_user_from_auth :active
+    cr = create_minimal_req!(priority: 1, state: ContainerRequest::Committed)
+    cr.reload
+    assert_equal ContainerRequest::Committed, cr.state
+    act_as_system_user do
+      c = Container.find_by_uuid(cr.container_uuid)
+      c.update_attributes!(state: Container::Locked)
+      c.update_attributes!(state: Container::Running)
+      c.update_attributes!(state: Container::Complete,
+                           exit_code: 0,
+                           output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
+                           log: 'fa7aeb5140e2848d39b416daeef4ffc5+45')
+    end
+    cr.reload
+    assert_equal ContainerRequest::Final, cr.state
+
+    cr2 = create_minimal_req!(priority: 1, state: ContainerRequest::Committed)
+    assert_equal cr.container_uuid, cr2.container_uuid
+    assert_equal ContainerRequest::Final, cr2.state
+
+    cr3 = create_minimal_req!(priority: 1, state: ContainerRequest::Uncommitted)
+    assert_equal ContainerRequest::Uncommitted, cr3.state
+    cr3.update_attributes!(state: ContainerRequest::Committed)
+    assert_equal cr.container_uuid, cr3.container_uuid
+    assert_equal ContainerRequest::Final, cr3.state
+  end
 end
index f71c2ffbb5482ee6b93914ad4c7d824a555325cc..c41a5f3465d61403959a366565a89ec671af236e 100644 (file)
@@ -2,7 +2,7 @@
 Description=Arvados git server
 Documentation=https://doc.arvados.org/
 After=network.target
-AssertPathExists=/etc/arvados/arvados-git-httpd/arvados-git-httpd.yml
+AssertPathExists=/etc/arvados/git-httpd/git-httpd.yml
 
 [Service]
 Type=notify
diff --git a/services/arv-git-httpd/doc.go b/services/arv-git-httpd/doc.go
deleted file mode 100644 (file)
index ff4599d..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
-arv-git-httpd provides authenticated access to Arvados-hosted git repositories.
-
-See http://doc.arvados.org/install/install-arv-git-httpd.html.
-
-Example:
-
-       arv-git-httpd -address=:8000 -repo-root=/var/lib/arvados/git
-
-Options:
-
-       -address [host]:[port]
-
-Listen at the given host and port.
-
-Host can be a domain name, an IP address, or empty (listen on all
-addresses).
-
-Port can be a name, a port number, or 0 (choose an available port).
-
-       -repo-root path
-
-Directory containing git repositories. When a client requests either
-"foo/bar.git" or "foo/bar/.git", git-http-backend will be invoked on
-"path/foo/bar.git" or (if that doesn't exist) "path/foo/bar/.git".
-
-       -git-command path
-
-Location of the CGI program to execute for each authorized request
-(normally this is gitolite-shell if repositories are controlled by
-gitolite, otherwise git). It is invoked with a single argument,
-'http-backend'.  Default is /usr/bin/git.
-
-*/
-package main
index f0b98fab72382dfa02c2b12a144e2a6b9f5190c4..2caf1294e060c7af7f1323a10a75e837a417f177 100644 (file)
@@ -5,6 +5,7 @@ import (
        "net"
        "net/http"
        "net/http/cgi"
+       "os"
 )
 
 // gitHandler is an http.Handler that invokes git-http-backend (or
@@ -16,21 +17,29 @@ type gitHandler struct {
 }
 
 func newGitHandler() http.Handler {
+       const glBypass = "GL_BYPASS_ACCESS_CHECKS"
+       const glHome = "GITOLITE_HTTP_HOME"
+       var env []string
+       path := os.Getenv("PATH")
+       if theConfig.GitoliteHome != "" {
+               env = append(env,
+                       glHome+"="+theConfig.GitoliteHome,
+                       glBypass+"=1")
+               path = path + ":" + theConfig.GitoliteHome + "/bin"
+       } else if home, bypass := os.Getenv(glHome), os.Getenv(glBypass); home != "" || bypass != "" {
+               env = append(env, glHome+"="+home, glBypass+"="+bypass)
+               log.Printf("DEPRECATED: Passing through %s and %s environment variables. Use GitoliteHome configuration instead.", glHome, glBypass)
+       }
+       env = append(env,
+               "GIT_PROJECT_ROOT="+theConfig.RepoRoot,
+               "GIT_HTTP_EXPORT_ALL=",
+               "SERVER_ADDR="+theConfig.Listen,
+               "PATH="+path)
        return &gitHandler{
                Handler: cgi.Handler{
                        Path: theConfig.GitCommand,
                        Dir:  theConfig.RepoRoot,
-                       Env: []string{
-                               "GIT_PROJECT_ROOT=" + theConfig.RepoRoot,
-                               "GIT_HTTP_EXPORT_ALL=",
-                               "SERVER_ADDR=" + theConfig.Listen,
-                       },
-                       InheritEnv: []string{
-                               "PATH",
-                               // Needed if GitCommand is gitolite-shell:
-                               "GITOLITE_HTTP_HOME",
-                               "GL_BYPASS_ACCESS_CHECKS",
-                       },
+                       Env:  env,
                        Args: []string{"http-backend"},
                },
        }
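
With GitoliteHome set to, say, /gh, the handler above hands git-http-backend (or gitolite-shell) an environment along these lines, with RepoRoot and Listen shown at their defaults:

    GITOLITE_HTTP_HOME=/gh
    GL_BYPASS_ACCESS_CHECKS=1
    GIT_PROJECT_ROOT=/var/lib/arvados/git/repositories
    GIT_HTTP_EXPORT_ALL=
    SERVER_ADDR=:80
    PATH=$PATH:/gh/bin
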
index d87162dca3aa6f80ac16411c4a138e6286fc40e2..6b08eeecdc303a246cba91873fd5c44a4ddddb4a 100644 (file)
@@ -4,7 +4,6 @@ import (
        "net/http"
        "net/http/httptest"
        "net/url"
-       "os"
        "regexp"
 
        check "gopkg.in/check.v1"
@@ -15,6 +14,10 @@ var _ = check.Suite(&GitHandlerSuite{})
 type GitHandlerSuite struct{}
 
 func (s *GitHandlerSuite) TestEnvVars(c *check.C) {
+       theConfig = defaultConfig()
+       theConfig.RepoRoot = "/"
+       theConfig.GitoliteHome = "/test/ghh"
+
        u, err := url.Parse("git.zzzzz.arvadosapi.com/test")
        c.Check(err, check.Equals, nil)
        resp := httptest.NewRecorder()
@@ -26,15 +29,14 @@ func (s *GitHandlerSuite) TestEnvVars(c *check.C) {
        h := newGitHandler()
        h.(*gitHandler).Path = "/bin/sh"
        h.(*gitHandler).Args = []string{"-c", "printf 'Content-Type: text/plain\r\n\r\n'; env"}
-       os.Setenv("GITOLITE_HTTP_HOME", "/test/ghh")
-       os.Setenv("GL_BYPASS_ACCESS_CHECKS", "yesplease")
 
        h.ServeHTTP(resp, req)
 
        c.Check(resp.Code, check.Equals, http.StatusOK)
        body := resp.Body.String()
+       c.Check(body, check.Matches, `(?ms).*^PATH=.*:/test/ghh/bin$.*`)
        c.Check(body, check.Matches, `(?ms).*^GITOLITE_HTTP_HOME=/test/ghh$.*`)
-       c.Check(body, check.Matches, `(?ms).*^GL_BYPASS_ACCESS_CHECKS=yesplease$.*`)
+       c.Check(body, check.Matches, `(?ms).*^GL_BYPASS_ACCESS_CHECKS=1$.*`)
        c.Check(body, check.Matches, `(?ms).*^REMOTE_HOST=::1$.*`)
        c.Check(body, check.Matches, `(?ms).*^REMOTE_PORT=12345$.*`)
        c.Check(body, check.Matches, `(?ms).*^SERVER_ADDR=`+regexp.QuoteMeta(theConfig.Listen)+`$.*`)
index 74c2b8cf4d91a8ac3da2835b12e90a35e6dd0380..38ff2309c1d2a3a57def17ac588276dab204e265 100644 (file)
@@ -48,9 +48,10 @@ func (s *GitoliteSuite) SetUpTest(c *check.C) {
                        APIHost:  arvadostest.APIHost(),
                        Insecure: true,
                },
-               Listen:     ":0",
-               GitCommand: "/usr/share/gitolite3/gitolite-shell",
-               RepoRoot:   s.tmpRepoRoot,
+               Listen:       ":0",
+               GitCommand:   "/usr/share/gitolite3/gitolite-shell",
+               GitoliteHome: s.gitoliteHome,
+               RepoRoot:     s.tmpRepoRoot,
        }
        s.IntegrationSuite.SetUpTest(c)
 
@@ -58,9 +59,6 @@ func (s *GitoliteSuite) SetUpTest(c *check.C) {
        // (*IntegrationTest)SetUpTest() -- see 2.2.4 at
        // http://gitolite.com/gitolite/gitolite.html
        runGitolite("gitolite", "setup")
-
-       os.Setenv("GITOLITE_HTTP_HOME", s.gitoliteHome)
-       os.Setenv("GL_BYPASS_ACCESS_CHECKS", "1")
 }
 
 func (s *GitoliteSuite) TearDownTest(c *check.C) {
index 5e55eca754838d97d2aaa8888482c686306a42cf..1d252599cdf3078b9924318980d91e031feb687d 100644 (file)
@@ -112,6 +112,8 @@ func (s *IntegrationSuite) TearDownTest(c *check.C) {
        s.tmpWorkdir = ""
 
        s.Config = nil
+
+       theConfig = defaultConfig()
 }
 
 func (s *IntegrationSuite) RunGit(c *check.C, token, gitCmd, repo string, args ...string) error {
index dd281366b29ac886365056ad8c2c4e2250a0d739..3bd7b3a8aa93c5871ff18773188c1336033d2599 100644 (file)
@@ -14,27 +14,24 @@ import (
 
 // Server configuration
 type Config struct {
-       Client     arvados.Client
-       Listen     string
-       GitCommand string
-       RepoRoot   string
+       Client       arvados.Client
+       Listen       string
+       GitCommand   string
+       RepoRoot     string
+       GitoliteHome string
 }
 
 var theConfig = defaultConfig()
 
 func defaultConfig() *Config {
-       cwd, err := os.Getwd()
-       if err != nil {
-               log.Fatalln("Getwd():", err)
-       }
        return &Config{
                Listen:     ":80",
                GitCommand: "/usr/bin/git",
-               RepoRoot:   cwd,
+               RepoRoot:   "/var/lib/arvados/git/repositories",
        }
 }
 
-func init() {
+func main() {
        const defaultCfgPath = "/etc/arvados/git-httpd/git-httpd.yml"
        const deprecated = " (DEPRECATED -- use config file instead)"
        flag.StringVar(&theConfig.Listen, "address", theConfig.Listen,
@@ -43,6 +40,8 @@ func init() {
                "Path to git or gitolite-shell executable. Each authenticated request will execute this program with a single argument, \"http-backend\"."+deprecated)
        flag.StringVar(&theConfig.RepoRoot, "repo-root", theConfig.RepoRoot,
                "Path to git repositories."+deprecated)
+       flag.StringVar(&theConfig.GitoliteHome, "gitolite-home", theConfig.GitoliteHome,
+               "Value for GITOLITE_HTTP_HOME environment variable. If not empty, GL_BYPASS_ACCESS_CHECKS=1 will also be set."+deprecated)
 
        cfgPath := flag.String("config", defaultCfgPath, "Configuration file `path`.")
        flag.Usage = usage
@@ -63,9 +62,7 @@ func init() {
                        log.Print("Current configuration:\n", string(j))
                }
        }
-}
 
-func main() {
        srv := &server{}
        if err := srv.Start(); err != nil {
                log.Fatal(err)
index 666edc01aa54f8a3ae0545809403a69be7850f20..1fb25b92f2f17286c576203f609e20ec82f3b300 100644 (file)
@@ -1,33 +1,40 @@
+// arvados-git-httpd provides authenticated access to Arvados-hosted
+// git repositories.
+//
+// See http://doc.arvados.org/install/install-arv-git-httpd.html.
 package main
 
 import (
-       "encoding/json"
        "flag"
        "fmt"
        "os"
+
+       "github.com/ghodss/yaml"
 )
 
 func usage() {
        c := defaultConfig()
        c.Client.APIHost = "zzzzz.arvadosapi.com:443"
-       exampleConfigFile, err := json.MarshalIndent(c, "    ", "  ")
+       exampleConfigFile, err := yaml.Marshal(c)
        if err != nil {
                panic(err)
        }
        fmt.Fprintf(os.Stderr, `
 
-arv-git-httpd provides authenticated access to Arvados-hosted git repositories.
+arvados-git-httpd provides authenticated access to Arvados-hosted git
+repositories.
 
 See http://doc.arvados.org/install/install-arv-git-httpd.html.
 
-Usage: arv-git-httpd [-config path/to/arv-git-httpd.yml]
+Usage: arvados-git-httpd [-config path/to/arvados/git-httpd.yml]
 
 Options:
 `)
        flag.PrintDefaults()
        fmt.Fprintf(os.Stderr, `
 Example config file:
-    %s
+
+%s
 
 Client.APIHost:
 
@@ -42,21 +49,29 @@ Client.Insecure:
     True if your Arvados API endpoint uses an unverifiable SSL/TLS
     certificate.
 
-Listen:
-
-    Local port to listen on. Can be "address:port" or ":port", where
-    "address" is a host IP address or name and "port" is a port number
-    or name.
-
 GitCommand:
 
     Path to git or gitolite-shell executable. Each authenticated
     request will execute this program with the single argument
     "http-backend".
 
+GitoliteHome:
+
+    Path to Gitolite's home directory. If a non-empty path is given,
+    the CGI environment will be set up to support the use of
+    gitolite-shell as a GitCommand: for example, if GitoliteHome is
+    "/gh", then the CGI environment will have GITOLITE_HTTP_HOME=/gh,
+    PATH=$PATH:/gh/bin, and GL_BYPASS_ACCESS_CHECKS=1.
+
+Listen:
+
+    Local port to listen on. Can be "address:port" or ":port", where
+    "address" is a host IP address or name and "port" is a port number
+    or name.
+
 RepoRoot:
 
-    Path to git repositories. Defaults to current working directory.
+    Path to git repositories.
 
 `, exampleConfigFile)
 }
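
Assembled from the documented fields, a git-httpd.yml based on the defaults above might look like the following (the APIHost value is the placeholder usage() itself substitutes; the exact quoting in the yaml.Marshal output may differ):

    Client:
      APIHost: zzzzz.arvadosapi.com:443
      Insecure: false
    GitCommand: /usr/bin/git
    GitoliteHome: ""
    Listen: ":80"
    RepoRoot: /var/lib/arvados/git/repositories
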
index d15f01792a8e8dd4b433611ce20a752c1138c877..3f89732bea25dcd1ca546fbef126227e9e0a9256 100644 (file)
@@ -77,6 +77,8 @@ class ArgumentParser(argparse.ArgumentParser):
         self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 256MiB)", default=256*1024*1024)
         self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128MiB)", default=128*1024*1024)
 
+        self.add_argument('--disable-event-listening', action='store_true', help="Don't subscribe to events on the API server", dest="disable_event_listening", default=False)
+
         self.add_argument('--read-only', action='store_false', help="Mount will be read only (default)", dest="enable_write", default=False)
         self.add_argument('--read-write', action='store_true', help="Mount will be read-write", dest="enable_write", default=False)
 
@@ -111,7 +113,7 @@ class Mount(object):
 
     def __enter__(self):
         llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
-        if self.listen_for_events:
+        if self.listen_for_events and not self.args.disable_event_listening:
             self.operations.listen_for_events()
         self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
         self.llfuse_thread.daemon = True
@@ -330,7 +332,7 @@ From here, the following directories are available:
                 self.daemon_ctx.open()
 
             # Subscribe to change events from API server
-            if self.listen_for_events:
+            if self.listen_for_events and not self.args.disable_event_listening:
                 self.operations.listen_for_events()
 
             self._llfuse_main()
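
The new flag composes with the existing mount options; for example, assuming the standard arv-mount entry point that wraps this Mount class:

    arv-mount --disable-event-listening --foreground /mnt/arvados
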
index bb80d0a2fc94dc4c77c0f46f59414a8d00627235..e8488d7ff967179423f3732c8e6e56b05194ed58 100644 (file)
@@ -170,6 +170,20 @@ class MountArgsTest(unittest.TestCase):
                          run_test_server.fixture('users')['active']['uuid'])
         self.assertEqual(True, self.mnt.listen_for_events)
 
+    @noexit
+    @mock.patch('arvados.events.subscribe')
+    def test_disable_event_listening(self, mock_subscribe):
+        args = arvados_fuse.command.ArgumentParser().parse_args([
+            '--disable-event-listening',
+            '--by-id',
+            '--foreground', self.mntdir])
+        self.mnt = arvados_fuse.command.Mount(args)
+        self.assertEqual(True, self.mnt.listen_for_events)
+        self.assertEqual(True, self.mnt.args.disable_event_listening)
+        with self.mnt:
+            pass
+        self.assertEqual(0, mock_subscribe.call_count)
+
     @noexit
     @mock.patch('arvados.events.subscribe')
     def test_custom(self, mock_subscribe):
index 8b6d01969a7819f52975c511f1d7a954aaaa324d..8e4510355d80d996a0f02a730945df20e56611e6 100644 (file)
@@ -1163,6 +1163,7 @@ class TokenExpiryTest(MountTestBase):
     def setUp(self):
         super(TokenExpiryTest, self).setUp(local_store=False)
 
+    @unittest.skip("bug #10008")
     @mock.patch('arvados.keep.KeepClient.get')
     def runTest(self, mocked_get):
         self.api._rootDesc = {"blobSignatureTtl": 2}
index 48b83de4b8aa2a40e62be953e236162d396ddae0..a04bc0b6fdf93296846ee5be48ffac46b535106f 100644 (file)
@@ -130,6 +130,9 @@ var (
 )
 
 func (vs *volumeSet) String() string {
+       if vs == nil {
+               return "[]"
+       }
        return fmt.Sprintf("%+v", (*vs)[:])
 }
 
index 518fe33d049a753cd9cece8b416c46dc4045e483..b6caa14ccce73f0026084427a30d579a1c363211 100755 (executable)
@@ -20,12 +20,11 @@ fi
 
 export ARVADOS_API_HOST=$localip:${services[api]}
 export ARVADOS_API_HOST_INSECURE=1
-export GITOLITE_HTTP_HOME=/var/lib/arvados/git
-export GL_BYPASS_ACCESS_CHECKS=1
 export PATH="$PATH:/var/lib/arvados/git/bin"
 cd ~git
 
 exec /usr/local/bin/arv-git-httpd \
-     -address=:${services[arv-git-httpd]} \
-     -git-command=/usr/share/gitolite3/gitolite-shell \
-     -repo-root=/var/lib/arvados/git/repositories
+    -address=:${services[arv-git-httpd]} \
+    -git-command=/usr/share/gitolite3/gitolite-shell \
+    -gitolite-home=/var/lib/arvados/git \
+    -repo-root=/var/lib/arvados/git/repositories