Merge branch '10025-arvbox-layers' closes #10025
author Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 21 Oct 2016 01:25:15 +0000 (21:25 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 21 Oct 2016 01:25:15 +0000 (21:25 -0400)
59 files changed:
apps/workbench/test/diagnostics/pipeline_test.rb
apps/workbench/test/diagnostics_test_helper.rb
crunch_scripts/cwl-runner
sdk/cli/bin/arv-run-pipeline-instance
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/crunch_script.py [new file with mode: 0644]
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/tests/test_job.py
sdk/cwl/tests/test_make_output.py
sdk/cwl/tests/test_pathmapper.py
sdk/cwl/tests/test_submit.py
sdk/go/arvados/duration.go
sdk/go/streamer/streamer.go
sdk/go/streamer/streamer_test.go
sdk/python/tests/run_test_server.py
services/api/Gemfile
services/api/Gemfile.lock
services/api/app/controllers/application_controller.rb
services/api/app/controllers/arvados/v1/schema_controller.rb
services/api/app/controllers/user_sessions_controller.rb
services/api/app/models/collection.rb
services/api/app/models/container_request.rb
services/api/app/models/user.rb
services/api/config/application.default.yml
services/api/db/migrate/20161019171346_add_use_existing_to_container_requests.rb [new file with mode: 0644]
services/api/db/structure.sql
services/api/lib/josh_id.rb
services/api/test/fixtures/collections.yml
services/api/test/functional/arvados/v1/jobs_controller_test.rb
services/api/test/functional/arvados/v1/schema_controller_test.rb
services/api/test/integration/user_sessions_test.rb
services/api/test/unit/container_request_test.rb
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/squeue.go
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/bufferpool_test.go
services/keepstore/config.go [new file with mode: 0644]
services/keepstore/deprecated.go [new file with mode: 0644]
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/keepstore.service [new file with mode: 0644]
services/keepstore/keepstore_test.go
services/keepstore/perms.go
services/keepstore/perms_test.go
services/keepstore/pull_worker_test.go
services/keepstore/s3_volume.go
services/keepstore/s3_volume_test.go
services/keepstore/trash_worker.go
services/keepstore/trash_worker_test.go
services/keepstore/usage.go [new file with mode: 0644]
services/keepstore/volume.go
services/keepstore/volume_generic_test.go
services/keepstore/volume_test.go
services/keepstore/volume_unix.go
services/keepstore/volume_unix_test.go
services/login-sync/bin/arvados-login-sync

index f9e324ca41f84377cf9b4cb6bc284e58b3abebf2..d038222cf0cf58278818bd087e288a4e1c11b52c 100644 (file)
@@ -46,7 +46,7 @@ class PipelineTest < DiagnosticsTest
       page.assert_selector 'a,button', text: 'Pause'
 
       # Wait for pipeline run to complete
-      wait_until_page_has 'Complete', pipeline_config['max_wait_seconds']
+      wait_until_page_has 'completed', pipeline_config['max_wait_seconds']
     end
   end
 
index c7433bb247450464fb42fa780e309ce09fdb27b7..3587721edae7bc6e96778efceaaedfadddbeacd3 100644 (file)
@@ -24,9 +24,12 @@ class DiagnosticsTest < ActionDispatch::IntegrationTest
   # Looks for the text_to_look_for for up to the max_time provided
   def wait_until_page_has text_to_look_for, max_time=30
     max_time = 30 if (!max_time || (max_time.to_s != max_time.to_i.to_s))
+    text_found = false
     Timeout.timeout(max_time) do
-      loop until page.has_text?(text_to_look_for)
+      until text_found do
+        visit_page_with_token 'active', current_path
+        text_found = has_text?(text_to_look_for)
+      end
     end
   end
-
 end
index 5d7f553fecad9d4383e7326cabfd1419612dd9e2..ff2addf8fd9e6f874ac1a96a4025afef2afec326 100755 (executable)
@@ -1,15 +1,30 @@
 #!/usr/bin/env python
 
-# Crunch script integration for running arvados-cwl-runner (importing
-# arvados_cwl module) inside a crunch job.
-#
+# Crunch script integration for running arvados-cwl-runner inside a crunch job.
+
+import arvados_cwl
+import sys
+
+try:
+    # Use the crunch script defined in the arvados_cwl package.  This helps
+    # prevent the crunch script from going out of sync with the rest of the
+    # arvados_cwl package.
+    import arvados_cwl.crunch_script
+    arvados_cwl.crunch_script.run()
+    sys.exit()
+except ImportError:
+    pass
+
+# When running against an older arvados-cwl-runner package without
+# arvados_cwl.crunch_script, fall back to the old code.
+
+
 # This gets the job record, transforms the script parameters into a valid CWL
 # input object, then executes the CWL runner to run the underlying workflow or
 # tool.  When the workflow completes, record the output object in an output
 # collection for this runner job.
 
 import arvados
-import arvados_cwl
 import arvados.collection
 import arvados.util
 import cwltool.main
index 6dc82c5a20b841b1aeb1400ecdaf7dd6c21d4ed5..bcb11d1d706d1fc6be68b340d0038daf6cc43266 100755 (executable)
@@ -364,22 +364,25 @@ class WhRunPipelineInstance
     @components.each do |componentname, component|
       component[:script_parameters].each do |parametername, parameter|
         parameter = { :value => parameter } unless parameter.is_a? Hash
-        value =
-          (params["#{componentname}::#{parametername}"] ||
-           parameter[:value] ||
-           (parameter[:output_of].nil? &&
-            (params[parametername.to_s] ||
-             parameter[:default])) ||
-           nil)
-        if value.nil? and
-            ![false,'false',0,'0'].index parameter[:required]
-          if parameter[:output_of]
-            if not @components[parameter[:output_of].intern]
-              errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
-            end
-            next
+        if params.has_key?("#{componentname}::#{parametername}")
+          value = params["#{componentname}::#{parametername}"]
+        elsif parameter.has_key?(:value)
+          value = parameter[:value]
+        elsif parameter.has_key?(:output_of)
+          if !@components[parameter[:output_of].intern]
+            errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
+          else
+            # value will be filled in later when the upstream
+            # component's output becomes known
           end
+          next
+        elsif params.has_key?(parametername.to_s)
+          value = params[parametername.to_s]
+        elsif parameter.has_key?(:default)
+          value = parameter[:default]
+        else
           errors << [componentname, parametername, "required parameter is missing"]
+          next
         end
         debuglog "parameter #{componentname}::#{parametername} == #{value}"
 
index 7ebb13f1bb48af456ce50f2c7d8629e03c975cb0..f5f326bde12bf17c69c5886be2ceb8a3f00ed3d7 100644 (file)
@@ -58,7 +58,6 @@ class ArvCwlRunner(object):
         self.uploaded = {}
         self.num_retries = 4
         self.uuid = None
-        self.work_api = work_api
         self.stop_polling = threading.Event()
         self.poll_api = None
         self.pipeline = None
@@ -71,12 +70,20 @@ class ArvCwlRunner(object):
         else:
             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
 
-        if self.work_api is None:
-            # todo: autodetect API to use.
-            self.work_api = "jobs"
-
-        if self.work_api not in ("containers", "jobs"):
-            raise Exception("Unsupported API '%s'" % self.work_api)
+        for api in ["jobs", "containers"]:
+            try:
+                methods = self.api._rootDesc.get('resources')[api]['methods']
+                if ('httpMethod' in methods['create'] and
+                    (work_api == api or work_api is None)):
+                    self.work_api = api
+                    break
+            except KeyError:
+                pass
+        if not self.work_api:
+            if work_api is None:
+                raise Exception("No supported APIs")
+            else:
+                raise Exception("Unsupported API '%s'" % work_api)
 
     def arv_make_tool(self, toolpath_object, **kwargs):
         kwargs["work_api"] = self.work_api
index 8269eeebdbd417ba908c8e6ca56e8535fd1ffcd9..a97453151b0568ab57f79ca5e30e932fa704d1fa 100644 (file)
@@ -257,7 +257,7 @@ class RunnerJob(Runner):
         job_spec = self.arvados_job_spec(*args, **kwargs)
 
         for k,v in job_spec["script_parameters"].items():
-            if isinstance(v, dict):
+            if v is False or v is None or isinstance(v, dict):
                 job_spec["script_parameters"][k] = {"value": v}
 
         self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py
new file mode 100644 (file)
index 0000000..e5c6d67
--- /dev/null
@@ -0,0 +1,102 @@
+# Crunch script integration for running arvados-cwl-runner (importing
+# arvados_cwl module) inside a crunch job.
+#
+# This gets the job record, transforms the script parameters into a valid CWL
+# input object, then executes the CWL runner to run the underlying workflow or
+# tool.  When the workflow completes, record the output object in an output
+# collection for this runner job.
+
+import arvados
+import arvados_cwl
+import arvados.collection
+import arvados.util
+import cwltool.main
+import logging
+import os
+import json
+import argparse
+import re
+import functools
+
+from arvados.api import OrderedJsonModel
+from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs
+from cwltool.load_tool import load_tool
+
+logger = logging.getLogger('arvados.cwl-runner')
+
+def run():
+    # Print package versions
+    logger.info(arvados_cwl.versionstring())
+
+    api = arvados.api("v1")
+
+    arvados_cwl.add_arv_hints()
+
+    try:
+        job_order_object = arvados.current_job()['script_parameters']
+
+        pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')
+
+        def keeppath(v):
+            if pdh_path.match(v):
+                return "keep:%s" % v
+            else:
+                return v
+
+        def keeppathObj(v):
+            v["location"] = keeppath(v["location"])
+
+        job_order_object["cwl:tool"] = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object["cwl:tool"])
+
+        for k,v in job_order_object.items():
+            if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
+                job_order_object[k] = {
+                    "class": "File",
+                    "location": "keep:%s" % v
+                }
+
+        adjustFileObjs(job_order_object, keeppathObj)
+        adjustDirObjs(job_order_object, keeppathObj)
+        normalizeFilesDirs(job_order_object)
+        adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api)))
+
+        output_name = None
+        if "arv:output_name" in job_order_object:
+            output_name = job_order_object["arv:output_name"]
+            del job_order_object["arv:output_name"]
+
+        runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
+                                          output_name=output_name)
+
+        t = load_tool(job_order_object, runner.arv_make_tool)
+
+        args = argparse.Namespace()
+        args.project_uuid = arvados.current_job()["owner_uuid"]
+        args.enable_reuse = True
+        args.submit = False
+        args.debug = True
+        args.quiet = False
+        args.ignore_docker_for_reuse = False
+        args.basedir = os.getcwd()
+        args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
+        outputObj = runner.arv_executor(t, job_order_object, **vars(args))
+
+        if runner.final_output_collection:
+            outputCollection = runner.final_output_collection.portable_data_hash()
+        else:
+            outputCollection = None
+
+        api.job_tasks().update(uuid=arvados.current_task()['uuid'],
+                                             body={
+                                                 'output': outputCollection,
+                                                 'success': True,
+                                                 'progress':1.0
+                                             }).execute()
+    except Exception as e:
+        logging.exception("Unhandled exception")
+        api.job_tasks().update(uuid=arvados.current_task()['uuid'],
+                                             body={
+                                                 'output': None,
+                                                 'success': False,
+                                                 'progress':1.0
+                                             }).execute()
index 054d3530cfe174b9a5da3025d248097ca1062d7a..6d382450e2fa83f4c81b8567224f742c3c389db2 100644 (file)
@@ -129,8 +129,6 @@ def upload_instance(arvrunner, name, tool, job_order):
                                         job_order.get("id", "#"),
                                         False)
 
-        adjustDirObjs(job_order, trim_listing)
-
         if "id" in job_order:
             del job_order["id"]
 
@@ -153,7 +151,9 @@ class Runner(object):
 
     def arvados_job_spec(self, *args, **kwargs):
         self.name = os.path.basename(self.tool.tool["id"])
-        return upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
+        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
+        adjustDirObjs(self.job_order, trim_listing)
+        return workflowmapper
 
     def done(self, record):
         if record["state"] == "Complete":
index 8d12aab7e2e6a931a3a92db17b587e5c3d8db036..7abc5676cc2f04efd0a74b8bac8271b7abab8b71 100644 (file)
@@ -1,10 +1,11 @@
+import functools
+import json
 import logging
 import mock
-import unittest
 import os
-import functools
-import json
+import unittest
 
+import arvados
 import arvados_cwl
 import cwltool.process
 from schema_salad.ref_resolver import Loader
@@ -213,62 +214,72 @@ class TestWorkflow(unittest.TestCase):
     # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
     @mock.patch("arvados.collection.Collection")
     def test_run(self, mockcollection):
-        try:
-            arvados_cwl.add_arv_hints()
+        arvados_cwl.add_arv_hints()
 
-            runner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
-            runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
-            runner.ignore_docker_for_reuse = False
-            runner.num_retries = 0
-            document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+        api = mock.MagicMock()
+        api._rootDesc = arvados.api('v1')._rootDesc
+        runner = arvados_cwl.ArvCwlRunner(api)
+        self.assertEqual(runner.work_api, 'jobs')
 
-            tool, metadata = document_loader.resolve_ref("tests/wf/scatter2.cwl")
-            metadata["cwlVersion"] = tool["cwlVersion"]
+        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+        runner.ignore_docker_for_reuse = False
+        runner.num_retries = 0
+        document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
 
-            mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
+        tool, metadata = document_loader.resolve_ref("tests/wf/scatter2.cwl")
+        metadata["cwlVersion"] = tool["cwlVersion"]
 
-            make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
-            arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
-                                                  basedir="", make_fs_access=make_fs_access, loader=document_loader,
-                                                  makeTool=runner.arv_make_tool, metadata=metadata)
-            arvtool.formatgraph = None
-            it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access)
-            it.next().run()
-            it.next().run()
+        mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
 
-            with open("tests/wf/scatter2_subwf.cwl") as f:
-                subwf = f.read()
+        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+        arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
+                                              basedir="", make_fs_access=make_fs_access, loader=document_loader,
+                                              makeTool=runner.arv_make_tool, metadata=metadata)
+        arvtool.formatgraph = None
+        it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access)
+        it.next().run()
+        it.next().run()
 
-            mockcollection().open().__enter__().write.assert_has_calls([mock.call(subwf)])
-            mockcollection().open().__enter__().write.assert_has_calls([mock.call('{sleeptime: 5}')])
+        with open("tests/wf/scatter2_subwf.cwl") as f:
+            subwf = f.read()
 
-            runner.api.jobs().create.assert_called_with(
-                body={
-                    'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
-                    'repository': 'arvados',
-                    'script_version': 'master',
-                    'script': 'crunchrunner',
-                    'script_parameters': {
-                        'tasks': [{'task.env': {
-                            'HOME': '$(task.outdir)',
-                            'TMPDIR': '$(task.tmpdir)'},
-                                   'task.vwd': {
-                                       'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
-                                       'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
-                                   },
-                        'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
-                        'task.stdout': 'cwl.output.json'}]},
-                    'runtime_constraints': {
-                        'min_scratch_mb_per_node': 2048,
-                        'min_cores_per_node': 1,
-                        'docker_image': 'arvados/jobs',
-                        'min_ram_mb_per_node': 1024
-                    },
-                    'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'},
-                filters=[['repository', '=', 'arvados'],
-                         ['script', '=', 'crunchrunner'],
-                         ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
-                         ['docker_image_locator', 'in docker', 'arvados/jobs']],
-                find_or_create=True)
-        except:
-            logging.exception("")
+        mockcollection().open().__enter__().write.assert_has_calls([mock.call(subwf)])
+        mockcollection().open().__enter__().write.assert_has_calls([mock.call('{sleeptime: 5}')])
+
+        runner.api.jobs().create.assert_called_with(
+            body={
+                'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
+                'repository': 'arvados',
+                'script_version': 'master',
+                'script': 'crunchrunner',
+                'script_parameters': {
+                    'tasks': [{'task.env': {
+                        'HOME': '$(task.outdir)',
+                        'TMPDIR': '$(task.tmpdir)'},
+                               'task.vwd': {
+                                   'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
+                                   'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
+                               },
+                    'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
+                    'task.stdout': 'cwl.output.json'}]},
+                'runtime_constraints': {
+                    'min_scratch_mb_per_node': 2048,
+                    'min_cores_per_node': 1,
+                    'docker_image': 'arvados/jobs',
+                    'min_ram_mb_per_node': 1024
+                },
+                'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'},
+            filters=[['repository', '=', 'arvados'],
+                     ['script', '=', 'crunchrunner'],
+                     ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
+                     ['docker_image_locator', 'in docker', 'arvados/jobs']],
+            find_or_create=True)
+
+    def test_default_work_api(self):
+        arvados_cwl.add_arv_hints()
+
+        api = mock.MagicMock()
+        api._rootDesc = arvados.api('v1')._rootDesc
+        del api._rootDesc.get('resources')['jobs']['methods']['create']
+        runner = arvados_cwl.ArvCwlRunner(api)
+        self.assertEqual(runner.work_api, 'containers')
index 267129715494b9f05bfe9a2df220e0514da5e81c..cd66eb15065059579e718150372d1f0c03247688 100644 (file)
@@ -1,20 +1,24 @@
+import functools
+import json
 import logging
 import mock
-import unittest
 import os
-import functools
-import json
 import StringIO
+import unittest
 
+import arvados
 import arvados_cwl
 
 class TestMakeOutput(unittest.TestCase):
+    def setUp(self):
+        self.api = mock.MagicMock()
+        self.api._rootDesc = arvados.api('v1')._rootDesc
+
     @mock.patch("arvados.collection.Collection")
     @mock.patch("arvados.collection.CollectionReader")
     def test_make_output_collection(self, reader, col):
-        api = mock.MagicMock()
         keep_client = mock.MagicMock()
-        runner = arvados_cwl.ArvCwlRunner(api, keep_client=keep_client)
+        runner = arvados_cwl.ArvCwlRunner(self.api, keep_client=keep_client)
         runner.project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
 
         final = mock.MagicMock()
index 7e13066d855a92e292420c229ba4c34f8e324e00..57958f78d0a41a97b9d5b0aa5f10a2cf10563b22 100644 (file)
@@ -21,10 +21,14 @@ def upload_mock(files, api, dry_run=False, num_retries=0, project=None, fnPatter
         c.fn = fnPattern % (pdh, os.path.basename(c.fn))
 
 class TestPathmap(unittest.TestCase):
+    def setUp(self):
+        self.api = mock.MagicMock()
+        self.api._rootDesc = arvados.api('v1')._rootDesc
+
     def test_keepref(self):
         """Test direct keep references."""
 
-        arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+        arvrunner = arvados_cwl.ArvCwlRunner(self.api)
 
         p = ArvPathMapper(arvrunner, [{
             "class": "File",
@@ -38,7 +42,7 @@ class TestPathmap(unittest.TestCase):
     def test_upload(self, upl):
         """Test pathmapper uploading files."""
 
-        arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+        arvrunner = arvados_cwl.ArvCwlRunner(self.api)
 
         upl.side_effect = upload_mock
 
@@ -54,7 +58,7 @@ class TestPathmap(unittest.TestCase):
     def test_prev_uploaded(self, upl):
         """Test pathmapper handling previously uploaded files."""
 
-        arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+        arvrunner = arvados_cwl.ArvCwlRunner(self.api)
         arvrunner.add_uploaded('tests/hw.py', MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='', type='File'))
 
         upl.side_effect = upload_mock
@@ -71,7 +75,7 @@ class TestPathmap(unittest.TestCase):
     @mock.patch("arvados.commands.run.statfile")
     def test_statfile(self, statfile, upl):
         """Test pathmapper handling ArvFile references."""
-        arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+        arvrunner = arvados_cwl.ArvCwlRunner(self.api)
 
         # An ArvFile object returned from arvados.commands.run.statfile means the file is located on a
         # keep mount, so we can construct a direct reference directly without upload.
index d3bdf8fedc30d897b323cd23fce5e74a017f3da9..0ff9da927afafb9908bc32ab747e2fb169c5a9dc 100644 (file)
@@ -1,16 +1,17 @@
-import arvados
-import arvados.keep
-import arvados.collection
-import arvados_cwl
 import copy
 import cStringIO
 import functools
 import hashlib
+import json
+import logging
 import mock
 import sys
 import unittest
-import json
-import logging
+
+import arvados
+import arvados.collection
+import arvados_cwl
+import arvados.keep
 
 from .matcher import JsonDiffMatcher
 
@@ -38,6 +39,7 @@ def stubs(func):
 
 
         stubs.api = mock.MagicMock()
+        stubs.api._rootDesc = arvados.api('v1')._rootDesc
         stubs.api.users().current().execute.return_value = {
             "uuid": stubs.fake_user_uuid,
         }
index 7b87aee6ab7b14c875aa044e3b88e11ed22bd567..a64eaacf8347eb2d6937c84e2c1b0598994ce937 100644 (file)
@@ -28,6 +28,11 @@ func (d Duration) String() string {
        return time.Duration(d).String()
 }
 
+// Duration returns a time.Duration
+func (d Duration) Duration() time.Duration {
+       return time.Duration(d)
+}
+
 // Value implements flag.Value
 func (d *Duration) Set(s string) error {
        dur, err := time.ParseDuration(s)
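
The new Duration() accessor converts the config-friendly arvados.Duration wrapper back into a time.Duration for stdlib arithmetic; the keepstore change further down relies on it (time.Since(t) < theConfig.BlobSignatureTTL.Duration()). A minimal usage sketch, assuming the 2016-era Go SDK import path:

// Sketch only: exercises Set (flag.Value) and the new Duration() accessor.
package main

import (
	"fmt"
	"time"

	"git.curoverse.com/arvados.git/sdk/go/arvados"
)

func main() {
	var ttl arvados.Duration
	if err := ttl.Set("336h"); err != nil { // parse a two-week TTL
		panic(err)
	}
	lastWrite := time.Now().Add(-24 * time.Hour)
	// Compare against a stdlib time.Duration without an explicit conversion.
	if time.Since(lastWrite) < ttl.Duration() {
		fmt.Println("block still within the signature TTL")
	}
}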
index 2217dd3352eae69255b74b4faa5a74425efca0ee..a46ca4cc55aa5c3faa8ccdbe5b73339aacfa50ef 100644 (file)
@@ -36,15 +36,19 @@ Alternately, if you already have a filled buffer and just want to read out from
 package streamer
 
 import (
+       "errors"
        "io"
 )
 
+var ErrAlreadyClosed = errors.New("cannot close a stream twice")
+
 type AsyncStream struct {
        buffer            []byte
        requests          chan sliceRequest
        add_reader        chan bool
        subtract_reader   chan bool
        wait_zero_readers chan bool
+       closed            bool
 }
 
 // Reads from the buffer managed by the Transfer()
@@ -55,7 +59,13 @@ type StreamReader struct {
 }
 
 func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
-       t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
+       t := &AsyncStream{
+               buffer:            make([]byte, buffersize),
+               requests:          make(chan sliceRequest),
+               add_reader:        make(chan bool),
+               subtract_reader:   make(chan bool),
+               wait_zero_readers: make(chan bool),
+       }
 
        go t.transfer(source)
        go t.readersMonitor()
@@ -64,7 +74,13 @@ func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
 }
 
 func AsyncStreamFromSlice(buf []byte) *AsyncStream {
-       t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
+       t := &AsyncStream{
+               buffer:            buf,
+               requests:          make(chan sliceRequest),
+               add_reader:        make(chan bool),
+               subtract_reader:   make(chan bool),
+               wait_zero_readers: make(chan bool),
+       }
 
        go t.transfer(nil)
        go t.readersMonitor()
@@ -115,16 +131,24 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
 
 // Close the responses channel
 func (this *StreamReader) Close() error {
+       if this.stream == nil {
+               return ErrAlreadyClosed
+       }
        this.stream.subtract_reader <- true
        close(this.responses)
        this.stream = nil
        return nil
 }
 
-func (this *AsyncStream) Close() {
+func (this *AsyncStream) Close() error {
+       if this.closed {
+               return ErrAlreadyClosed
+       }
+       this.closed = true
        this.wait_zero_readers <- true
        close(this.requests)
        close(this.add_reader)
        close(this.subtract_reader)
        close(this.wait_zero_readers)
+       return nil
 }
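
With the added closed flag and ErrAlreadyClosed sentinel, calling Close twice on either an AsyncStream or a StreamReader now returns an error instead of panicking on an already-closed channel. A minimal caller-side sketch mirroring the new test below (import path assumed):

// Sketch only: a double Close yields ErrAlreadyClosed rather than a panic.
package main

import (
	"fmt"

	"git.curoverse.com/arvados.git/sdk/go/streamer"
)

func main() {
	tr := streamer.AsyncStreamFromSlice(make([]byte, 100))
	sr := tr.MakeStreamReader()
	fmt.Println(sr.Close()) // <nil>
	fmt.Println(sr.Close()) // cannot close a stream twice
	fmt.Println(tr.Close()) // <nil>
	fmt.Println(tr.Close()) // cannot close a stream twice
}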
index 80aeb268975d8acbc7f7d1c2e771c17e7adfa719..f5333c37c175be1774eab5318824bb018f47f31f 100644 (file)
@@ -365,3 +365,13 @@ func (s *StandaloneSuite) TestManyReaders(c *C) {
        writer.Write([]byte("baz"))
        writer.Close()
 }
+
+func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) {
+       buffer := make([]byte, 100)
+       tr := AsyncStreamFromSlice(buffer)
+       sr := tr.MakeStreamReader()
+       c.Check(sr.Close(), IsNil)
+       c.Check(sr.Close(), Equals, ErrAlreadyClosed)
+       c.Check(tr.Close(), IsNil)
+       c.Check(tr.Close(), Equals, ErrAlreadyClosed)
+}
index e72f67dce49049f37c9b2e68794eb62cc780297c..642b7ccbad51846a9f1c25acc86f6b0505897c62 100644 (file)
@@ -394,8 +394,7 @@ def run_keep(blob_signing_key=None, enforce_permissions=False, num_servers=2):
     with open(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"), "w") as f:
         keep_args['-blob-signing-key-file'] = f.name
         f.write(blob_signing_key)
-    if enforce_permissions:
-        keep_args['-enforce-permissions'] = 'true'
+    keep_args['-enforce-permissions'] = str(enforce_permissions).lower()
     with open(os.path.join(TEST_TMPDIR, "keep.data-manager-token-file"), "w") as f:
         keep_args['-data-manager-token-file'] = f.name
         f.write(auth_token('data_manager'))
index 5134fc4ce82d68bf4b00da91a4bedaa10e7f7e1e..5d9b031e0295a62b86a6f0b4d6e9c13cc784da70 100644 (file)
@@ -73,7 +73,7 @@ gem 'faye-websocket'
 gem 'themes_for_rails'
 
 gem 'arvados', '>= 0.1.20150615153458'
-gem 'arvados-cli', '>= 0.1.20151207150126'
+gem 'arvados-cli', '>= 0.1.20161017193526'
 
 # pg_power lets us use partial indexes in schema.rb in Rails 3
 gem 'pg_power'
index 1fb4369c0eca56815d189b4a9c5bf7f87ab9e4ea..6f7875163b63fa6af8462f2999e42bad4902f37d 100644 (file)
@@ -42,12 +42,12 @@ GEM
       i18n (~> 0)
       json (~> 1.7, >= 1.7.7)
       jwt (>= 0.1.5, < 2)
-    arvados-cli (0.1.20160503204200)
+    arvados-cli (0.1.20161017193526)
       activesupport (~> 3.2, >= 3.2.13)
       andand (~> 1.3, >= 1.3.3)
       arvados (~> 0.1, >= 0.1.20150128223554)
       curb (~> 0.8)
-      google-api-client (~> 0.6, >= 0.6.3, < 0.9)
+      google-api-client (~> 0.6, >= 0.6.3, < 0.8.9)
       json (~> 1.7, >= 1.7.7)
       oj (~> 2.0, >= 2.0.3)
       trollop (~> 2.0)
@@ -236,7 +236,7 @@ DEPENDENCIES
   acts_as_api
   andand
   arvados (>= 0.1.20150615153458)
-  arvados-cli (>= 0.1.20151207150126)
+  arvados-cli (>= 0.1.20161017193526)
   coffee-rails (~> 3.2.0)
   database_cleaner
   factory_girl_rails
index 3c5bf94d2c4b06f8d1a1e301971cdf39673d8a44..776f7e190e06ad0a486dad78c04affe84493175a 100644 (file)
@@ -25,6 +25,7 @@ class ApplicationController < ActionController::Base
 
   ERROR_ACTIONS = [:render_error, :render_not_found]
 
+  before_filter :disable_api_methods
   before_filter :set_cors_headers
   before_filter :respond_with_json_by_default
   before_filter :remote_ip
@@ -385,6 +386,13 @@ class ApplicationController < ActionController::Base
     end
   end
 
+  def disable_api_methods
+    if Rails.configuration.disable_api_methods.
+        include?(controller_name + "." + action_name)
+      send_error("Disabled", status: 404)
+    end
+  end
+
   def set_cors_headers
     response.headers['Access-Control-Allow-Origin'] = '*'
     response.headers['Access-Control-Allow-Methods'] = 'GET, HEAD, PUT, POST, DELETE'
index ba0f90f90c45d95925df0ae7892815d0b5c490b6..2c55b15068ca3e8e1a2046b44d20dc4fa86ba32e 100644 (file)
@@ -399,6 +399,10 @@ class Arvados::V1::SchemaController < ApplicationController
           end
         end
       end
+      Rails.configuration.disable_api_methods.each do |method|
+        ctrl, action = method.split('.', 2)
+        discovery[:resources][ctrl][:methods].delete(action.to_sym)
+      end
       discovery
     end
     send_json discovery
index 795b114bf91930447fd777686015ef97ce001a49..8bb27a705e7fdeba37b224ddbb405183f51dccc4 100644 (file)
@@ -45,6 +45,9 @@ class UserSessionsController < ApplicationController
                       :identity_url => omniauth['info']['identity_url'],
                       :is_active => Rails.configuration.new_users_are_active,
                       :owner_uuid => system_user_uuid)
+      if omniauth['info']['username']
+        user.set_initial_username(requested: omniauth['info']['username'])
+      end
       act_as_system_user do
         user.save or raise Exception.new(user.errors.messages)
       end
index 4a054413ce21e0076e79100934f20d248c49198d..8579509de70e9eff1c46d25563ca239fcf9dff8d 100644 (file)
@@ -314,7 +314,7 @@ class Collection < ArvadosModel
         # looks like a saved Docker image.
         manifest = Keep::Manifest.new(coll_match.manifest_text)
         if manifest.exact_file_count?(1) and
-            (manifest.files[0][1] =~ /^[0-9A-Fa-f]{64}\.tar$/)
+            (manifest.files[0][1] =~ /^(sha256:)?[0-9A-Fa-f]{64}\.tar$/)
           return [coll_match]
         end
       end
index 696b873bde383ade05993a7d7c33800b70dffe04..1798150af3e83145a49762a65b778f5adf96995c 100644 (file)
@@ -41,6 +41,7 @@ class ContainerRequest < ArvadosModel
     t.add :requesting_container_uuid
     t.add :runtime_constraints
     t.add :state
+    t.add :use_existing
   end
 
   # Supported states for a container request
@@ -120,7 +121,8 @@ class ContainerRequest < ArvadosModel
                  container_image: c_container_image,
                  mounts: c_mounts,
                  runtime_constraints: c_runtime_constraints}
-      reusable = Container.find_reusable(c_attrs)
+
+      reusable = self.use_existing ? Container.find_reusable(c_attrs) : nil
       if not reusable.nil?
         reusable
       else
@@ -234,7 +236,7 @@ class ContainerRequest < ArvadosModel
                      :container_image, :cwd, :description, :environment,
                      :filters, :mounts, :name, :output_path, :priority,
                      :properties, :requesting_container_uuid, :runtime_constraints,
-                     :state, :container_uuid
+                     :state, :container_uuid, :use_existing
 
     when Committed
       if container_uuid.nil?
index 18d33a6b0b0a4c2b69e27ec2e387e2be867398b4..9363cc4f02aa04d08552b9e343bbda9f8dcda5c1 100644 (file)
@@ -261,6 +261,25 @@ class User < ArvadosModel
     self.save!
   end
 
+  def set_initial_username(requested: false)
+    if !requested.is_a?(String) || requested.empty?
+      email_parts = email.partition("@")
+      local_parts = email_parts.first.partition("+")
+      if email_parts.any?(&:empty?)
+        return
+      elsif not local_parts.first.empty?
+        requested = local_parts.first
+      else
+        requested = email_parts.first
+      end
+    end
+    requested.sub!(/^[^A-Za-z]+/, "")
+    requested.gsub!(/[^A-Za-z0-9]/, "")
+    unless requested.empty?
+      self.username = find_usable_username_from(requested)
+    end
+  end
+
   protected
 
   def ensure_ownership_path_leads_to_user
@@ -326,23 +345,6 @@ class User < ArvadosModel
     nil
   end
 
-  def set_initial_username
-    email_parts = email.partition("@")
-    local_parts = email_parts.first.partition("+")
-    if email_parts.any?(&:empty?)
-      return
-    elsif not local_parts.first.empty?
-      base_username = local_parts.first
-    else
-      base_username = email_parts.first
-    end
-    base_username.sub!(/^[^A-Za-z]+/, "")
-    base_username.gsub!(/[^A-Za-z0-9]/, "")
-    unless base_username.empty?
-      self.username = find_usable_username_from(base_username)
-    end
-  end
-
   def prevent_privilege_escalation
     if current_user.andand.is_admin
       return true
index bc752ed1d5f4d92b5ead8df2314e80a751f7b697..5fe03024bf7d3e9ccaee6d108c889c1078421d36 100644 (file)
@@ -264,6 +264,11 @@ common:
   # Use at your own risk.
   unlogged_attributes: []
 
+  # API methods to disable. Disabled methods are not listed in the
+  # discovery document, and respond 404 to all requests.
+  # Example: ["jobs.create", "pipeline_instances.create"]
+  disable_api_methods: []
+
   ###
   ### Crunch, DNS & compute node management
   ###
diff --git a/services/api/db/migrate/20161019171346_add_use_existing_to_container_requests.rb b/services/api/db/migrate/20161019171346_add_use_existing_to_container_requests.rb
new file mode 100644 (file)
index 0000000..100b83d
--- /dev/null
@@ -0,0 +1,9 @@
+class AddUseExistingToContainerRequests < ActiveRecord::Migration
+  def up
+    add_column :container_requests, :use_existing, :boolean, :default => true
+  end
+
+  def down
+    remove_column :container_requests, :use_existing
+  end
+end
index fa4760d8a0df2ad03781fb37bcdfa58beba2f2ae..0db782af69484e6a8e0c476620891702055f36c7 100644 (file)
@@ -290,7 +290,8 @@ CREATE TABLE container_requests (
     expires_at timestamp without time zone,
     filters text,
     updated_at timestamp without time zone NOT NULL,
-    container_count integer DEFAULT 0
+    container_count integer DEFAULT 0,
+    use_existing boolean DEFAULT true
 );
 
 
@@ -2691,4 +2692,6 @@ INSERT INTO schema_migrations (version) VALUES ('20160901210110');
 
 INSERT INTO schema_migrations (version) VALUES ('20160909181442');
 
-INSERT INTO schema_migrations (version) VALUES ('20160926194129');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20160926194129');
+
+INSERT INTO schema_migrations (version) VALUES ('20161019171346');
\ No newline at end of file
index a7e8ff2fc8d7024b23b309c5b3e2de67abf5741f..1a6e38a4318657ccba4f3b86af08bb094401aa4c 100644 (file)
@@ -17,6 +17,7 @@ module OmniAuth
           :last_name => raw_info['info']['last_name'],
           :email => raw_info['info']['email'],
           :identity_url => raw_info['info']['identity_url'],
+          :username => raw_info['info']['username'],
         }
       end
 
index b1154a8399e478d9fd2147fbb5fdb6cc8d46e016..9f2f41030028f87308798321e2f111b9cd69193f 100644 (file)
@@ -99,7 +99,7 @@ docker_image:
   uuid: zzzzz-4zz18-1v45jub259sjjgb
   # This Collection has links with Docker image metadata.
   portable_data_hash: fa3c1a9cb6783f85f2ecda037e07b8c3+167
-  owner_uuid: qr1hi-tpzed-000000000000000
+  owner_uuid: zzzzz-tpzed-000000000000000
   created_at: 2014-06-11T17:22:54Z
   modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
   modified_by_user_uuid: zzzzz-tpzed-d9tiejq69daie8f
@@ -108,12 +108,25 @@ docker_image:
   manifest_text: ". d21353cfe035e3e384563ee55eadbb2f+67108864 5c77a43e329b9838cbec18ff42790e57+55605760 0:122714624:d8309758b8fe2c81034ffc8a10c36460b77db7bc5e7b448c4e5b684f9d95a678.tar\n"
   name: docker_image
 
+# untagged docker image with sha256:{hash}.tar filename
+docker_image_1_12:
+  uuid: zzzzz-4zz18-1g4g0vhpjn9wq7i
+  portable_data_hash: d740a57097711e08eb9b2a93518f20ab+174
+  owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  created_at: 2016-10-19 08:50:45.653552268 Z
+  modified_by_client_uuid: zzzzz-ozdt8-teyxzyd8qllg11h
+  modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  modified_at: 2016-10-19 08:50:45.652930000 Z
+  updated_at: 2016-10-19 08:50:45.652930000 Z
+  manifest_text: ". d21353cfe035e3e384563ee55eadbb2f+67108864 5c77a43e329b9838cbec18ff42790e57+55605760 0:122714624:sha256:d8309758b8fe2c81034ffc8a10c36460b77db7bc5e7b448c4e5b684f9d95a678.tar\n"
+  name: docker_image_1_12
+
 unlinked_docker_image:
   uuid: zzzzz-4zz18-d0d8z5wofvfgwad
   # This Collection contains a file that looks like a Docker image,
   # but has no Docker metadata links pointing to it.
   portable_data_hash: 9ae44d5792468c58bcf85ce7353c7027+124
-  owner_uuid: qr1hi-tpzed-000000000000000
+  owner_uuid: zzzzz-tpzed-000000000000000
   created_at: 2014-06-11T17:22:54Z
   modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
   modified_by_user_uuid: zzzzz-tpzed-d9tiejq69daie8f
index b84c93df08a5993da973da4ebc05683ff08ad573..3c11b3e00940fefb644b2ea4b3e64f81ffdfed2b 100644 (file)
@@ -508,4 +508,17 @@ class Arvados::V1::JobsControllerTest < ActionController::TestCase
     assert_not_nil json_response["components"]
     assert_equal [], json_response["components"].keys
   end
+
+  test 'jobs.create disabled in config' do
+    Rails.configuration.disable_api_methods = ["jobs.create",
+                                               "pipeline_instances.create"]
+    authorize_with :active
+    post :create, job: {
+      script: "hash",
+      script_version: "master",
+      repository: "active/foo",
+      script_parameters: {}
+    }
+    assert_response 404
+  end
 end
index f651d81eb3dd13a049896e0e9cdccb87eda6d548..2e370ec9cd63db9b61f6e93ab15111d028f469e0 100644 (file)
@@ -42,4 +42,23 @@ class Arvados::V1::SchemaControllerTest < ActionController::TestCase
     discovery_doc = JSON.parse(@response.body)
     assert_equal 'aaa888fff', discovery_doc['source_version']
   end
+
+  test "empty disable_api_methods" do
+    get :index
+    assert_response :success
+    discovery_doc = JSON.parse(@response.body)
+    assert_equal('POST',
+                 discovery_doc['resources']['jobs']['methods']['create']['httpMethod'])
+  end
+
+  test "non-empty disable_api_methods" do
+    Rails.configuration.disable_api_methods =
+      ['jobs.create', 'pipeline_instances.create', 'pipeline_templates.create']
+    get :index
+    assert_response :success
+    discovery_doc = JSON.parse(@response.body)
+    ['jobs', 'pipeline_instances', 'pipeline_templates'].each do |r|
+      refute_includes(discovery_doc['resources'][r]['methods'].keys(), 'create')
+    end
+  end
 end
index 814e6eb670c0ca77c193ef10af805d34282708aa..7a9f9176d335c02c1a7dc425cf4212dfb85e0ea5 100644 (file)
@@ -5,7 +5,7 @@ class UserSessionsApiTest < ActionDispatch::IntegrationTest
     'https://wb.example.com'
   end
 
-  def mock_auth_with_email email
+  def mock_auth_with(email: nil, username: nil)
     mock = {
       'provider' => 'josh_id',
       'uid' => 'https://edward.example.com',
@@ -14,17 +14,30 @@ class UserSessionsApiTest < ActionDispatch::IntegrationTest
         'name' => 'Edward Example',
         'first_name' => 'Edward',
         'last_name' => 'Example',
-        'email' => email,
       },
     }
+    mock['info']['email'] = email unless email.nil?
+    mock['info']['username'] = username unless username.nil?
     post('/auth/josh_id/callback',
          {return_to: client_url},
          {'omniauth.auth' => mock})
     assert_response :redirect, 'Did not redirect to client with token'
   end
 
+  test 'assign username from sso' do
+    mock_auth_with(email: 'foo@example.com', username: 'bar')
+    u = assigns(:user)
+    assert_equal 'bar', u.username
+  end
+
+  test 'no assign username from sso' do
+    mock_auth_with(email: 'foo@example.com')
+    u = assigns(:user)
+    assert_equal 'foo', u.username
+  end
+
   test 'create new user during omniauth callback' do
-    mock_auth_with_email 'edward@example.com'
+    mock_auth_with(email: 'edward@example.com')
     assert_equal(0, @response.redirect_url.index(client_url),
                 'Redirected to wrong address after successful login: was ' +
                  @response.redirect_url + ', expected ' + client_url + '[...]')
@@ -61,7 +74,7 @@ class UserSessionsApiTest < ActionDispatch::IntegrationTest
       Rails.configuration.auto_setup_new_users_with_repository =
         testcase[:cfg][:repo]
 
-      mock_auth_with_email testcase[:email]
+      mock_auth_with(email: testcase[:email])
       u = assigns(:user)
       vm_links = Link.where('link_class=? and tail_uuid=? and head_uuid like ?',
                             'permission', u.uuid,
index 1c5c7ae5cea5a55da5c2af9500e42321cd5811f8..ecfbe465e73e7f78598c5963a7fd62d287157f85 100644 (file)
@@ -384,10 +384,12 @@ class ContainerRequestTest < ActiveSupport::TestCase
 
   test "container_image_for_container(pdh)" do
     set_user_from_auth :active
-    pdh = collections(:docker_image).portable_data_hash
-    cr = ContainerRequest.new(container_image: pdh)
-    resolved = cr.send :container_image_for_container
-    assert_equal resolved, pdh
+    [:docker_image, :docker_image_1_12].each do |coll|
+      pdh = collections(coll).portable_data_hash
+      cr = ContainerRequest.new(container_image: pdh)
+      resolved = cr.send :container_image_for_container
+      assert_equal resolved, pdh
+    end
   end
 
   ['acbd18db4cc2f85cedef654fccc4a4d8+3',
@@ -410,10 +412,12 @@ class ContainerRequestTest < ActiveSupport::TestCase
   end
 
   [
-    [{"var" => "value1"}, {"var" => "value1"}],
-    [{"var" => "value1"}, {"var" => "value2"}]
-  ].each do |env1, env2|
-    test "Container request #{(env1 == env2) ? 'does' : 'does not'} reuse container when committed" do
+    [{"var" => "value1"}, {"var" => "value1"}, nil],
+    [{"var" => "value1"}, {"var" => "value1"}, true],
+    [{"var" => "value1"}, {"var" => "value1"}, false],
+    [{"var" => "value1"}, {"var" => "value2"}, nil],
+  ].each do |env1, env2, use_existing|
+    test "Container request #{((env1 == env2) and (use_existing.nil? or use_existing == true)) ? 'does' : 'does not'} reuse container when committed#{use_existing.nil? ? '' : use_existing ? ' and use_existing == true' : ' and use_existing == false'}" do
       common_attrs = {cwd: "test",
                       priority: 1,
                       command: ["echo", "hello"],
@@ -424,16 +428,27 @@ class ContainerRequestTest < ActiveSupport::TestCase
       set_user_from_auth :active
       cr1 = create_minimal_req!(common_attrs.merge({state: ContainerRequest::Committed,
                                                     environment: env1}))
-      cr2 = create_minimal_req!(common_attrs.merge({state: ContainerRequest::Uncommitted,
-                                                    environment: env2}))
+      if use_existing.nil?
+        # Testing with use_existing default value
+        cr2 = create_minimal_req!(common_attrs.merge({state: ContainerRequest::Uncommitted,
+                                                      environment: env2}))
+      else
+
+        cr2 = create_minimal_req!(common_attrs.merge({state: ContainerRequest::Uncommitted,
+                                                      environment: env2,
+                                                      use_existing: use_existing}))
+      end
       assert_not_nil cr1.container_uuid
       assert_nil cr2.container_uuid
 
-      # Update cr2 to commited state and check for container equality on both cases,
-      # when env1 and env2 are equal the same container should be assigned, and
-      # when env1 and env2 are different, cr2 container should be different.
+      # Update cr2 to committed state and check for container equality on different cases:
+      # * When env1 and env2 are equal and use_existing is true, the same container
+      #   should be assigned.
+      # * When use_existing is false, a different container should be assigned.
+      # * When env1 and env2 are different, a different container should be assigned.
       cr2.update_attributes!({state: ContainerRequest::Committed})
-      assert_equal (env1 == env2), (cr1.container_uuid == cr2.container_uuid)
+      assert_equal (cr2.use_existing == true and (env1 == env2)),
+                   (cr1.container_uuid == cr2.container_uuid)
     end
   end
 
index f45cc823ec7c87d310609f6d33073c4d82f8b662..0c1ce49592a6b08223271d440dca41f3a5d8fd46 100644 (file)
@@ -79,9 +79,9 @@ func doMain() error {
                // propagated to crunch-run via SLURM.
                os.Setenv("ARVADOS_API_HOST", theConfig.Client.APIHost)
                os.Setenv("ARVADOS_API_TOKEN", theConfig.Client.AuthToken)
-               os.Setenv("ARVADOS_API_INSECURE", "")
+               os.Setenv("ARVADOS_API_HOST_INSECURE", "")
                if theConfig.Client.Insecure {
-                       os.Setenv("ARVADOS_API_INSECURE", "1")
+                       os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
                }
                os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(theConfig.Client.KeepServiceURIs, " "))
                os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
@@ -183,9 +183,10 @@ func submit(dispatcher *dispatch.Dispatcher,
        squeueUpdater.SlurmLock.Lock()
        defer squeueUpdater.SlurmLock.Unlock()
 
+       log.Printf("sbatch starting: %+q", cmd.Args)
        err := cmd.Start()
        if err != nil {
-               submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
+               submitErr = fmt.Errorf("Error starting sbatch: %v", err)
                return
        }
 
index fafa3c36073c2408c6265af8afef01bffbe2e44d..61decde61c4bd61d0a92e96bde20ff0c82780f57 100644 (file)
@@ -45,7 +45,11 @@ func (squeue *Squeue) RunSqueue() {
                log.Printf("Error creating stdout pipe for squeue: %v", err)
                return
        }
-       cmd.Start()
+       err = cmd.Start()
+       if err != nil {
+               log.Printf("Error running squeue: %v", err)
+               return
+       }
        scanner := bufio.NewScanner(sq)
        for scanner.Scan() {
                newSqueueContents = append(newSqueueContents, scanner.Text())
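
The squeue fix applies the standard exec.Cmd discipline: if cmd.Start() fails and the error is ignored, the scanner silently reads an empty stream from the pipe and the loop collects nothing. A minimal sketch of the corrected pattern (command name real, flags omitted as illustrative):

// Sketch only: check Start() before consuming the pipe.
package main

import (
	"bufio"
	"log"
	"os/exec"
)

func main() {
	cmd := exec.Command("squeue") // illustrative; the dispatcher passes real flags
	sq, err := cmd.StdoutPipe()
	if err != nil {
		log.Fatalf("Error creating stdout pipe for squeue: %v", err)
	}
	if err := cmd.Start(); err != nil {
		log.Fatalf("Error running squeue: %v", err)
	}
	scanner := bufio.NewScanner(sq)
	for scanner.Scan() {
		log.Println(scanner.Text())
	}
	if err := cmd.Wait(); err != nil {
		log.Printf("squeue exited: %v", err)
	}
}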
index 48cb02647cfd098cdc67796ba992ac5cba327bde..d2163f6b490376768383b260444d6be90a9ca1ed 100644 (file)
@@ -40,41 +40,29 @@ func readKeyFromFile(file string) (string, error) {
 }
 
 type azureVolumeAdder struct {
-       *volumeSet
+       *Config
 }
 
-func (s *azureVolumeAdder) Set(containerName string) error {
-       if trashLifetime != 0 {
-               return ErrNotImplemented
-       }
+// String implements flag.Value
+func (s *azureVolumeAdder) String() string {
+       return "-"
+}
 
-       if containerName == "" {
-               return errors.New("no container name given")
-       }
-       if azureStorageAccountName == "" || azureStorageAccountKeyFile == "" {
-               return errors.New("-azure-storage-account-name and -azure-storage-account-key-file arguments must given before -azure-storage-container-volume")
-       }
-       accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
-       if err != nil {
-               return err
-       }
-       azClient, err := storage.NewBasicClient(azureStorageAccountName, accountKey)
-       if err != nil {
-               return errors.New("creating Azure storage client: " + err.Error())
-       }
-       if flagSerializeIO {
-               log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
-       }
-       v := NewAzureBlobVolume(azClient, containerName, flagReadonly, azureStorageReplication)
-       if err := v.Check(); err != nil {
-               return err
-       }
-       *s.volumeSet = append(*s.volumeSet, v)
+func (s *azureVolumeAdder) Set(containerName string) error {
+       s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
+               ContainerName:         containerName,
+               StorageAccountName:    azureStorageAccountName,
+               StorageAccountKeyFile: azureStorageAccountKeyFile,
+               AzureReplication:      azureStorageReplication,
+               ReadOnly:              deprecated.flagReadonly,
+       })
        return nil
 }
 
 func init() {
-       flag.Var(&azureVolumeAdder{&volumes},
+       VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
+
+       flag.Var(&azureVolumeAdder{theConfig},
                "azure-storage-container-volume",
                "Use the given container as a storage volume. Can be given multiple times.")
        flag.StringVar(
@@ -86,7 +74,7 @@ func init() {
                &azureStorageAccountKeyFile,
                "azure-storage-account-key-file",
                "",
-               "File containing the account key used for subsequent --azure-storage-container-volume arguments.")
+               "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
        flag.IntVar(
                &azureStorageReplication,
                "azure-storage-replication",
@@ -102,41 +90,64 @@ func init() {
 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
 // container.
 type AzureBlobVolume struct {
-       azClient      storage.Client
-       bsClient      storage.BlobStorageClient
-       containerName string
-       readonly      bool
-       replication   int
+       StorageAccountName    string
+       StorageAccountKeyFile string
+       ContainerName         string
+       AzureReplication      int
+       ReadOnly              bool
+
+       azClient storage.Client
+       bsClient storage.BlobStorageClient
 }
 
-// NewAzureBlobVolume returns a new AzureBlobVolume using the given
-// client and container name. The replication argument specifies the
-// replication level to report when writing data.
-func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
-       return &AzureBlobVolume{
-               azClient:      client,
-               bsClient:      client.GetBlobService(),
-               containerName: containerName,
-               readonly:      readonly,
-               replication:   replication,
+// Examples implements VolumeWithExamples.
+func (*AzureBlobVolume) Examples() []Volume {
+       return []Volume{
+               &AzureBlobVolume{
+                       StorageAccountName:    "example-account-name",
+                       StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
+                       ContainerName:         "example-container-name",
+                       AzureReplication:      3,
+               },
        }
 }
 
-// Check returns nil if the volume is usable.
-func (v *AzureBlobVolume) Check() error {
-       ok, err := v.bsClient.ContainerExists(v.containerName)
+// Type implements Volume.
+func (v *AzureBlobVolume) Type() string {
+       return "Azure"
+}
+
+// Start implements Volume.
+func (v *AzureBlobVolume) Start() error {
+       if v.ContainerName == "" {
+               return errors.New("no container name given")
+       }
+       if v.StorageAccountName == "" || v.StorageAccountKeyFile == "" {
+               return errors.New("StorageAccountName and StorageAccountKeyFile must be given")
+       }
+       accountKey, err := readKeyFromFile(v.StorageAccountKeyFile)
+       if err != nil {
+               return err
+       }
+       v.azClient, err = storage.NewBasicClient(v.StorageAccountName, accountKey)
+       if err != nil {
+               return fmt.Errorf("creating Azure storage client: %s", err)
+       }
+       v.bsClient = v.azClient.GetBlobService()
+
+       ok, err := v.bsClient.ContainerExists(v.ContainerName)
        if err != nil {
                return err
        }
        if !ok {
-               return errors.New("container does not exist")
+               return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
        }
        return nil
 }
 
 // Return true if expires_at metadata attribute is found on the block
 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
-       metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+       metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
        if err != nil {
                return false, metadata, v.translateError(err)
        }
@@ -197,7 +208,7 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
        if azureMaxGetBytes < BlockSize {
                // Unfortunately the handler doesn't tell us how long the blob
                // is expected to be, so we have to ask Azure.
-               props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+               props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
                if err != nil {
                        return 0, v.translateError(err)
                }
@@ -228,9 +239,9 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
                        var rdr io.ReadCloser
                        var err error
                        if startPos == 0 && endPos == expectSize {
-                               rdr, err = v.bsClient.GetBlob(v.containerName, loc)
+                               rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
                        } else {
-                               rdr, err = v.bsClient.GetBlobRange(v.containerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
+                               rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
                        }
                        if err != nil {
                                errors[p] = err
@@ -268,7 +279,7 @@ func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
        if trashed {
                return os.ErrNotExist
        }
-       rdr, err := v.bsClient.GetBlob(v.containerName, loc)
+       rdr, err := v.bsClient.GetBlob(v.ContainerName, loc)
        if err != nil {
                return v.translateError(err)
        }
@@ -278,15 +289,15 @@ func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
 
 // Put stores a Keep block as a block blob in the container.
 func (v *AzureBlobVolume) Put(loc string, block []byte) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
-       return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block), nil)
+       return v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), bytes.NewReader(block), nil)
 }
 
 // Touch updates the last-modified property of a block blob.
 func (v *AzureBlobVolume) Touch(loc string) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        trashed, metadata, err := v.checkTrashed(loc)
@@ -298,7 +309,7 @@ func (v *AzureBlobVolume) Touch(loc string) error {
        }
 
        metadata["touch"] = fmt.Sprintf("%d", time.Now())
-       return v.bsClient.SetBlobMetadata(v.containerName, loc, metadata, nil)
+       return v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
 }
 
 // Mtime returns the last-modified property of a block blob.
@@ -311,7 +322,7 @@ func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
                return time.Time{}, os.ErrNotExist
        }
 
-       props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+       props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
        if err != nil {
                return time.Time{}, err
        }
@@ -326,7 +337,7 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
                Include: "metadata",
        }
        for {
-               resp, err := v.bsClient.ListBlobs(v.containerName, params)
+               resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
                if err != nil {
                        return err
                }
@@ -361,7 +372,7 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
 
 // Trash a Keep block.
 func (v *AzureBlobVolume) Trash(loc string) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
 
@@ -370,26 +381,26 @@ func (v *AzureBlobVolume) Trash(loc string) error {
        // we get the Etag before checking Mtime, and use If-Match to
        // ensure we don't delete data if Put() or Touch() happens
        // between our calls to Mtime() and DeleteBlob().
-       props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+       props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
        if err != nil {
                return err
        }
        if t, err := v.Mtime(loc); err != nil {
                return err
-       } else if time.Since(t) < blobSignatureTTL {
+       } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
                return nil
        }
 
-       // If trashLifetime == 0, just delete it
-       if trashLifetime == 0 {
-               return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
+       // If TrashLifetime == 0, just delete it
+       if theConfig.TrashLifetime == 0 {
+               return v.bsClient.DeleteBlob(v.ContainerName, loc, map[string]string{
                        "If-Match": props.Etag,
                })
        }
 
        // Otherwise, mark as trash
-       return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
-               "expires_at": fmt.Sprintf("%d", time.Now().Add(trashLifetime).Unix()),
+       return v.bsClient.SetBlobMetadata(v.ContainerName, loc, map[string]string{
+               "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
        }, map[string]string{
                "If-Match": props.Etag,
        })
@@ -399,7 +410,7 @@ func (v *AzureBlobVolume) Trash(loc string) error {
 // Delete the expires_at metadata attribute
 func (v *AzureBlobVolume) Untrash(loc string) error {
        // if expires_at does not exist, return NotFoundError
-       metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+       metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
        if err != nil {
                return v.translateError(err)
        }
@@ -409,7 +420,7 @@ func (v *AzureBlobVolume) Untrash(loc string) error {
 
        // reset expires_at metadata attribute
        metadata["expires_at"] = ""
-       err = v.bsClient.SetBlobMetadata(v.containerName, loc, metadata, nil)
+       err = v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
        return v.translateError(err)
 }
 
@@ -424,19 +435,19 @@ func (v *AzureBlobVolume) Status() *VolumeStatus {
 
 // String returns a volume label, including the container name.
 func (v *AzureBlobVolume) String() string {
-       return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
+       return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
 }
 
 // Writable returns true, unless the -readonly flag was on when the
 // volume was added.
 func (v *AzureBlobVolume) Writable() bool {
-       return !v.readonly
+       return !v.ReadOnly
 }
 
 // Replication returns the replication level of the container, as
 // specified by the -azure-storage-replication argument.
 func (v *AzureBlobVolume) Replication() int {
-       return v.replication
+       return v.AzureReplication
 }
 
 // If possible, translate an Azure SDK error to a recognizable error
@@ -459,7 +470,7 @@ func (v *AzureBlobVolume) isKeepBlock(s string) bool {
        return keepBlockRegexp.MatchString(s)
 }
 
-// EmptyTrash looks for trashed blocks that exceeded trashLifetime
+// EmptyTrash looks for trashed blocks that exceeded TrashLifetime
 // and deletes them from the volume.
 func (v *AzureBlobVolume) EmptyTrash() {
        var bytesDeleted, bytesInTrash int64
@@ -467,7 +478,7 @@ func (v *AzureBlobVolume) EmptyTrash() {
        params := storage.ListBlobsParameters{Include: "metadata"}
 
        for {
-               resp, err := v.bsClient.ListBlobs(v.containerName, params)
+               resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
                if err != nil {
                        log.Printf("EmptyTrash: ListBlobs: %v", err)
                        break
@@ -491,7 +502,7 @@ func (v *AzureBlobVolume) EmptyTrash() {
                                continue
                        }
 
-                       err = v.bsClient.DeleteBlob(v.containerName, b.Name, map[string]string{
+                       err = v.bsClient.DeleteBlob(v.ContainerName, b.Name, map[string]string{
                                "If-Match": b.Properties.Etag,
                        })
                        if err != nil {
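
For orientation, a minimal sketch (not part of the commit) of how an Azure volume is wired up under the new scheme: fields that used to be constructor arguments are now exported struct fields filled in by the config loader, and Start() performs the client setup and container-existence check that NewAzureBlobVolume and Check() used to split between them. The field values are the hypothetical ones from Examples() above.

    // Sketch only; assumes the keepstore package context.
    func exampleAzureVolume() (*AzureBlobVolume, error) {
        v := &AzureBlobVolume{
            StorageAccountName:    "example-account-name",
            StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
            ContainerName:         "example-container-name",
            AzureReplication:      3,
        }
        if err := v.Start(); err != nil {
            return nil, fmt.Errorf("volume %s: %s", v, err)
        }
        return v, nil
    }
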
index 5d556b3e8c40eb242addf53f4996c49eb396138f..c8c898fe2da3957e3efdf069c5370e146ac5d693 100644 (file)
@@ -365,7 +365,13 @@ func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableA
                }
        }
 
-       v := NewAzureBlobVolume(azClient, container, readonly, replication)
+       v := &AzureBlobVolume{
+               ContainerName:    container,
+               ReadOnly:         readonly,
+               AzureReplication: replication,
+               azClient:         azClient,
+               bsClient:         azClient.GetBlobService(),
+       }
 
        return &TestableAzureBlobVolume{
                AzureBlobVolume: v,
@@ -570,11 +576,11 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
 }
 
 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
-       v.azHandler.PutRaw(v.containerName, locator, data)
+       v.azHandler.PutRaw(v.ContainerName, locator, data)
 }
 
 func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
-       v.azHandler.TouchWithDate(v.containerName, locator, lastPut)
+       v.azHandler.TouchWithDate(v.ContainerName, locator, lastPut)
 }
 
 func (v *TestableAzureBlobVolume) Teardown() {
index 7b51b643cea54ba0b5643a2bf84c4ccfed9bcd5f..bce82377b5d0c2a0d56cb8fa5011c7944ad6a5ea 100644 (file)
@@ -12,12 +12,12 @@ type BufferPoolSuite struct{}
 // Initialize a default-sized buffer pool for the benefit of test
 // suites that don't run main().
 func init() {
-       bufs = newBufferPool(maxBuffers, BlockSize)
+       bufs = newBufferPool(theConfig.MaxBuffers, BlockSize)
 }
 
 // Restore sane default after bufferpool's own tests
 func (s *BufferPoolSuite) TearDownTest(c *C) {
-       bufs = newBufferPool(maxBuffers, BlockSize)
+       bufs = newBufferPool(theConfig.MaxBuffers, BlockSize)
 }
 
 func (s *BufferPoolSuite) TestBufferPoolBufSize(c *C) {
diff --git a/services/keepstore/config.go b/services/keepstore/config.go
new file mode 100644 (file)
index 0000000..9c318d1
--- /dev/null
@@ -0,0 +1,179 @@
+package main
+
+import (
+       "bytes"
+       "encoding/json"
+       "fmt"
+       "io/ioutil"
+       "log"
+       "strings"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type Config struct {
+       Listen string
+
+       PIDFile string
+
+       MaxBuffers  int
+       MaxRequests int
+
+       BlobSignatureTTL    arvados.Duration
+       BlobSigningKeyFile  string
+       RequireSignatures   bool
+       SystemAuthTokenFile string
+       EnableDelete        bool
+       TrashLifetime       arvados.Duration
+       TrashCheckInterval  arvados.Duration
+
+       Volumes VolumeList
+
+       blobSigningKey  []byte
+       systemAuthToken string
+}
+
+var theConfig = DefaultConfig()
+
+// DefaultConfig returns the default configuration.
+func DefaultConfig() *Config {
+       return &Config{
+               Listen:             ":25107",
+               MaxBuffers:         128,
+               RequireSignatures:  true,
+               BlobSignatureTTL:   arvados.Duration(14 * 24 * time.Hour),
+               TrashLifetime:      arvados.Duration(14 * 24 * time.Hour),
+               TrashCheckInterval: arvados.Duration(24 * time.Hour),
+               Volumes:            []Volume{},
+       }
+}
+
+// Start should be called exactly once: after setting all public
+// fields, and before using the config.
+func (cfg *Config) Start() error {
+       if cfg.MaxBuffers <= 0 {
+               return fmt.Errorf("MaxBuffers must be greater than zero")
+       }
+       bufs = newBufferPool(cfg.MaxBuffers, BlockSize)
+
+       if cfg.MaxRequests < 1 {
+               cfg.MaxRequests = cfg.MaxBuffers * 2
+               log.Printf("MaxRequests <1 or not specified; defaulting to MaxBuffers * 2 == %d", cfg.MaxRequests)
+       }
+
+       if cfg.BlobSigningKeyFile != "" {
+               buf, err := ioutil.ReadFile(cfg.BlobSigningKeyFile)
+               if err != nil {
+                       return fmt.Errorf("reading blob signing key file: %s", err)
+               }
+               cfg.blobSigningKey = bytes.TrimSpace(buf)
+               if len(cfg.blobSigningKey) == 0 {
+                       return fmt.Errorf("blob signing key file %q is empty", cfg.BlobSigningKeyFile)
+               }
+       } else if cfg.RequireSignatures {
+               return fmt.Errorf("cannot enable RequireSignatures (-enforce-permissions) without a blob signing key")
+       } else {
+               log.Println("Running without a blob signing key. Block locators " +
+                       "returned by this server will not be signed, and will be rejected " +
+                       "by a server that enforces permissions.")
+               log.Println("To fix this, use the BlobSigningKeyFile config entry.")
+       }
+
+       if fn := cfg.SystemAuthTokenFile; fn != "" {
+               buf, err := ioutil.ReadFile(fn)
+               if err != nil {
+                       return fmt.Errorf("cannot read system auth token file %q: %s", fn, err)
+               }
+               cfg.systemAuthToken = strings.TrimSpace(string(buf))
+       }
+
+       if cfg.EnableDelete {
+               log.Print("Trash/delete features are enabled. WARNING: this has not " +
+                       "been extensively tested. You should disable this unless you can afford to lose data.")
+       }
+
+       if len(cfg.Volumes) == 0 {
+               if (&unixVolumeAdder{cfg}).Discover() == 0 {
+                       return fmt.Errorf("no volumes found")
+               }
+       }
+       for _, v := range cfg.Volumes {
+               if err := v.Start(); err != nil {
+                       return fmt.Errorf("volume %s: %s", v, err)
+               }
+               log.Printf("Using volume %v (writable=%v)", v, v.Writable())
+       }
+       return nil
+}
+
+// VolumeTypes is built up by init() funcs in the source files that
+// define the volume types.
+var VolumeTypes = []func() VolumeWithExamples{}
+
+type VolumeList []Volume
+
+// UnmarshalJSON, given an array of objects, deserializes each object
+// as the volume type indicated by the object's Type field.
+func (vols *VolumeList) UnmarshalJSON(data []byte) error {
+       typeMap := map[string]func() VolumeWithExamples{}
+       for _, factory := range VolumeTypes {
+               t := factory().Type()
+               if _, ok := typeMap[t]; ok {
+                       log.Fatalf("volume type %+q is claimed by multiple VolumeTypes", t)
+               }
+               typeMap[t] = factory
+       }
+
+       var mapList []map[string]interface{}
+       err := json.Unmarshal(data, &mapList)
+       if err != nil {
+               return err
+       }
+       for _, mapIn := range mapList {
+               typeIn, ok := mapIn["Type"].(string)
+               if !ok {
+                       return fmt.Errorf("invalid volume type %+v", mapIn["Type"])
+               }
+               factory, ok := typeMap[typeIn]
+               if !ok {
+                       return fmt.Errorf("unsupported volume type %+q", typeIn)
+               }
+               data, err := json.Marshal(mapIn)
+               if err != nil {
+                       return err
+               }
+               vol := factory()
+               err = json.Unmarshal(data, vol)
+               if err != nil {
+                       return err
+               }
+               *vols = append(*vols, vol)
+       }
+       return nil
+}
+
+// MarshalJSON adds a "Type" field to each volume corresponding to its
+// Type().
+func (vl *VolumeList) MarshalJSON() ([]byte, error) {
+       data := []byte{'['}
+       for _, vs := range *vl {
+               j, err := json.Marshal(vs)
+               if err != nil {
+                       return nil, err
+               }
+               if len(data) > 1 {
+                       data = append(data, byte(','))
+               }
+               t, err := json.Marshal(vs.Type())
+               if err != nil {
+                       panic(err)
+               }
+               data = append(data, j[0])
+               data = append(data, []byte(`"Type":`)...)
+               data = append(data, t...)
+               data = append(data, byte(','))
+               data = append(data, j[1:]...)
+       }
+       return append(data, byte(']')), nil
+}
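
The UnmarshalJSON above is the piece that lets one config file describe heterogeneous volume types: it decodes the array generically, reads each object's Type field, picks the registered factory, and re-decodes the object into the concrete type. A standalone sketch of the same two-pass technique, with a stub type in place of the real Volume interface and VolumeTypes registry:

    package main

    import (
        "encoding/json"
        "fmt"
        "log"
    )

    // fakeVolume stands in for a concrete volume type; the real code
    // dispatches through the Volume interface and the VolumeTypes
    // factory registry defined above.
    type fakeVolume struct {
        Root string
    }

    func (v *fakeVolume) Type() string { return "Fake" }

    func main() {
        // Same two passes as VolumeList.UnmarshalJSON: decode
        // generically, inspect Type, then re-marshal each object and
        // decode it into the concrete type.
        data := []byte(`[{"Type":"Fake","Root":"/tmp/keep"}]`)
        var mapList []map[string]interface{}
        if err := json.Unmarshal(data, &mapList); err != nil {
            log.Fatal(err)
        }
        for _, mapIn := range mapList {
            if t, _ := mapIn["Type"].(string); t != "Fake" {
                log.Fatalf("unsupported volume type %+q", t)
            }
            buf, err := json.Marshal(mapIn)
            if err != nil {
                log.Fatal(err)
            }
            vol := &fakeVolume{}
            if err := json.Unmarshal(buf, vol); err != nil {
                log.Fatal(err)
            }
            fmt.Printf("volume type %s, root %s\n", vol.Type(), vol.Root)
        }
    }

MarshalJSON does the inverse, splicing a "Type" field into each volume's JSON so a dumped config can be loaded back through the same dispatch.
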
diff --git a/services/keepstore/deprecated.go b/services/keepstore/deprecated.go
new file mode 100644 (file)
index 0000000..7caa6b5
--- /dev/null
@@ -0,0 +1,43 @@
+package main
+
+import (
+       "flag"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type deprecatedOptions struct {
+       flagSerializeIO     bool
+       flagReadonly        bool
+       neverDelete         bool
+       signatureTTLSeconds int
+}
+
+var deprecated = deprecatedOptions{
+       neverDelete:         !theConfig.EnableDelete,
+       signatureTTLSeconds: int(theConfig.BlobSignatureTTL.Duration() / time.Second),
+}
+
+func (depr *deprecatedOptions) beforeFlagParse(cfg *Config) {
+       flag.StringVar(&cfg.Listen, "listen", cfg.Listen, "see Listen configuration")
+       flag.IntVar(&cfg.MaxBuffers, "max-buffers", cfg.MaxBuffers, "see MaxBuffers configuration")
+       flag.IntVar(&cfg.MaxRequests, "max-requests", cfg.MaxRequests, "see MaxRequests configuration")
+       flag.BoolVar(&depr.neverDelete, "never-delete", depr.neverDelete, "see EnableDelete configuration")
+       flag.BoolVar(&cfg.RequireSignatures, "enforce-permissions", cfg.RequireSignatures, "see RequireSignatures configuration")
+       flag.StringVar(&cfg.BlobSigningKeyFile, "permission-key-file", cfg.BlobSigningKeyFile, "see BlobSigningKey`File` configuration")
+       flag.StringVar(&cfg.BlobSigningKeyFile, "blob-signing-key-file", cfg.BlobSigningKeyFile, "see BlobSigningKey`File` configuration")
+       flag.StringVar(&cfg.SystemAuthTokenFile, "data-manager-token-file", cfg.SystemAuthTokenFile, "see SystemAuthToken`File` configuration")
+       flag.IntVar(&depr.signatureTTLSeconds, "permission-ttl", depr.signatureTTLSeconds, "signature TTL in seconds; see BlobSignatureTTL configuration")
+       flag.IntVar(&depr.signatureTTLSeconds, "blob-signature-ttl", depr.signatureTTLSeconds, "signature TTL in seconds; see BlobSignatureTTL configuration")
+       flag.Var(&cfg.TrashLifetime, "trash-lifetime", "see TrashLifetime configuration")
+       flag.BoolVar(&depr.flagSerializeIO, "serialize", depr.flagSerializeIO, "serialize read and write operations on the following volumes.")
+       flag.BoolVar(&depr.flagReadonly, "readonly", depr.flagReadonly, "do not write, delete, or touch anything on the following volumes.")
+       flag.StringVar(&cfg.PIDFile, "pid", cfg.PIDFile, "see `PIDFile` configuration")
+       flag.Var(&cfg.TrashCheckInterval, "trash-check-interval", "see TrashCheckInterval configuration")
+}
+
+func (depr *deprecatedOptions) afterFlagParse(cfg *Config) {
+       cfg.BlobSignatureTTL = arvados.Duration(depr.signatureTTLSeconds) * arvados.Duration(time.Second)
+       cfg.EnableDelete = !depr.neverDelete
+}
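
The only subtle line here is the TTL conversion: the legacy -permission-ttl and -blob-signature-ttl flags take whole seconds, while the config uses arvados.Duration. A tiny sketch of the arithmetic, with arvados.Duration stubbed locally as a time.Duration wrapper (an assumption made so the sketch stands alone):

    package main

    import (
        "fmt"
        "time"
    )

    // Duration stubs arvados.Duration for illustration.
    type Duration time.Duration

    func main() {
        // Duration(n) counts nanoseconds, hence the multiply by
        // Duration(time.Second), matching afterFlagParse above.
        signatureTTLSeconds := 2 * 7 * 24 * 3600 // old flag default
        ttl := Duration(signatureTTLSeconds) * Duration(time.Second)
        fmt.Println(time.Duration(ttl)) // 336h0m0s, i.e. two weeks
    }
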
index 7c17424ba568227790469e4e32867f33fea8ff4e..dc9bcb117f0508e748a97ff3cb2a736aa5c00178 100644 (file)
@@ -20,6 +20,8 @@ import (
        "strings"
        "testing"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
 // A RequestTester represents the parameters for an HTTP request to
@@ -52,13 +54,13 @@ func TestGetHandler(t *testing.T) {
 
        // Create locators for testing.
        // Turn on permission settings so we can generate signed locators.
-       enforcePermissions = true
-       PermissionSecret = []byte(knownKey)
-       blobSignatureTTL = 300 * time.Second
+       theConfig.RequireSignatures = true
+       theConfig.blobSigningKey = []byte(knownKey)
+       theConfig.BlobSignatureTTL.Set("5m")
 
        var (
                unsignedLocator  = "/" + TestHash
-               validTimestamp   = time.Now().Add(blobSignatureTTL)
+               validTimestamp   = time.Now().Add(theConfig.BlobSignatureTTL.Duration())
                expiredTimestamp = time.Now().Add(-time.Hour)
                signedLocator    = "/" + SignLocator(TestHash, knownToken, validTimestamp)
                expiredLocator   = "/" + SignLocator(TestHash, knownToken, expiredTimestamp)
@@ -66,7 +68,7 @@ func TestGetHandler(t *testing.T) {
 
        // -----------------
        // Test unauthenticated request with permissions off.
-       enforcePermissions = false
+       theConfig.RequireSignatures = false
 
        // Unauthenticated request, unsigned locator
        // => OK
@@ -90,7 +92,7 @@ func TestGetHandler(t *testing.T) {
 
        // ----------------
        // Permissions: on.
-       enforcePermissions = true
+       theConfig.RequireSignatures = true
 
        // Authenticated request, signed locator
        // => OK
@@ -175,8 +177,8 @@ func TestPutHandler(t *testing.T) {
        // ------------------
        // With a server key.
 
-       PermissionSecret = []byte(knownKey)
-       blobSignatureTTL = 300 * time.Second
+       theConfig.blobSigningKey = []byte(knownKey)
+       theConfig.BlobSignatureTTL.Set("5m")
 
        // When a permission key is available, the locator returned
        // from an authenticated PUT request will be signed.
@@ -220,7 +222,7 @@ func TestPutHandler(t *testing.T) {
 
 func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
        defer teardown()
-       dataManagerToken = "fake-data-manager-token"
+       theConfig.systemAuthToken = "fake-data-manager-token"
        vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
        vols[0].Readonly = true
        KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
@@ -232,15 +234,15 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
                        requestBody: TestBlock,
                })
        defer func(orig bool) {
-               neverDelete = orig
-       }(neverDelete)
-       neverDelete = false
+               theConfig.EnableDelete = orig
+       }(theConfig.EnableDelete)
+       theConfig.EnableDelete = true
        IssueRequest(
                &RequestTester{
                        method:      "DELETE",
                        uri:         "/" + TestHash,
                        requestBody: TestBlock,
-                       apiToken:    dataManagerToken,
+                       apiToken:    theConfig.systemAuthToken,
                })
        type expect struct {
                volnum    int
@@ -274,7 +276,7 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
 //   - authenticated   /index/prefix request | superuser
 //
 // The only /index requests that should succeed are those issued by the
-// superuser. They should pass regardless of the value of enforcePermissions.
+// superuser. They should pass regardless of the value of RequireSignatures.
 //
 func TestIndexHandler(t *testing.T) {
        defer teardown()
@@ -291,7 +293,7 @@ func TestIndexHandler(t *testing.T) {
        vols[0].Put(TestHash+".meta", []byte("metadata"))
        vols[1].Put(TestHash2+".meta", []byte("metadata"))
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
        unauthenticatedReq := &RequestTester{
                method: "GET",
@@ -305,7 +307,7 @@ func TestIndexHandler(t *testing.T) {
        superuserReq := &RequestTester{
                method:   "GET",
                uri:      "/index",
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
        unauthPrefixReq := &RequestTester{
                method: "GET",
@@ -319,32 +321,32 @@ func TestIndexHandler(t *testing.T) {
        superuserPrefixReq := &RequestTester{
                method:   "GET",
                uri:      "/index/" + TestHash[0:3],
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
        superuserNoSuchPrefixReq := &RequestTester{
                method:   "GET",
                uri:      "/index/abcd",
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
        superuserInvalidPrefixReq := &RequestTester{
                method:   "GET",
                uri:      "/index/xyz",
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
 
        // -------------------------------------------------------------
        // Only the superuser should be allowed to issue /index requests.
 
        // ---------------------------
-       // enforcePermissions enabled
+       // RequireSignatures enabled
        // This setting should not affect tests passing.
-       enforcePermissions = true
+       theConfig.RequireSignatures = true
 
        // unauthenticated /index request
        // => UnauthorizedError
        response := IssueRequest(unauthenticatedReq)
        ExpectStatusCode(t,
-               "enforcePermissions on, unauthenticated request",
+               "RequireSignatures on, unauthenticated request",
                UnauthorizedError.HTTPCode,
                response)
 
@@ -381,9 +383,9 @@ func TestIndexHandler(t *testing.T) {
                response)
 
        // ----------------------------
-       // enforcePermissions disabled
+       // RequireSignatures disabled
        // Valid Request should still pass.
-       enforcePermissions = false
+       theConfig.RequireSignatures = false
 
        // superuser /index request
        // => OK
@@ -477,15 +479,15 @@ func TestDeleteHandler(t *testing.T) {
        vols := KeepVM.AllWritable()
        vols[0].Put(TestHash, TestBlock)
 
-       // Explicitly set the blobSignatureTTL to 0 for these
+       // Explicitly set the BlobSignatureTTL to 0 for these
        // tests, to ensure the MockVolume deletes the blocks
        // even though they have just been created.
-       blobSignatureTTL = time.Duration(0)
+       theConfig.BlobSignatureTTL = arvados.Duration(0)
 
        var userToken = "NOT DATA MANAGER TOKEN"
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
-       neverDelete = false
+       theConfig.EnableDelete = true
 
        unauthReq := &RequestTester{
                method: "DELETE",
@@ -501,13 +503,13 @@ func TestDeleteHandler(t *testing.T) {
        superuserExistingBlockReq := &RequestTester{
                method:   "DELETE",
                uri:      "/" + TestHash,
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
 
        superuserNonexistentBlockReq := &RequestTester{
                method:   "DELETE",
                uri:      "/" + TestHash2,
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
 
        // Unauthenticated request returns PermissionError.
@@ -538,14 +540,14 @@ func TestDeleteHandler(t *testing.T) {
                http.StatusNotFound,
                response)
 
-       // Authenticated admin request for existing block while neverDelete is set.
-       neverDelete = true
+       // Authenticated admin request for existing block while EnableDelete is false.
+       theConfig.EnableDelete = false
        response = IssueRequest(superuserExistingBlockReq)
        ExpectStatusCode(t,
                "authenticated request, existing block, method disabled",
                MethodDisabledError.HTTPCode,
                response)
-       neverDelete = false
+       theConfig.EnableDelete = true
 
        // Authenticated admin request for existing block.
        response = IssueRequest(superuserExistingBlockReq)
@@ -568,10 +570,10 @@ func TestDeleteHandler(t *testing.T) {
                t.Error("superuserExistingBlockReq: block not deleted")
        }
 
-       // A DELETE request on a block newer than blobSignatureTTL
+       // A DELETE request on a block newer than BlobSignatureTTL
        // should return success but leave the block on the volume.
        vols[0].Put(TestHash, TestBlock)
-       blobSignatureTTL = time.Hour
+       theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
 
        response = IssueRequest(superuserExistingBlockReq)
        ExpectStatusCode(t,
@@ -623,7 +625,7 @@ func TestPullHandler(t *testing.T) {
        defer teardown()
 
        var userToken = "USER TOKEN"
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
        pullq = NewWorkQueue()
 
@@ -668,13 +670,13 @@ func TestPullHandler(t *testing.T) {
                },
                {
                        "Valid pull request from the data manager",
-                       RequestTester{"/pull", dataManagerToken, "PUT", goodJSON},
+                       RequestTester{"/pull", theConfig.systemAuthToken, "PUT", goodJSON},
                        http.StatusOK,
                        "Received 3 pull requests\n",
                },
                {
                        "Invalid pull request from the data manager",
-                       RequestTester{"/pull", dataManagerToken, "PUT", badJSON},
+                       RequestTester{"/pull", theConfig.systemAuthToken, "PUT", badJSON},
                        http.StatusBadRequest,
                        "",
                },
@@ -729,7 +731,7 @@ func TestTrashHandler(t *testing.T) {
        defer teardown()
 
        var userToken = "USER TOKEN"
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
        trashq = NewWorkQueue()
 
@@ -772,13 +774,13 @@ func TestTrashHandler(t *testing.T) {
                },
                {
                        "Valid trash list from the data manager",
-                       RequestTester{"/trash", dataManagerToken, "PUT", goodJSON},
+                       RequestTester{"/trash", theConfig.systemAuthToken, "PUT", goodJSON},
                        http.StatusOK,
                        "Received 3 trash requests\n",
                },
                {
                        "Invalid trash list from the data manager",
-                       RequestTester{"/trash", dataManagerToken, "PUT", badJSON},
+                       RequestTester{"/trash", theConfig.systemAuthToken, "PUT", badJSON},
                        http.StatusBadRequest,
                        "",
                },
@@ -873,7 +875,7 @@ func TestPutNeedsOnlyOneBuffer(t *testing.T) {
        select {
        case <-ok:
        case <-time.After(time.Second):
-               t.Fatal("PUT deadlocks with maxBuffers==1")
+               t.Fatal("PUT deadlocks with MaxBuffers==1")
        }
 }
 
@@ -888,7 +890,7 @@ func TestPutHandlerNoBufferleak(t *testing.T) {
 
        ok := make(chan bool)
        go func() {
-               for i := 0; i < maxBuffers+1; i++ {
+               for i := 0; i < theConfig.MaxBuffers+1; i++ {
                        // Unauthenticated request, no server key
                        // => OK (unsigned response)
                        unsignedLocator := "/" + TestHash
@@ -925,9 +927,9 @@ func (r *notifyingResponseRecorder) CloseNotify() <-chan bool {
 
 func TestGetHandlerClientDisconnect(t *testing.T) {
        defer func(was bool) {
-               enforcePermissions = was
-       }(enforcePermissions)
-       enforcePermissions = false
+               theConfig.RequireSignatures = was
+       }(theConfig.RequireSignatures)
+       theConfig.RequireSignatures = false
 
        defer func(orig *bufferPool) {
                bufs = orig
@@ -975,7 +977,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
 
 // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
 // leak.
-func TestGetHandlerNoBufferleak(t *testing.T) {
+func TestGetHandlerNoBufferLeak(t *testing.T) {
        defer teardown()
 
        // Prepare two test Keep volumes. Our block is stored on the second volume.
@@ -989,7 +991,7 @@ func TestGetHandlerNoBufferleak(t *testing.T) {
 
        ok := make(chan bool)
        go func() {
-               for i := 0; i < maxBuffers+1; i++ {
+               for i := 0; i < theConfig.MaxBuffers+1; i++ {
                        // Unauthenticated request, unsigned locator
                        // => OK
                        unsignedLocator := "/" + TestHash
@@ -1040,7 +1042,7 @@ func TestUntrashHandler(t *testing.T) {
        vols := KeepVM.AllWritable()
        vols[0].Put(TestHash, TestBlock)
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
        // unauthenticatedReq => UnauthorizedError
        unauthenticatedReq := &RequestTester{
@@ -1070,7 +1072,7 @@ func TestUntrashHandler(t *testing.T) {
        datamanagerWithBadHashReq := &RequestTester{
                method:   "PUT",
                uri:      "/untrash/thisisnotalocator",
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
        response = IssueRequest(datamanagerWithBadHashReq)
        ExpectStatusCode(t,
@@ -1082,7 +1084,7 @@ func TestUntrashHandler(t *testing.T) {
        datamanagerWrongMethodReq := &RequestTester{
                method:   "GET",
                uri:      "/untrash/" + TestHash,
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
        response = IssueRequest(datamanagerWrongMethodReq)
        ExpectStatusCode(t,
@@ -1094,7 +1096,7 @@ func TestUntrashHandler(t *testing.T) {
        datamanagerReq := &RequestTester{
                method:   "PUT",
                uri:      "/untrash/" + TestHash,
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
        response = IssueRequest(datamanagerReq)
        ExpectStatusCode(t,
@@ -1119,13 +1121,13 @@ func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) {
        KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
        defer KeepVM.Close()
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
        // datamanagerReq => StatusOK
        datamanagerReq := &RequestTester{
                method:   "PUT",
                uri:      "/untrash/" + TestHash,
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
        response := IssueRequest(datamanagerReq)
        ExpectStatusCode(t,
index a6798a9f72bb6355ba8b5f6d9cb7d58f3ffe69e9..54b8b485e1dc99d491bd94d4b5b888b60b990b13 100644 (file)
@@ -71,7 +71,7 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
 
 // GetBlockHandler is a HandleFunc to address Get block requests.
 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
-       if enforcePermissions {
+       if theConfig.RequireSignatures {
                locator := req.URL.Path[1:] // strip leading slash
                if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
                        http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
@@ -185,8 +185,8 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
        // return it to the client.
        returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
        apiToken := GetAPIToken(req)
-       if PermissionSecret != nil && apiToken != "" {
-               expiry := time.Now().Add(blobSignatureTTL)
+       if theConfig.blobSigningKey != nil && apiToken != "" {
+               expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
                returnHash = SignLocator(returnHash, apiToken, expiry)
        }
        resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
@@ -196,7 +196,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
        // Reject unauthorized requests.
-       if !IsDataManagerToken(GetAPIToken(req)) {
+       if !IsSystemAuth(GetAPIToken(req)) {
                http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
                return
        }
@@ -334,7 +334,7 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       if neverDelete {
+       if !theConfig.EnableDelete {
                http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
                return
        }
@@ -419,7 +419,7 @@ type PullRequest struct {
 // PullHandler processes "PUT /pull" requests for the data manager.
 func PullHandler(resp http.ResponseWriter, req *http.Request) {
        // Reject unauthorized requests.
-       if !IsDataManagerToken(GetAPIToken(req)) {
+       if !IsSystemAuth(GetAPIToken(req)) {
                http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
                return
        }
@@ -455,7 +455,7 @@ type TrashRequest struct {
 // TrashHandler processes /trash requests.
 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
        // Reject unauthorized requests.
-       if !IsDataManagerToken(GetAPIToken(req)) {
+       if !IsSystemAuth(GetAPIToken(req)) {
                http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
                return
        }
@@ -485,7 +485,7 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
        // Reject unauthorized requests.
-       if !IsDataManagerToken(GetAPIToken(req)) {
+       if !IsSystemAuth(GetAPIToken(req)) {
                http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
                return
        }
@@ -746,7 +746,7 @@ func CanDelete(apiToken string) bool {
        }
        // Blocks may be deleted only when Keep has been configured with a
        // data manager.
-       if IsDataManagerToken(apiToken) {
+       if IsSystemAuth(apiToken) {
                return true
        }
        // TODO(twp): look up apiToken with the API server
@@ -755,8 +755,8 @@ func CanDelete(apiToken string) bool {
        return false
 }
 
-// IsDataManagerToken returns true if apiToken represents the data
-// manager's token.
-func IsDataManagerToken(apiToken string) bool {
-       return dataManagerToken != "" && apiToken == dataManagerToken
+// IsSystemAuth returns true if the given token is allowed to perform
+// system level actions like deleting data.
+func IsSystemAuth(token string) bool {
+       return token != "" && token == theConfig.systemAuthToken
 }
index a04bc0b6fdf93296846ee5be48ffac46b535106f..3fb86bc0f147182087cb845ebb4b250891507a27 100644 (file)
@@ -1,32 +1,23 @@
 package main
 
 import (
-       "bytes"
        "flag"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "io/ioutil"
        "log"
        "net"
        "net/http"
        "os"
        "os/signal"
-       "strings"
        "syscall"
        "time"
-)
 
-// ======================
-// Configuration settings
-//
-// TODO(twp): make all of these configurable via command line flags
-// and/or configuration file settings.
-
-// Default TCP address on which to listen for requests.
-// Initialized by the --listen flag.
-const DefaultAddr = ":25107"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/config"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "github.com/coreos/go-systemd/daemon"
+       "github.com/ghodss/yaml"
+)
 
 // A Keep "block" is 64MB.
 const BlockSize = 64 * 1024 * 1024
@@ -38,36 +29,6 @@ const MinFreeKilobytes = BlockSize / 1024
 // ProcMounts /proc/mounts
 var ProcMounts = "/proc/mounts"
 
-// enforcePermissions controls whether permission signatures
-// should be enforced (affecting GET and DELETE requests).
-// Initialized by the -enforce-permissions flag.
-var enforcePermissions bool
-
-// blobSignatureTTL is the time duration for which new permission
-// signatures (returned by PUT requests) will be valid.
-// Initialized by the -permission-ttl flag.
-var blobSignatureTTL time.Duration
-
-// dataManagerToken represents the API token used by the
-// Data Manager, and is required on certain privileged operations.
-// Initialized by the -data-manager-token-file flag.
-var dataManagerToken string
-
-// neverDelete can be used to prevent the DELETE handler from
-// actually deleting anything.
-var neverDelete = true
-
-// trashLifetime is the time duration after a block is trashed
-// during which it can be recovered using an /untrash request
-// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
-var trashLifetime time.Duration
-
-// trashCheckInterval is the time duration at which the emptyTrash goroutine
-// will check and delete expired trashed blocks. Default is one day.
-// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
-var trashCheckInterval time.Duration
-
-var maxBuffers = 128
 var bufs *bufferPool
 
 // KeepError types.
@@ -121,135 +82,50 @@ var KeepVM VolumeManager
 var pullq *WorkQueue
 var trashq *WorkQueue
 
-type volumeSet []Volume
-
-var (
-       flagSerializeIO bool
-       flagReadonly    bool
-       volumes         volumeSet
-)
-
-func (vs *volumeSet) String() string {
-       if vs == nil {
-               return "[]"
-       }
-       return fmt.Sprintf("%+v", (*vs)[:])
-}
-
-// TODO(twp): continue moving as much code as possible out of main
-// so it can be effectively tested. Esp. handling and postprocessing
-// of command line flags (identifying Keep volumes and initializing
-// permission arguments).
-
 func main() {
-       log.Println("keepstore starting, pid", os.Getpid())
-       defer log.Println("keepstore exiting, pid", os.Getpid())
+       deprecated.beforeFlagParse(theConfig)
 
-       var (
-               dataManagerTokenFile string
-               listen               string
-               blobSigningKeyFile   string
-               permissionTTLSec     int
-               pidfile              string
-               maxRequests          int
-       )
-       flag.StringVar(
-               &dataManagerTokenFile,
-               "data-manager-token-file",
-               "",
-               "File with the API token used by the Data Manager. All DELETE "+
-                       "requests or GET /index requests must carry this token.")
-       flag.BoolVar(
-               &enforcePermissions,
-               "enforce-permissions",
-               false,
-               "Enforce permission signatures on requests.")
-       flag.StringVar(
-               &listen,
-               "listen",
-               DefaultAddr,
-               "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
-       flag.IntVar(
-               &maxRequests,
-               "max-requests",
-               0,
-               "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)")
-       flag.BoolVar(
-               &neverDelete,
-               "never-delete",
-               true,
-               "If true, nothing will be deleted. "+
-                       "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
-                       "You should leave this option alone unless you can afford to lose data.")
-       flag.StringVar(
-               &blobSigningKeyFile,
-               "permission-key-file",
-               "",
-               "Synonym for -blob-signing-key-file.")
-       flag.StringVar(
-               &blobSigningKeyFile,
-               "blob-signing-key-file",
-               "",
-               "File containing the secret key for generating and verifying "+
-                       "blob permission signatures.")
-       flag.IntVar(
-               &permissionTTLSec,
-               "permission-ttl",
-               0,
-               "Synonym for -blob-signature-ttl.")
-       flag.IntVar(
-               &permissionTTLSec,
-               "blob-signature-ttl",
-               2*7*24*3600,
-               "Lifetime of blob permission signatures in seconds. Modifying the ttl will invalidate all existing signatures. "+
-                       "See services/api/config/application.default.yml.")
-       flag.BoolVar(
-               &flagSerializeIO,
-               "serialize",
-               false,
-               "Serialize read and write operations on the following volumes.")
-       flag.BoolVar(
-               &flagReadonly,
-               "readonly",
-               false,
-               "Do not write, delete, or touch anything on the following volumes.")
-       flag.StringVar(
-               &pidfile,
-               "pid",
-               "",
-               "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
-       flag.IntVar(
-               &maxBuffers,
-               "max-buffers",
-               maxBuffers,
-               fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
-       flag.DurationVar(
-               &trashLifetime,
-               "trash-lifetime",
-               0,
-               "Time duration after a block is trashed during which it can be recovered using an /untrash request")
-       flag.DurationVar(
-               &trashCheckInterval,
-               "trash-check-interval",
-               24*time.Hour,
-               "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
+       dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
 
+       defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
+       var configPath string
+       flag.StringVar(
+               &configPath,
+               "config",
+               defaultConfigPath,
+               "YAML or JSON configuration file `path`")
+       flag.Usage = usage
        flag.Parse()
 
-       if maxBuffers < 0 {
-               log.Fatal("-max-buffers must be greater than zero.")
+       deprecated.afterFlagParse(theConfig)
+
+       err := config.LoadFile(theConfig, configPath)
+       if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
+               log.Fatal(err)
+       }
+
+       if *dumpConfig {
+               y, err := yaml.Marshal(theConfig)
+               if err != nil {
+                       log.Fatal(err)
+               }
+               os.Stdout.Write(y)
+               os.Exit(0)
        }
-       bufs = newBufferPool(maxBuffers, BlockSize)
 
-       if pidfile != "" {
+       err = theConfig.Start()
+       if err != nil {
+               log.Fatal(err)
+       }
+
+       if pidfile := theConfig.PIDFile; pidfile != "" {
                f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
                if err != nil {
                        log.Fatalf("open pidfile (%s): %s", pidfile, err)
                }
+               defer f.Close()
                err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
                if err != nil {
                        log.Fatalf("flock pidfile (%s): %s", pidfile, err)
                }
+               defer os.Remove(pidfile)
                err = f.Truncate(0)
                if err != nil {
                        log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
@@ -262,74 +138,22 @@ func main() {
                if err != nil {
                        log.Fatalf("sync pidfile (%s): %s", pidfile, err)
                }
-               defer f.Close()
-               defer os.Remove(pidfile)
-       }
-
-       if len(volumes) == 0 {
-               if (&unixVolumeAdder{&volumes}).Discover() == 0 {
-                       log.Fatal("No volumes found.")
-               }
-       }
-
-       for _, v := range volumes {
-               log.Printf("Using volume %v (writable=%v)", v, v.Writable())
        }
 
-       // Initialize data manager token and permission key.
-       // If these tokens are specified but cannot be read,
-       // raise a fatal error.
-       if dataManagerTokenFile != "" {
-               if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
-                       dataManagerToken = strings.TrimSpace(string(buf))
-               } else {
-                       log.Fatalf("reading data manager token: %s\n", err)
-               }
-       }
-
-       if neverDelete != true {
-               log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
-                       "been extensively tested. You should leave this option alone unless you can afford to lose data.")
-       }
-
-       if blobSigningKeyFile != "" {
-               if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
-                       PermissionSecret = bytes.TrimSpace(buf)
-               } else {
-                       log.Fatalf("reading permission key: %s\n", err)
-               }
-       }
-
-       blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
-
-       if PermissionSecret == nil {
-               if enforcePermissions {
-                       log.Fatal("-enforce-permissions requires a permission key")
-               } else {
-                       log.Println("Running without a PermissionSecret. Block locators " +
-                               "returned by this server will not be signed, and will be rejected " +
-                               "by a server that enforces permissions.")
-                       log.Println("To fix this, use the -blob-signing-key-file flag " +
-                               "to specify the file containing the permission key.")
-               }
-       }
-
-       if maxRequests <= 0 {
-               maxRequests = maxBuffers * 2
-               log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests)
-       }
+       log.Println("keepstore starting, pid", os.Getpid())
+       defer log.Println("keepstore exiting, pid", os.Getpid())
 
        // Start a round-robin VolumeManager with the volumes we have found.
-       KeepVM = MakeRRVolumeManager(volumes)
+       KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
-       // Middleware stack: logger, maxRequests limiter, method handlers
+       // Middleware stack: logger, MaxRequests limiter, method handlers
        http.Handle("/", &LoggingRESTRouter{
-               httpserver.NewRequestLimiter(maxRequests,
+               httpserver.NewRequestLimiter(theConfig.MaxRequests,
                        MakeRESTRouter()),
        })
 
        // Set up a TCP listener.
-       listener, err := net.Listen("tcp", listen)
+       listener, err := net.Listen("tcp", theConfig.Listen)
        if err != nil {
                log.Fatal(err)
        }
@@ -351,7 +175,7 @@ func main() {
 
        // Start emptyTrash goroutine
        doneEmptyingTrash := make(chan bool)
-       go emptyTrash(doneEmptyingTrash, trashCheckInterval)
+       go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
 
        // Shut down the server gracefully (by closing the listener)
        // if SIGTERM is received.
@@ -365,24 +189,27 @@ func main() {
        signal.Notify(term, syscall.SIGTERM)
        signal.Notify(term, syscall.SIGINT)
 
-       log.Println("listening at", listen)
-       srv := &http.Server{Addr: listen}
+       if _, err := daemon.SdNotify("READY=1"); err != nil {
+               log.Printf("Error notifying init daemon: %v", err)
+       }
+       log.Println("listening at", listener.Addr())
+       srv := &http.Server{}
        srv.Serve(listener)
 }
 
-// At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
-func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
-       ticker := time.NewTicker(trashCheckInterval)
+// Periodically (once per interval) invoke EmptyTrash on all volumes.
+func emptyTrash(done <-chan bool, interval time.Duration) {
+       ticker := time.NewTicker(interval)
 
        for {
                select {
                case <-ticker.C:
-                       for _, v := range volumes {
+                       for _, v := range theConfig.Volumes {
                                if v.Writable() {
                                        v.EmptyTrash()
                                }
                        }
-               case <-doneEmptyingTrash:
+               case <-done:
                        ticker.Stop()
                        return
                }
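
The refactored emptyTrash is now a generic ticker-plus-done loop. A standalone sketch of the same pattern, with an illustrative interval and work function standing in for TrashCheckInterval and EmptyTrash:

    package main

    import (
        "fmt"
        "time"
    )

    // periodically mirrors emptyTrash's structure: run work on every
    // tick until a value arrives on done, then stop the ticker.
    func periodically(done <-chan bool, interval time.Duration, work func()) {
        ticker := time.NewTicker(interval)
        for {
            select {
            case <-ticker.C:
                work()
            case <-done:
                ticker.Stop()
                return
            }
        }
    }

    func main() {
        done := make(chan bool)
        go periodically(done, 50*time.Millisecond, func() { fmt.Println("checking trash") })
        time.Sleep(120 * time.Millisecond) // let it fire a couple of times
        done <- true
    }
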
diff --git a/services/keepstore/keepstore.service b/services/keepstore/keepstore.service
new file mode 100644 (file)
index 0000000..b9e2793
--- /dev/null
@@ -0,0 +1,13 @@
+[Unit]
+Description=Arvados Keep Storage Daemon
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/keepstore/keepstore.yml
+
+[Service]
+Type=notify
+ExecStart=/usr/bin/keepstore
+Restart=always
+
+[Install]
+WantedBy=multi-user.target
index c0adbc0bd74dad7d115dfe70a3374b2815704c56..dc6af0fa0d651c79cd6694a006fd3ad83ba2d677 100644 (file)
@@ -341,23 +341,23 @@ func TestDiscoverTmpfs(t *testing.T) {
        f.Close()
        ProcMounts = f.Name()
 
-       resultVols := volumeSet{}
-       added := (&unixVolumeAdder{&resultVols}).Discover()
+       cfg := &Config{}
+       added := (&unixVolumeAdder{cfg}).Discover()
 
-       if added != len(resultVols) {
+       if added != len(cfg.Volumes) {
                t.Errorf("Discover returned %d, but added %d volumes",
-                       added, len(resultVols))
+                       added, len(cfg.Volumes))
        }
        if added != len(tempVols) {
                t.Errorf("Discover returned %d but we set up %d volumes",
                        added, len(tempVols))
        }
        for i, tmpdir := range tempVols {
-               if tmpdir != resultVols[i].(*UnixVolume).root {
+               if tmpdir != cfg.Volumes[i].(*UnixVolume).Root {
                        t.Errorf("Discover returned %s, expected %s\n",
-                               resultVols[i].(*UnixVolume).root, tmpdir)
+                               cfg.Volumes[i].(*UnixVolume).Root, tmpdir)
                }
-               if expectReadonly := i%2 == 1; expectReadonly != resultVols[i].(*UnixVolume).readonly {
+               if expectReadonly := i%2 == 1; expectReadonly != cfg.Volumes[i].(*UnixVolume).ReadOnly {
                        t.Errorf("Discover added %s with readonly=%v, should be %v",
                                tmpdir, !expectReadonly, expectReadonly)
                }
@@ -381,10 +381,10 @@ func TestDiscoverNone(t *testing.T) {
        f.Close()
        ProcMounts = f.Name()
 
-       resultVols := volumeSet{}
-       added := (&unixVolumeAdder{&resultVols}).Discover()
-       if added != 0 || len(resultVols) != 0 {
-               t.Fatalf("got %d, %v; expected 0, []", added, resultVols)
+       cfg := &Config{}
+       added := (&unixVolumeAdder{cfg}).Discover()
+       if added != 0 || len(cfg.Volumes) != 0 {
+               t.Fatalf("got %d, %v; expected 0, []", added, cfg.Volumes)
        }
 }
 
@@ -442,8 +442,8 @@ func MakeTestVolumeManager(numVolumes int) VolumeManager {
 
 // teardown cleans up after each test.
 func teardown() {
-       dataManagerToken = ""
-       enforcePermissions = false
-       PermissionSecret = nil
+       theConfig.systemAuthToken = ""
+       theConfig.RequireSignatures = false
+       theConfig.blobSigningKey = nil
        KeepVM = nil
 }
index 9cd97bd3b746b1d66c0eba3b002fe5c9b8d70083..38445d982b438e3b744f09c74727d81d11304c84 100644 (file)
@@ -5,15 +5,10 @@ import (
        "time"
 )
 
-// The PermissionSecret is the secret key used to generate SHA1
-// digests for permission hints. apiserver and Keep must use the same
-// key.
-var PermissionSecret []byte
-
 // SignLocator takes a blobLocator, an apiToken and an expiry time, and
 // returns a signed locator string.
 func SignLocator(blobLocator, apiToken string, expiry time.Time) string {
-       return keepclient.SignLocator(blobLocator, apiToken, expiry, blobSignatureTTL, PermissionSecret)
+       return keepclient.SignLocator(blobLocator, apiToken, expiry, theConfig.BlobSignatureTTL.Duration(), theConfig.blobSigningKey)
 }
 
 // VerifySignature returns nil if the signature on the signedLocator
@@ -22,7 +17,7 @@ func SignLocator(blobLocator, apiToken string, expiry time.Time) string {
 // something the client could have figured out independently) or
 // PermissionError.
 func VerifySignature(signedLocator, apiToken string) error {
-       err := keepclient.VerifySignature(signedLocator, apiToken, blobSignatureTTL, PermissionSecret)
+       err := keepclient.VerifySignature(signedLocator, apiToken, theConfig.BlobSignatureTTL.Duration(), theConfig.blobSigningKey)
        if err == keepclient.ErrSignatureExpired {
                return ExpiredError
        } else if err != nil {
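
A minimal sketch of the new signing round trip (assuming it runs inside the keepstore package, where theConfig, SignLocator, and VerifySignature are visible; the key and token values are placeholders):

    theConfig.blobSigningKey = []byte("example-blob-signing-key") // placeholder key
    theConfig.BlobSignatureTTL.Set("336h")                        // two weeks, matching the API server

    signed := SignLocator("acbd18db4cc2f85cedef654fccc4a4d8+3", "example-api-token", time.Now().Add(time.Hour))
    if err := VerifySignature(signed, "example-api-token"); err != nil {
            log.Fatalf("signature did not verify: %s", err)
    }

Because both helpers now read the TTL and key from theConfig rather than package-level globals, tests only need to swap config fields, as perms_test.go does next.
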
index 43717b23720d8c71b32c126810f8e39dd41a0429..8e47e4a4429c77c99df8b155f33dd6d56b801855 100644 (file)
@@ -4,6 +4,8 @@ import (
        "strconv"
        "testing"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
 const (
@@ -17,7 +19,7 @@ const (
                "gokee3eamvjy8qq1fvy238838enjmy5wzy2md7yvsitp5vztft6j4q866efym7e6" +
                "vu5wm9fpnwjyxfldw3vbo01mgjs75rgo7qioh8z8ij7jpyp8508okhgbbex3ceei" +
                "786u5rw2a9gx743dj3fgq2irk"
-       knownSignatureTTL  = 1209600 * time.Second
+       knownSignatureTTL  = arvados.Duration(24 * 14 * time.Hour)
        knownSignature     = "89118b78732c33104a4d6231e8b5a5fa1e4301e3"
        knownTimestamp     = "7fffffff"
        knownSigHint       = "+A" + knownSignature + "@" + knownTimestamp
@@ -26,8 +28,8 @@ const (
 
 func TestSignLocator(t *testing.T) {
        defer func(b []byte) {
-               PermissionSecret = b
-       }(PermissionSecret)
+               theConfig.blobSigningKey = b
+       }(theConfig.blobSigningKey)
 
        tsInt, err := strconv.ParseInt(knownTimestamp, 16, 0)
        if err != nil {
@@ -35,33 +37,33 @@ func TestSignLocator(t *testing.T) {
        }
        t0 := time.Unix(tsInt, 0)
 
-       blobSignatureTTL = knownSignatureTTL
+       theConfig.BlobSignatureTTL = knownSignatureTTL
 
-       PermissionSecret = []byte(knownKey)
+       theConfig.blobSigningKey = []byte(knownKey)
        if x := SignLocator(knownLocator, knownToken, t0); x != knownSignedLocator {
                t.Fatalf("Got %+q, expected %+q", x, knownSignedLocator)
        }
 
-       PermissionSecret = []byte("arbitrarykey")
+       theConfig.blobSigningKey = []byte("arbitrarykey")
        if x := SignLocator(knownLocator, knownToken, t0); x == knownSignedLocator {
-               t.Fatalf("Got same signature %+q, even though PermissionSecret changed", x)
+               t.Fatalf("Got same signature %+q, even though blobSigningKey changed", x)
        }
 }
 
 func TestVerifyLocator(t *testing.T) {
        defer func(b []byte) {
-               PermissionSecret = b
-       }(PermissionSecret)
+               theConfig.blobSigningKey = b
+       }(theConfig.blobSigningKey)
 
-       blobSignatureTTL = knownSignatureTTL
+       theConfig.BlobSignatureTTL = knownSignatureTTL
 
-       PermissionSecret = []byte(knownKey)
+       theConfig.blobSigningKey = []byte(knownKey)
        if err := VerifySignature(knownSignedLocator, knownToken); err != nil {
                t.Fatal(err)
        }
 
-       PermissionSecret = []byte("arbitrarykey")
+       theConfig.blobSigningKey = []byte("arbitrarykey")
        if err := VerifySignature(knownSignedLocator, knownToken); err == nil {
-               t.Fatal("Verified signature even with wrong PermissionSecret")
+               t.Fatal("Verified signature even with wrong blobSigningKey")
        }
 }
index 4d85d5fd20cf6abe408035d294e3d85c5d011251..43a6de68443f693acb85696a0f5938836a34b2f7 100644 (file)
@@ -84,10 +84,10 @@ type PullWorkerTestData struct {
 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
        defer teardown()
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
                name:         "TestPullWorkerPullList_with_two_locators",
-               req:          RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
                responseCode: http.StatusOK,
                responseBody: "Received 2 pull requests\n",
                readContent:  "hello",
@@ -101,10 +101,10 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
        defer teardown()
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
                name:         "TestPullWorkerPullList_with_one_locator",
-               req:          RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
                responseCode: http.StatusOK,
                responseBody: "Received 1 pull requests\n",
                readContent:  "hola",
@@ -118,10 +118,10 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
        defer teardown()
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_get_one_locator",
-               req:          RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
                responseCode: http.StatusOK,
                responseBody: "Received 1 pull requests\n",
                readContent:  "unused",
@@ -135,10 +135,10 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
        defer teardown()
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_get_two_locators",
-               req:          RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
                responseCode: http.StatusOK,
                responseBody: "Received 2 pull requests\n",
                readContent:  "unused",
@@ -152,10 +152,10 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
        defer teardown()
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_put_one_locator",
-               req:          RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
                responseCode: http.StatusOK,
                responseBody: "Received 1 pull requests\n",
                readContent:  "hello hello",
@@ -169,10 +169,10 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
        defer teardown()
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_put_two_locators",
-               req:          RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
                responseCode: http.StatusOK,
                responseBody: "Received 2 pull requests\n",
                readContent:  "hello again",
@@ -195,10 +195,10 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_items_latest_repla
        pullq.ReplaceQueue(makeTestWorkList(firstInput))
        testPullLists["Added_before_actual_test_item"] = string(1)
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
                name:         "TestPullWorkerPullList_with_two_items_latest_replacing_old",
-               req:          RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
                responseCode: http.StatusOK,
                responseBody: "Received 1 pull requests\n",
                readContent:  "hola de nuevo",
@@ -210,14 +210,14 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_items_latest_repla
 }
 
 // In this case, the item will not be placed on pullq
-func (s *PullWorkerTestSuite) TestPullWorker_invalid_dataManagerToken(c *C) {
+func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
        defer teardown()
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
        testData := PullWorkerTestData{
                name:         "TestPullWorkerPullList_with_two_locators",
-               req:          RequestTester{"/pull", "invalid_dataManagerToken", "PUT", firstPullList},
+               req:          RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
                responseCode: http.StatusUnauthorized,
                responseBody: "Unauthorized\n",
                readContent:  "hello",
index 1a2a47b0df3b27d9c38ae7f4c8986a0f48f933a2..caed35b670e9484e9978e771e0e8c2aea2532e52 100644 (file)
@@ -11,8 +11,10 @@ import (
        "os"
        "regexp"
        "strings"
+       "sync"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/AdRoll/goamz/aws"
        "github.com/AdRoll/goamz/s3"
 )
@@ -39,7 +41,12 @@ const (
 )
 
 type s3VolumeAdder struct {
-       *volumeSet
+       *Config
+}
+
+// String implements flag.Value
+func (s *s3VolumeAdder) String() string {
+       return "-"
 }
 
 func (s *s3VolumeAdder) Set(bucketName string) error {
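
The String/Set pair above satisfies the standard library's flag.Value interface, which flag.Var requires. A self-contained sketch of the same pattern, using a hypothetical listFlag type that is not part of this commit:

    package main

    import (
            "flag"
            "fmt"
    )

    // listFlag accumulates every occurrence of a repeatable flag, the
    // way s3VolumeAdder accumulates -s3-bucket-volume arguments.
    type listFlag []string

    func (l *listFlag) String() string { return fmt.Sprint(*l) }

    func (l *listFlag) Set(value string) error {
            *l = append(*l, value)
            return nil
    }

    func main() {
            var buckets listFlag
            flag.Var(&buckets, "bucket", "can be given multiple times")
            flag.Parse()
            fmt.Println(buckets)
    }
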
@@ -49,39 +56,21 @@ func (s *s3VolumeAdder) Set(bucketName string) error {
        if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
                return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
        }
-       region, ok := aws.Regions[s3RegionName]
-       if s3Endpoint == "" {
-               if !ok {
-                       return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", s3RegionName)
-               }
-       } else {
-               if ok {
-                       return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
-                               "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", s3RegionName, s3Endpoint)
-               }
-               region = aws.Region{
-                       Name:       s3RegionName,
-                       S3Endpoint: s3Endpoint,
-               }
-       }
-       var err error
-       var auth aws.Auth
-       auth.AccessKey, err = readKeyFromFile(s3AccessKeyFile)
-       if err != nil {
-               return err
-       }
-       auth.SecretKey, err = readKeyFromFile(s3SecretKeyFile)
-       if err != nil {
-               return err
-       }
-       if flagSerializeIO {
+       if deprecated.flagSerializeIO {
                log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
        }
-       v := NewS3Volume(auth, region, bucketName, s3RaceWindow, flagReadonly, s3Replication)
-       if err := v.Check(); err != nil {
-               return err
-       }
-       *s.volumeSet = append(*s.volumeSet, v)
+       s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
+               Bucket:        bucketName,
+               AccessKeyFile: s3AccessKeyFile,
+               SecretKeyFile: s3SecretKeyFile,
+               Endpoint:      s3Endpoint,
+               Region:        s3RegionName,
+               RaceWindow:    arvados.Duration(s3RaceWindow),
+               S3Replication: s3Replication,
+               UnsafeDelete:  s3UnsafeDelete,
+               ReadOnly:      deprecated.flagReadonly,
+               IndexPageSize: 1000,
+       })
        return nil
 }
 
@@ -93,7 +82,9 @@ func s3regions() (okList []string) {
 }
 
 func init() {
-       flag.Var(&s3VolumeAdder{&volumes},
+       VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
+
+       flag.Var(&s3VolumeAdder{theConfig},
                "s3-bucket-volume",
                "Use the given bucket as a storage volume. Can be given multiple times.")
        flag.StringVar(
@@ -110,12 +101,12 @@ func init() {
                &s3AccessKeyFile,
                "s3-access-key-file",
                "",
-               "File containing the access key used for subsequent -s3-bucket-volume arguments.")
+               "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
        flag.StringVar(
                &s3SecretKeyFile,
                "s3-secret-key-file",
                "",
-               "File containing the secret key used for subsequent -s3-bucket-volume arguments.")
+               "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
        flag.DurationVar(
                &s3RaceWindow,
                "s3-race-window",
@@ -135,32 +126,87 @@ func init() {
 
 // S3Volume implements Volume using an S3 bucket.
 type S3Volume struct {
-       *s3.Bucket
-       raceWindow    time.Duration
-       readonly      bool
-       replication   int
-       indexPageSize int
-}
-
-// NewS3Volume returns a new S3Volume using the given auth, region,
-// and bucket name. The replication argument specifies the replication
-// level to report when writing data.
-func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, raceWindow time.Duration, readonly bool, replication int) *S3Volume {
-       return &S3Volume{
-               Bucket: &s3.Bucket{
-                       S3:   s3.New(auth, region),
-                       Name: bucket,
+       AccessKeyFile      string
+       SecretKeyFile      string
+       Endpoint           string
+       Region             string
+       Bucket             string
+       LocationConstraint bool
+       IndexPageSize      int
+       S3Replication      int
+       RaceWindow         arvados.Duration
+       ReadOnly           bool
+       UnsafeDelete       bool
+
+       bucket *s3.Bucket
+
+       startOnce sync.Once
+}
+
+// Examples implements VolumeWithExamples.
+func (*S3Volume) Examples() []Volume {
+       return []Volume{
+               &S3Volume{
+                       AccessKeyFile: "/etc/aws_s3_access_key.txt",
+                       SecretKeyFile: "/etc/aws_s3_secret_key.txt",
+                       Endpoint:      "",
+                       Region:        "us-east-1",
+                       Bucket:        "example-bucket-name",
+                       IndexPageSize: 1000,
+                       S3Replication: 2,
+                       RaceWindow:    arvados.Duration(24 * time.Hour),
+               },
+               &S3Volume{
+                       AccessKeyFile: "/etc/gce_s3_access_key.txt",
+                       SecretKeyFile: "/etc/gce_s3_secret_key.txt",
+                       Endpoint:      "https://storage.googleapis.com",
+                       Region:        "",
+                       Bucket:        "example-bucket-name",
+                       IndexPageSize: 1000,
+                       S3Replication: 2,
+                       RaceWindow:    arvados.Duration(24 * time.Hour),
                },
-               raceWindow:    raceWindow,
-               readonly:      readonly,
-               replication:   replication,
-               indexPageSize: 1000,
        }
 }
 
-// Check returns an error if the volume is inaccessible (e.g., config
-// error).
-func (v *S3Volume) Check() error {
+// Type implements Volume.
+func (*S3Volume) Type() string {
+       return "S3"
+}
+
+// Start populates private fields and verifies the configuration is
+// valid.
+func (v *S3Volume) Start() error {
+       region, ok := aws.Regions[v.Region]
+       if v.Endpoint == "" {
+               if !ok {
+                       return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
+               }
+       } else if ok {
+               return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
+                       "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
+       } else {
+               region = aws.Region{
+                       Name:                 v.Region,
+                       S3Endpoint:           v.Endpoint,
+                       S3LocationConstraint: v.LocationConstraint,
+               }
+       }
+
+       var err error
+       var auth aws.Auth
+       auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
+       if err != nil {
+               return err
+       }
+       auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
+       if err != nil {
+               return err
+       }
+       v.bucket = &s3.Bucket{
+               S3:   s3.New(auth, region),
+               Name: v.Bucket,
+       }
        return nil
 }
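
Construction is now decoupled from validation: callers build the S3Volume struct (from YAML config or the deprecated flags) and must call Start() before using the volume. A sketch using only the exported fields defined above; the key-file paths are placeholders:

    v := &S3Volume{
            AccessKeyFile: "/etc/aws_s3_access_key.txt", // placeholder path
            SecretKeyFile: "/etc/aws_s3_secret_key.txt", // placeholder path
            Region:        "us-east-1",
            Bucket:        "example-bucket-name",
            IndexPageSize: 1000,
            S3Replication: 2,
            RaceWindow:    arvados.Duration(24 * time.Hour),
    }
    if err := v.Start(); err != nil {
            log.Fatalf("invalid S3 volume config: %s", err)
    }
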
 
@@ -170,12 +216,12 @@ func (v *S3Volume) Check() error {
 // disappeared in a Trash race, getReader calls fixRace to recover the
 // data, and tries again.
 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
-       rdr, err = v.Bucket.GetReader(loc)
+       rdr, err = v.bucket.GetReader(loc)
        err = v.translateError(err)
        if err == nil || !os.IsNotExist(err) {
                return
        }
-       _, err = v.Bucket.Head("recent/"+loc, nil)
+       _, err = v.bucket.Head("recent/"+loc, nil)
        err = v.translateError(err)
        if err != nil {
                // If we can't read recent/X, there's no point in
@@ -186,7 +232,7 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
                err = os.ErrNotExist
                return
        }
-       rdr, err = v.Bucket.GetReader(loc)
+       rdr, err = v.bucket.GetReader(loc)
        if err != nil {
                log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
                err = v.translateError(err)
@@ -223,7 +269,7 @@ func (v *S3Volume) Compare(loc string, expect []byte) error {
 
 // Put writes a block.
 func (v *S3Volume) Put(loc string, block []byte) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        var opts s3.Options
@@ -234,20 +280,20 @@ func (v *S3Volume) Put(loc string, block []byte) error {
                }
                opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
        }
-       err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
+       err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
        if err != nil {
                return v.translateError(err)
        }
-       err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+       err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
        return v.translateError(err)
 }
 
 // Touch sets the timestamp for the given locator to the current time.
 func (v *S3Volume) Touch(loc string) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
-       _, err := v.Bucket.Head(loc, nil)
+       _, err := v.bucket.Head(loc, nil)
        err = v.translateError(err)
        if os.IsNotExist(err) && v.fixRace(loc) {
                // The data object got trashed in a race, but fixRace
@@ -255,27 +301,27 @@ func (v *S3Volume) Touch(loc string) error {
        } else if err != nil {
                return err
        }
-       err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+       err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
        return v.translateError(err)
 }
 
 // Mtime returns the stored timestamp for the given locator.
 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
-       _, err := v.Bucket.Head(loc, nil)
+       _, err := v.bucket.Head(loc, nil)
        if err != nil {
                return zeroTime, v.translateError(err)
        }
-       resp, err := v.Bucket.Head("recent/"+loc, nil)
+       resp, err := v.bucket.Head("recent/"+loc, nil)
        err = v.translateError(err)
        if os.IsNotExist(err) {
                // The data object X exists, but recent/X is missing.
-               err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+               err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
                if err != nil {
                        log.Printf("error: creating %q: %s", "recent/"+loc, err)
                        return zeroTime, v.translateError(err)
                }
                log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
-               resp, err = v.Bucket.Head("recent/"+loc, nil)
+               resp, err = v.bucket.Head("recent/"+loc, nil)
                if err != nil {
                        log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
                        return zeroTime, v.translateError(err)
@@ -292,14 +338,14 @@ func (v *S3Volume) Mtime(loc string) (time.Time, error) {
 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
        // Use a merge sort to find matching sets of X and recent/X.
        dataL := s3Lister{
-               Bucket:   v.Bucket,
+               Bucket:   v.bucket,
                Prefix:   prefix,
-               PageSize: v.indexPageSize,
+               PageSize: v.IndexPageSize,
        }
        recentL := s3Lister{
-               Bucket:   v.Bucket,
+               Bucket:   v.bucket,
                Prefix:   "recent/" + prefix,
-               PageSize: v.indexPageSize,
+               PageSize: v.IndexPageSize,
        }
        for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
                if data.Key >= "g" {
@@ -346,19 +392,19 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
 
 // Trash a Keep block.
 func (v *S3Volume) Trash(loc string) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        if t, err := v.Mtime(loc); err != nil {
                return err
-       } else if time.Since(t) < blobSignatureTTL {
+       } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
                return nil
        }
-       if trashLifetime == 0 {
+       if theConfig.TrashLifetime == 0 {
                if !s3UnsafeDelete {
                        return ErrS3TrashDisabled
                }
-               return v.Bucket.Del(loc)
+               return v.bucket.Del(loc)
        }
        err := v.checkRaceWindow(loc)
        if err != nil {
@@ -368,13 +414,13 @@ func (v *S3Volume) Trash(loc string) error {
        if err != nil {
                return err
        }
-       return v.translateError(v.Bucket.Del(loc))
+       return v.translateError(v.bucket.Del(loc))
 }
 
 // checkRaceWindow returns a non-nil error if trash/loc is, or might
 // be, in the race window (i.e., it's not safe to trash loc).
 func (v *S3Volume) checkRaceWindow(loc string) error {
-       resp, err := v.Bucket.Head("trash/"+loc, nil)
+       resp, err := v.bucket.Head("trash/"+loc, nil)
        err = v.translateError(err)
        if os.IsNotExist(err) {
                // OK, trash/X doesn't exist so we're not in the race
@@ -390,7 +436,7 @@ func (v *S3Volume) checkRaceWindow(loc string) error {
                // Can't parse timestamp
                return err
        }
-       safeWindow := t.Add(trashLifetime).Sub(time.Now().Add(v.raceWindow))
+       safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
        if safeWindow <= 0 {
                // We can't count on "touch trash/X" to prolong
                // trash/X's lifetime. The new timestamp might not
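
To make the safeWindow arithmetic concrete: suppose TrashLifetime is 1h and RaceWindow is 5m. If trash/X was last modified 50 minutes ago, safeWindow = (t + 1h) - (now + 5m) = 5m > 0 and trashing may proceed; if it was modified 58 minutes ago, safeWindow = -3m <= 0 and checkRaceWindow returns an error, since EmptyTrash could delete trash/X before a refreshed timestamp is guaranteed to be visible.
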
@@ -408,10 +454,10 @@ func (v *S3Volume) checkRaceWindow(loc string) error {
 // (PutCopy returns 200 OK if the request was received, even if the
 // copy failed).
 func (v *S3Volume) safeCopy(dst, src string) error {
-       resp, err := v.Bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
+       resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
                ContentType:       "application/octet-stream",
                MetadataDirective: "REPLACE",
-       }, v.Bucket.Name+"/"+src)
+       }, v.bucket.Name+"/"+src)
        err = v.translateError(err)
        if err != nil {
                return err
@@ -446,7 +492,7 @@ func (v *S3Volume) Untrash(loc string) error {
        if err != nil {
                return err
        }
-       err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+       err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
        return v.translateError(err)
 }
 
@@ -463,19 +509,19 @@ func (v *S3Volume) Status() *VolumeStatus {
 
 // String implements fmt.Stringer.
 func (v *S3Volume) String() string {
-       return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
+       return fmt.Sprintf("s3-bucket:%+q", v.bucket.Name)
 }
 
 // Writable returns false if all future Put, Mtime, and Delete calls
 // are expected to fail.
 func (v *S3Volume) Writable() bool {
-       return !v.readonly
+       return !v.ReadOnly
 }
 
 // Replication returns the storage redundancy of the underlying
 // device. Configured via command line flag.
 func (v *S3Volume) Replication() int {
-       return v.replication
+       return v.S3Replication
 }
 
 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
@@ -489,7 +535,7 @@ func (v *S3Volume) isKeepBlock(s string) bool {
 // there was a race between Put and Trash, fixRace recovers from the
 // race by Untrashing the block.
 func (v *S3Volume) fixRace(loc string) bool {
-       trash, err := v.Bucket.Head("trash/"+loc, nil)
+       trash, err := v.bucket.Head("trash/"+loc, nil)
        if err != nil {
                if !os.IsNotExist(v.translateError(err)) {
                        log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
@@ -502,7 +548,7 @@ func (v *S3Volume) fixRace(loc string) bool {
                return false
        }
 
-       recent, err := v.Bucket.Head("recent/"+loc, nil)
+       recent, err := v.bucket.Head("recent/"+loc, nil)
        if err != nil {
                log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
                return false
@@ -514,13 +560,13 @@ func (v *S3Volume) fixRace(loc string) bool {
        }
 
        ageWhenTrashed := trashTime.Sub(recentTime)
-       if ageWhenTrashed >= blobSignatureTTL {
+       if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
                // No evidence of a race: block hasn't been written
                // since it became eligible for Trash. No fix needed.
                return false
        }
 
-       log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, blobSignatureTTL)
+       log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
        log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
        err = v.safeCopy(loc, "trash/"+loc)
        if err != nil {
@@ -545,16 +591,16 @@ func (v *S3Volume) translateError(err error) error {
        return err
 }
 
-// EmptyTrash looks for trashed blocks that exceeded trashLifetime
+// EmptyTrash looks for trashed blocks that exceeded TrashLifetime
 // and deletes them from the volume.
 func (v *S3Volume) EmptyTrash() {
        var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
 
        // Use a merge sort to find matching sets of trash/X and recent/X.
        trashL := s3Lister{
-               Bucket:   v.Bucket,
+               Bucket:   v.bucket,
                Prefix:   "trash/",
-               PageSize: v.indexPageSize,
+               PageSize: v.IndexPageSize,
        }
        // Define "ready to delete" as "...when EmptyTrash started".
        startT := time.Now()
@@ -571,7 +617,7 @@ func (v *S3Volume) EmptyTrash() {
                        log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
                        continue
                }
-               recent, err := v.Bucket.Head("recent/"+loc, nil)
+               recent, err := v.bucket.Head("recent/"+loc, nil)
                if err != nil && os.IsNotExist(v.translateError(err)) {
                        log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
                        err = v.Untrash(loc)
@@ -588,21 +634,21 @@ func (v *S3Volume) EmptyTrash() {
                        log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
                        continue
                }
-               if trashT.Sub(recentT) < blobSignatureTTL {
-                       if age := startT.Sub(recentT); age >= blobSignatureTTL-v.raceWindow {
+               if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
+                       if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
                                // recent/loc is too old to protect
                                // loc from being Trashed again during
                                // the raceWindow that starts if we
                                // delete trash/X now.
                                //
-                               // Note this means (trashCheckInterval
-                               // < blobSignatureTTL - raceWindow) is
+                               // Note this means (TrashCheckInterval
+                               // < BlobSignatureTTL - RaceWindow) is
                                // necessary to avoid starvation.
                                log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
                                v.fixRace(loc)
                                v.Touch(loc)
                                continue
-                       } else if _, err := v.Bucket.Head(loc, nil); os.IsNotExist(err) {
+                       } else if _, err := v.bucket.Head(loc, nil); os.IsNotExist(err) {
                                log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
                                v.fixRace(loc)
                                continue
@@ -611,10 +657,10 @@ func (v *S3Volume) EmptyTrash() {
                                continue
                        }
                }
-               if startT.Sub(trashT) < trashLifetime {
+               if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
                        continue
                }
-               err = v.Bucket.Del(trash.Key)
+               err = v.bucket.Del(trash.Key)
                if err != nil {
                        log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
                        continue
@@ -622,9 +668,9 @@ func (v *S3Volume) EmptyTrash() {
                bytesDeleted += trash.Size
                blocksDeleted++
 
-               _, err = v.Bucket.Head(loc, nil)
+               _, err = v.bucket.Head(loc, nil)
                if os.IsNotExist(err) {
-                       err = v.Bucket.Del("recent/" + loc)
+                       err = v.bucket.Del("recent/" + loc)
                        if err != nil {
                                log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
                        }
index 6ba390426f51dbe4f8ef1d6ac2ce2a27b4d3f3b2..76dcbc9f9ea2f8fb680a25a31b84735f991b1b51 100644 (file)
@@ -4,23 +4,17 @@ import (
        "bytes"
        "crypto/md5"
        "fmt"
+       "io/ioutil"
        "log"
        "os"
        "time"
 
-       "github.com/AdRoll/goamz/aws"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/AdRoll/goamz/s3"
        "github.com/AdRoll/goamz/s3/s3test"
        check "gopkg.in/check.v1"
 )
 
-type TestableS3Volume struct {
-       *S3Volume
-       server      *s3test.Server
-       c           *check.C
-       serverClock *fakeClock
-}
-
 const (
        TestBucketName = "testbucket"
 )
@@ -42,30 +36,6 @@ func init() {
        s3UnsafeDelete = true
 }
 
-func NewTestableS3Volume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
-       clock := &fakeClock{}
-       srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
-       c.Assert(err, check.IsNil)
-       auth := aws.Auth{}
-       region := aws.Region{
-               Name:                 "test-region-1",
-               S3Endpoint:           srv.URL(),
-               S3LocationConstraint: true,
-       }
-       bucket := &s3.Bucket{
-               S3:   s3.New(auth, region),
-               Name: TestBucketName,
-       }
-       err = bucket.PutBucket(s3.ACL("private"))
-       c.Assert(err, check.IsNil)
-
-       return &TestableS3Volume{
-               S3Volume:    NewS3Volume(auth, region, TestBucketName, raceWindow, readonly, replication),
-               server:      srv,
-               serverClock: clock,
-       }
-}
-
 var _ = check.Suite(&StubbedS3Suite{})
 
 type StubbedS3Suite struct {
@@ -76,19 +46,19 @@ func (s *StubbedS3Suite) TestGeneric(c *check.C) {
        DoGenericVolumeTests(c, func(t TB) TestableVolume {
                // Use a negative raceWindow so s3test's 1-second
                // timestamp precision doesn't confuse fixRace.
-               return NewTestableS3Volume(c, -2*time.Second, false, 2)
+               return s.newTestableVolume(c, -2*time.Second, false, 2)
        })
 }
 
 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
        DoGenericVolumeTests(c, func(t TB) TestableVolume {
-               return NewTestableS3Volume(c, -2*time.Second, true, 2)
+               return s.newTestableVolume(c, -2*time.Second, true, 2)
        })
 }
 
 func (s *StubbedS3Suite) TestIndex(c *check.C) {
-       v := NewTestableS3Volume(c, 0, false, 2)
-       v.indexPageSize = 3
+       v := s.newTestableVolume(c, 0, false, 2)
+       v.IndexPageSize = 3
        for i := 0; i < 256; i++ {
                v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
        }
@@ -112,14 +82,14 @@ func (s *StubbedS3Suite) TestIndex(c *check.C) {
 }
 
 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
-       defer func(tl, bs time.Duration) {
-               trashLifetime = tl
-               blobSignatureTTL = bs
-       }(trashLifetime, blobSignatureTTL)
-       trashLifetime = time.Hour
-       blobSignatureTTL = time.Hour
+       defer func(tl, bs arvados.Duration) {
+               theConfig.TrashLifetime = tl
+               theConfig.BlobSignatureTTL = bs
+       }(theConfig.TrashLifetime, theConfig.BlobSignatureTTL)
+       theConfig.TrashLifetime.Set("1h")
+       theConfig.BlobSignatureTTL.Set("1h")
 
-       v := NewTestableS3Volume(c, 5*time.Minute, false, 2)
+       v := s.newTestableVolume(c, 5*time.Minute, false, 2)
        var none time.Time
 
        putS3Obj := func(t time.Time, key string, data []byte) {
@@ -127,7 +97,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                        return
                }
                v.serverClock.now = &t
-               v.Bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
+               v.bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
        }
 
        t0 := time.Now()
@@ -214,12 +184,12 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                        false, false, false, true, false, false,
                },
                {
-                       "Erroneously trashed during a race, detected before trashLifetime",
+                       "Erroneously trashed during a race, detected before TrashLifetime",
                        none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
                        true, false, true, true, true, false,
                },
                {
-                       "Erroneously trashed during a race, rescue during EmptyTrash despite reaching trashLifetime",
+                       "Erroneously trashed during a race, rescue during EmptyTrash despite reaching TrashLifetime",
                        none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
                        true, false, true, true, true, false,
                },
@@ -286,7 +256,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                // freshAfterEmpty
                loc, blk = setupScenario()
                v.EmptyTrash()
-               _, err = v.Bucket.Head("trash/"+loc, nil)
+               _, err = v.bucket.Head("trash/"+loc, nil)
                c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
                if scenario.freshAfterEmpty {
                        t, err := v.Mtime(loc)
@@ -307,9 +277,51 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
        }
 }
 
+type TestableS3Volume struct {
+       *S3Volume
+       server      *s3test.Server
+       c           *check.C
+       serverClock *fakeClock
+}
+
+func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
+       clock := &fakeClock{}
+       srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
+       c.Assert(err, check.IsNil)
+
+       tmp, err := ioutil.TempFile("", "keepstore")
+       c.Assert(err, check.IsNil)
+       defer os.Remove(tmp.Name())
+       _, err = tmp.Write([]byte("xxx\n"))
+       c.Assert(err, check.IsNil)
+       c.Assert(tmp.Close(), check.IsNil)
+
+       v := &TestableS3Volume{
+               S3Volume: &S3Volume{
+                       Bucket:             TestBucketName,
+                       AccessKeyFile:      tmp.Name(),
+                       SecretKeyFile:      tmp.Name(),
+                       Endpoint:           srv.URL(),
+                       Region:             "test-region-1",
+                       LocationConstraint: true,
+                       RaceWindow:         arvados.Duration(raceWindow),
+                       S3Replication:      replication,
+                       UnsafeDelete:       s3UnsafeDelete,
+                       ReadOnly:           readonly,
+                       IndexPageSize:      1000,
+               },
+               server:      srv,
+               serverClock: clock,
+       }
+       c.Assert(v.Start(), check.IsNil)
+       err = v.bucket.PutBucket(s3.ACL("private"))
+       c.Assert(err, check.IsNil)
+       return v
+}
+
 // PutRaw skips the ContentMD5 test
 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
-       err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
+       err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
        if err != nil {
                log.Printf("PutRaw: %+v", err)
        }
@@ -320,7 +332,7 @@ func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
 // while we do this.
 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
        v.serverClock.now = &lastPut
-       err := v.Bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
+       err := v.bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
        if err != nil {
                panic(err)
        }
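
These tests lean on the arvados.Duration type extended by this commit in sdk/go/arvados/duration.go. A short sketch of the behavior assumed here: Set accepts time.ParseDuration syntax, Duration() unwraps to a plain time.Duration, and direct conversion works as well:

    var ttl arvados.Duration
    if err := ttl.Set("1h"); err != nil {
            log.Fatal(err)
    }
    backdated := time.Now().Add(-2 * ttl.Duration())   // two TTLs in the past, as above
    fmt.Println(backdated.Before(time.Now()))          // true
    fmt.Println(arvados.Duration(24 * 14 * time.Hour)) // direct conversion, as in perms_test.go
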
index d11bc05192246a75e8ba4c95bd544b0712279ff6..27d6216d01633feca360de94f0a8febaabfb475a 100644 (file)
@@ -4,6 +4,8 @@ import (
        "errors"
        "log"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
 // RunTrashWorker is used by Keepstore to initiate the trash worker goroutine.
@@ -23,13 +25,13 @@ func RunTrashWorker(trashq *WorkQueue) {
 // TrashItem deletes the indicated block from every writable volume.
 func TrashItem(trashRequest TrashRequest) {
        reqMtime := time.Unix(0, trashRequest.BlockMtime)
-       if time.Since(reqMtime) < blobSignatureTTL {
+       if time.Since(reqMtime) < theConfig.BlobSignatureTTL.Duration() {
                log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
-                       time.Since(reqMtime),
+                       arvados.Duration(time.Since(reqMtime)),
                        trashRequest.Locator,
                        trashRequest.BlockMtime,
                        reqMtime,
-                       blobSignatureTTL)
+                       theConfig.BlobSignatureTTL)
                return
        }
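
Concretely: with BlobSignatureTTL set to 336h (two weeks), a trash request whose BlockMtime is ten days in the past is skipped with the warning above, while a fifteen-day-old block falls through to the volume.Trash call below.
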
 
@@ -44,8 +46,8 @@ func TrashItem(trashRequest TrashRequest) {
                        continue
                }
 
-               if neverDelete {
-                       err = errors.New("did not delete block because neverDelete is true")
+               if !theConfig.EnableDelete {
+                       err = errors.New("did not delete block because EnableDelete is false")
                } else {
                        err = volume.Trash(trashRequest.Locator)
                }
index 94798d95acfd85216ad60982b71282d84530ef7d..5ec413d1bde899d606a6792f40ffd3afe65f3615 100644 (file)
@@ -31,7 +31,7 @@ type TrashWorkerTestData struct {
    Expect no errors.
 */
 func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
-       neverDelete = false
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
                Locator1: "5d41402abc4b2a76b9719d911017c592",
                Block1:   []byte("hello"),
@@ -53,7 +53,7 @@ func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
    Expect the second locator in volume 2 to be unaffected.
 */
 func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
-       neverDelete = false
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
                Locator1: TestHash,
                Block1:   TestBlock,
@@ -75,7 +75,7 @@ func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
    Expect the first locator in volume 1 to be unaffected.
 */
 func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
-       neverDelete = false
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
                Locator1: TestHash,
                Block1:   TestBlock,
@@ -97,7 +97,7 @@ func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
    Expect locator to be deleted from both volumes.
 */
 func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
-       neverDelete = false
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
                Locator1: TestHash,
                Block1:   TestBlock,
@@ -119,7 +119,7 @@ func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
    Delete the second and expect the first to be still around.
 */
 func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *testing.T) {
-       neverDelete = false
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
                Locator1: TestHash,
                Block1:   TestBlock,
@@ -143,7 +143,7 @@ func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *test
    Expect the other unaffected.
 */
 func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
-       neverDelete = false
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
                Locator1: TestHash,
                Block1:   TestBlock,
@@ -166,7 +166,7 @@ func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
    will not be deleted because its Mtime is within the trash lifetime.
 */
 func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(t *testing.T) {
-       neverDelete = false
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
                Locator1: TestHash,
                Block1:   TestBlock,
@@ -188,11 +188,11 @@ func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(
        performTrashWorkerTest(testData, t)
 }
 
-/* Delete a block with matching mtime for locator in both volumes, but neverDelete is true,
+/* Delete a block with matching mtime for locator in both volumes, but EnableDelete is false,
    so block won't be deleted.
 */
-func TestTrashWorkerIntegration_NeverDelete(t *testing.T) {
-       neverDelete = true
+func TestTrashWorkerIntegration_DisabledDelete(t *testing.T) {
+       theConfig.EnableDelete = false
        testData := TrashWorkerTestData{
                Locator1: TestHash,
                Block1:   TestBlock,
@@ -231,7 +231,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
                }
        }
 
-       oldBlockTime := time.Now().Add(-blobSignatureTTL - time.Minute)
+       oldBlockTime := time.Now().Add(-theConfig.BlobSignatureTTL.Duration() - time.Minute)
 
        // Create TrashRequest for the test
        trashRequest := TrashRequest{
diff --git a/services/keepstore/usage.go b/services/keepstore/usage.go
new file mode 100644 (file)
index 0000000..29f89f5
--- /dev/null
@@ -0,0 +1,124 @@
+package main
+
+import (
+       "flag"
+       "fmt"
+       "os"
+       "sort"
+       "strings"
+
+       "github.com/ghodss/yaml"
+)
+
+func usage() {
+       c := DefaultConfig()
+       knownTypes := []string{}
+       for _, vt := range VolumeTypes {
+               c.Volumes = append(c.Volumes, vt().Examples()...)
+               knownTypes = append(knownTypes, vt().Type())
+       }
+       exampleConfigFile, err := yaml.Marshal(c)
+       if err != nil {
+               panic(err)
+       }
+       sort.Strings(knownTypes)
+       knownTypeList := strings.Join(knownTypes, ", ")
+       fmt.Fprintf(os.Stderr, `
+
+keepstore provides a content-addressed data store backed by a local filesystem or networked storage.
+
+Usage: keepstore -config path/to/keepstore.yml
+       keepstore [OPTIONS] -dump-config
+
+NOTE: All options (other than -config) are deprecated in favor of YAML
+      configuration. Use -dump-config to translate existing
+      configurations to YAML format.
+
+Options:
+`)
+       flag.PrintDefaults()
+       fmt.Fprintf(os.Stderr, `
+Example config file:
+
+%s
+
+Listen:
+
+    Local port to listen on. Can be "address:port" or ":port", where
+    "address" is a host IP address or name and "port" is a port number
+    or name.
+
+PIDFile:
+
+    Path to write PID file during startup. This file is kept open and
+    locked with LOCK_EX until keepstore exits, so "fuser -k pidfile" is
+    one way to shut down. Keepstore exits immediately if there is an
+    error opening, locking, or writing the PID file.
+
+MaxBuffers:
+
+    Maximum RAM to use for data buffers, given in multiples of block
+    size (64 MiB). When this limit is reached, HTTP requests requiring
+    buffers (like GET and PUT) will wait for buffer space to be
+    released.
+
+MaxRequests:
+
+    Maximum concurrent requests. When this limit is reached, new
+    requests will receive 503 responses. Note: this limit does not
+    include idle connections from clients using HTTP keepalive, so it
+    does not strictly limit the number of concurrent connections. If
+    omitted or zero, the default is 2 * MaxBuffers.
+
+BlobSigningKeyFile:
+
+    Local file containing the secret blob signing key (used to
+    generate and verify blob signatures).  This key should be
+    identical to the API server's blob_signing_key configuration
+    entry.
+
+RequireSignatures:
+
+    Honor read requests only if a valid signature is provided.  This
+    should be true, except for development use and when migrating from
+    a very old version.
+
+BlobSignatureTTL:
+
+    Duration for which new permission signatures (returned in PUT
+    responses) will be valid.  This should be equal to the API
+    server's blob_signature_ttl configuration entry.
+
+SystemAuthTokenFile:
+
+    Local file containing the Arvados API token used by keep-balance
+    or data manager.  Delete, trash, and index requests are honored
+    only for this token.
+
+EnableDelete:
+
+    Enable trash and delete features. If false, trash lists will be
+    accepted but blocks will not be trashed or deleted.
+
+TrashLifetime:
+
+    Time duration after a block is trashed during which it can be
+    recovered using an /untrash request.
+
+TrashCheckInterval:
+
+    How often to check for (and delete) trashed blocks whose
+    TrashLifetime has expired.
+
+Volumes:
+
+    List of storage volumes. If omitted or empty, the default is to
+    use all directories named "keep" that exist in the top level
+    directory of a mount point at startup time.
+
+    Volume types: %s
+
+    (See volume configuration examples above.)
+
+`, exampleConfigFile, knownTypeList)
+}
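
The example config printed above is meant to be read back with the same YAML library. A sketch of the basic load step (the path is a placeholder, and the real loader added in config.go handles the polymorphic Volumes list with custom unmarshalling, so this shows only the mechanism):

    buf, err := ioutil.ReadFile("/etc/keepstore.yml") // placeholder path
    if err != nil {
            log.Fatal(err)
    }
    if err := yaml.Unmarshal(buf, theConfig); err != nil {
            log.Fatal(err)
    }
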
index 8ae6660fd477fa90365a019c837a121a08cc9595..6e01e75b879b339232603d38f93cb040ecc6d86c 100644 (file)
@@ -10,6 +10,15 @@ import (
 // for example, a single mounted disk, a RAID array, an Amazon S3 volume,
 // etc.
 type Volume interface {
+       // Volume type as specified in config file. Examples: "S3",
+       // "Directory".
+       Type() string
+
+       // Do whatever private setup tasks and configuration checks
+       // are needed. Return non-nil if the volume is unusable (e.g.,
+       // invalid config).
+       Start() error
+
        // Get a block: copy the block data into buf, and return the
        // number of bytes copied.
        //
@@ -150,7 +159,7 @@ type Volume interface {
        // loc is as described in Get.
        //
        // If the timestamp for the given locator is newer than
-       // blobSignatureTTL, Trash must not trash the data.
+       // BlobSignatureTTL, Trash must not trash the data.
        //
        // If a Trash operation overlaps with any Touch or Put
        // operations on the same locator, the implementation must
@@ -171,7 +180,7 @@ type Volume interface {
        // reliably or fail outright.
        //
        // Corollary: A successful Touch or Put guarantees a block
-       // will not be trashed for at least blobSignatureTTL
+       // will not be trashed for at least the BlobSignatureTTL
        // duration.
        Trash(loc string) error
 
@@ -204,11 +213,18 @@ type Volume interface {
        // responses to PUT requests.
        Replication() int
 
-       // EmptyTrash looks for trashed blocks that exceeded trashLifetime
+       // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
        // and deletes them from the volume.
        EmptyTrash()
 }
 
+// A VolumeWithExamples provides example configs to display in the
+// -help message.
+type VolumeWithExamples interface {
+       Volume
+       Examples() []Volume
+}
+
 // A VolumeManager tells callers which volumes can read, which volumes
 // can write, and on which volume the next write should be attempted.
 type VolumeManager interface {
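
Volume drivers advertise themselves (and contribute the example configs shown by -help) by appending a factory to VolumeTypes from an init function, as the S3 driver does in s3_volume.go above; presumably the directory-backed UnixVolume registers the same way:

    func init() {
            VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &UnixVolume{} })
    }
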
index bc3e537a89a815037102af7fb920e8b9d2b84f61..1738fe9b513bb4d86482ceede86a04539d29d418 100644 (file)
@@ -11,6 +11,7 @@ import (
        "strings"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
 )
 
@@ -430,7 +431,7 @@ func testIndexTo(t TB, factory TestableVolumeFactory) {
 func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
-       blobSignatureTTL = 300 * time.Second
+       theConfig.BlobSignatureTTL.Set("5m")
 
        if v.Writable() == false {
                return
@@ -451,19 +452,19 @@ func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
 }
 
 // Calling Delete() for a block with a timestamp older than
-// blobSignatureTTL seconds in the past should delete the data.
+// BlobSignatureTTL seconds in the past should delete the data.
 // Test is intended for only writable volumes
 func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
-       blobSignatureTTL = 300 * time.Second
+       theConfig.BlobSignatureTTL.Set("5m")
 
        if v.Writable() == false {
                return
        }
 
        v.Put(TestHash, TestBlock)
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
        if err := v.Trash(TestHash); err != nil {
                t.Error(err)
@@ -733,7 +734,7 @@ func testPutFullBlock(t TB, factory TestableVolumeFactory) {
        }
 }
 
-// With trashLifetime != 0, perform:
+// With TrashLifetime != 0, perform:
 // Trash an old block - which either raises ErrNotImplemented or succeeds
 // Untrash -  which either raises ErrNotImplemented or succeeds
 // Get - which must succeed
@@ -741,14 +742,14 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
        defer func() {
-               trashLifetime = 0
+               theConfig.TrashLifetime = 0
        }()
 
-       trashLifetime = 3600 * time.Second
+       theConfig.TrashLifetime.Set("1h")
 
        // put block and backdate it
        v.PutRaw(TestHash, TestBlock)
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
        buf := make([]byte, BlockSize)
        n, err := v.Get(TestHash, buf)
@@ -795,9 +796,9 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
 func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
-       defer func(orig time.Duration) {
-               trashLifetime = orig
-       }(trashLifetime)
+       defer func(orig arvados.Duration) {
+               theConfig.TrashLifetime = orig
+       }(theConfig.TrashLifetime)
 
        checkGet := func() error {
                buf := make([]byte, BlockSize)
@@ -830,10 +831,10 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 
        // First set: EmptyTrash before reaching the trash deadline.
 
-       trashLifetime = time.Hour
+       theConfig.TrashLifetime.Set("1h")
 
        v.PutRaw(TestHash, TestBlock)
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
        err := checkGet()
        if err != nil {
@@ -844,7 +845,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        err = v.Trash(TestHash)
        if err == MethodDisabledError || err == ErrNotImplemented {
                // Skip the trash tests for read-only volumes, and
-               // volume types that don't support trashLifetime>0.
+               // volume types that don't support TrashLifetime>0.
                return
        }
 
@@ -878,7 +879,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        }
 
        // Because we Touch'ed, need to backdate again for next set of tests
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
        // If the only block in the trash has already been untrashed,
        // most volumes will fail a subsequent Untrash with a 404, but
@@ -896,11 +897,11 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        }
 
        // Untrash might have updated the timestamp, so backdate again
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
        // Second set: EmptyTrash after the trash deadline has passed.
 
-       trashLifetime = time.Nanosecond
+       theConfig.TrashLifetime.Set("1ns")
 
        err = v.Trash(TestHash)
        if err != nil {
@@ -925,7 +926,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        // Trash it again, and this time call EmptyTrash so it really
        // goes away.
        // (In Azure volumes, un/trash changes Mtime, so first backdate again)
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
        err = v.Trash(TestHash)
        err = checkGet()
        if err == nil || !os.IsNotExist(err) {
@@ -950,9 +951,9 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        // un-trashed copy doesn't get deleted along with it.
 
        v.PutRaw(TestHash, TestBlock)
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
-       trashLifetime = time.Nanosecond
+       theConfig.TrashLifetime.Set("1ns")
        err = v.Trash(TestHash)
        if err != nil {
                t.Fatal(err)
@@ -963,7 +964,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        }
 
        v.PutRaw(TestHash, TestBlock)
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
        // EmptyTrash should not delete the untrashed copy.
        v.EmptyTrash()
@@ -978,18 +979,18 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        // untrash the block whose deadline is "C".
 
        v.PutRaw(TestHash, TestBlock)
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
-       trashLifetime = time.Nanosecond
+       theConfig.TrashLifetime.Set("1ns")
        err = v.Trash(TestHash)
        if err != nil {
                t.Fatal(err)
        }
 
        v.PutRaw(TestHash, TestBlock)
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
-       trashLifetime = time.Hour
+       theConfig.TrashLifetime.Set("1h")
        err = v.Trash(TestHash)
        if err != nil {
                t.Fatal(err)
index 5671b8d4a9fd7405f8ca7fd35a18fdf10289a059..6ab386aec4fcc7774af90c4fa5ca879258ac9404 100644 (file)
@@ -189,7 +189,7 @@ func (v *MockVolume) Trash(loc string) error {
                return MethodDisabledError
        }
        if _, ok := v.Store[loc]; ok {
-               if time.Since(v.Timestamps[loc]) < blobSignatureTTL {
+               if time.Since(v.Timestamps[loc]) < time.Duration(theConfig.BlobSignatureTTL) {
                        return nil
                }
                delete(v.Store, loc)
@@ -198,7 +198,14 @@ func (v *MockVolume) Trash(loc string) error {
        return os.ErrNotExist
 }
 
-// TBD
+func (v *MockVolume) Type() string {
+       return "Mock"
+}
+
+func (v *MockVolume) Start() error {
+       return nil
+}
+
 func (v *MockVolume) Untrash(loc string) error {
        return nil
 }
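
MockVolume now implements the same Type/Start pair as every real volume; a sketch of the interface slice this patch implies (the authoritative definition is in services/keepstore/volume.go, not shown here):

    // Assumed subset of the Volume interface; the real interface in
    // volume.go also includes Get, Put, Trash, IndexTo, and more.
    type Volume interface {
            Type() string // config type tag, e.g. "Mock" or "Directory"
            Start() error // validate configuration and prepare for use
            Untrash(loc string) error
    }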
index 5982fb0484eae0a37ef09e24b9526492c5b0459f..b5753dec04638927162a328d2a43f2fd4e567a50 100644 (file)
@@ -2,7 +2,6 @@ package main
 
 import (
        "bufio"
-       "errors"
        "flag"
        "fmt"
        "io"
@@ -19,11 +18,16 @@ import (
 )
 
 type unixVolumeAdder struct {
-       *volumeSet
+       *Config
 }
 
-func (vs *unixVolumeAdder) Set(value string) error {
-       if dirs := strings.Split(value, ","); len(dirs) > 1 {
+// String implements flag.Value
+func (s *unixVolumeAdder) String() string {
+       return "-"
+}
+
+func (vs *unixVolumeAdder) Set(path string) error {
+       if dirs := strings.Split(path, ","); len(dirs) > 1 {
                log.Print("DEPRECATED: using comma-separated volume list.")
                for _, dir := range dirs {
                        if err := vs.Set(dir); err != nil {
@@ -32,33 +36,19 @@ func (vs *unixVolumeAdder) Set(value string) error {
                }
                return nil
        }
-       if len(value) == 0 || value[0] != '/' {
-               return errors.New("Invalid volume: must begin with '/'.")
-       }
-       if _, err := os.Stat(value); err != nil {
-               return err
-       }
-       var locker sync.Locker
-       if flagSerializeIO {
-               locker = &sync.Mutex{}
-       }
-       *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
-               root:     value,
-               locker:   locker,
-               readonly: flagReadonly,
+       vs.Config.Volumes = append(vs.Config.Volumes, &UnixVolume{
+               Root:      path,
+               ReadOnly:  deprecated.flagReadonly,
+               Serialize: deprecated.flagSerializeIO,
        })
        return nil
 }
 
 func init() {
-       flag.Var(
-               &unixVolumeAdder{&volumes},
-               "volumes",
-               "Deprecated synonym for -volume.")
-       flag.Var(
-               &unixVolumeAdder{&volumes},
-               "volume",
-               "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
+       VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &UnixVolume{} })
+
+       flag.Var(&unixVolumeAdder{theConfig}, "volumes", "see Volumes configuration")
+       flag.Var(&unixVolumeAdder{theConfig}, "volume", "see Volumes configuration")
 }
 
 // Discover adds a UnixVolume for every directory named "keep" that is
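
unixVolumeAdder satisfies flag.Value, so each -volume argument appends another UnixVolume to theConfig.Volumes. The same pattern in isolation, with hypothetical names:

    // Standalone sketch of the flag.Value pattern used above;
    // listFlag is hypothetical and not part of this patch.
    type listFlag []string

    func (l *listFlag) String() string { return strings.Join(*l, ",") }

    func (l *listFlag) Set(v string) error {
            *l = append(*l, v)
            return nil
    }

    // usage: var dirs listFlag; flag.Var(&dirs, "volume", "repeatable")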
@@ -89,10 +79,10 @@ func (vs *unixVolumeAdder) Discover() int {
                }
                // Set the -readonly flag (but only for this volume)
                // if the filesystem is mounted readonly.
-               flagReadonlyWas := flagReadonly
+               flagReadonlyWas := deprecated.flagReadonly
                for _, fsopt := range strings.Split(args[3], ",") {
                        if fsopt == "ro" {
-                               flagReadonly = true
+                               deprecated.flagReadonly = true
                                break
                        }
                        if fsopt == "rw" {
@@ -104,24 +94,62 @@ func (vs *unixVolumeAdder) Discover() int {
                } else {
                        added++
                }
-               flagReadonly = flagReadonlyWas
+               deprecated.flagReadonly = flagReadonlyWas
        }
        return added
 }
 
 // A UnixVolume stores and retrieves blocks in a local directory.
 type UnixVolume struct {
-       // path to the volume's root directory
-       root string
+       Root                 string // path to the volume's root directory
+       ReadOnly             bool
+       Serialize            bool
+       DirectoryReplication int
+
        // something to lock during IO, typically a sync.Mutex (or nil
        // to skip locking)
-       locker   sync.Locker
-       readonly bool
+       locker sync.Locker
+}
+
+// Examples implements VolumeWithExamples.
+func (*UnixVolume) Examples() []Volume {
+       return []Volume{
+               &UnixVolume{
+                       Root:                 "/mnt/local-disk",
+                       Serialize:            true,
+                       DirectoryReplication: 1,
+               },
+               &UnixVolume{
+                       Root:                 "/mnt/network-disk",
+                       Serialize:            false,
+                       DirectoryReplication: 2,
+               },
+       }
+}
+
+// Type implements Volume
+func (v *UnixVolume) Type() string {
+       return "Directory"
+}
+
+// Start implements Volume
+func (v *UnixVolume) Start() error {
+       if v.Serialize {
+               v.locker = &sync.Mutex{}
+       }
+       if !strings.HasPrefix(v.Root, "/") {
+               return fmt.Errorf("volume root does not start with '/': %q", v.Root)
+       }
+       if v.DirectoryReplication == 0 {
+               v.DirectoryReplication = 1
+       }
+       _, err := os.Stat(v.Root)
+       return err
 }
 
 // Touch sets the timestamp for the given locator to the current time
 func (v *UnixVolume) Touch(loc string) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        p := v.blockPath(loc)
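
Start now carries the validation and defaulting the old flag parser performed: the root must be absolute and must exist, Serialize gets its mutex, and DirectoryReplication defaults to 1. Presumably the daemon runs it once per configured volume before serving; a hedged sketch of such a driver loop:

    // Hedged sketch only; the actual startup sequence lives in
    // keepstore.go, outside this excerpt.
    for _, v := range theConfig.Volumes {
            if err := v.Start(); err != nil {
                    log.Fatalf("volume %s failed to start: %v", v, err)
            }
    }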
@@ -218,7 +246,7 @@ func (v *UnixVolume) Compare(loc string, expect []byte) error {
 // returns a FullError.  If the write fails due to some other error,
 // that error is returned.
 func (v *UnixVolume) Put(loc string, block []byte) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        if v.IsFull() {
@@ -268,14 +296,14 @@ func (v *UnixVolume) Status() *VolumeStatus {
        var fs syscall.Statfs_t
        var devnum uint64
 
-       if fi, err := os.Stat(v.root); err == nil {
+       if fi, err := os.Stat(v.Root); err == nil {
                devnum = fi.Sys().(*syscall.Stat_t).Dev
        } else {
                log.Printf("%s: os.Stat: %s\n", v, err)
                return nil
        }
 
-       err := syscall.Statfs(v.root, &fs)
+       err := syscall.Statfs(v.Root, &fs)
        if err != nil {
                log.Printf("%s: statfs: %s\n", v, err)
                return nil
@@ -285,7 +313,7 @@ func (v *UnixVolume) Status() *VolumeStatus {
        // uses fs.Blocks - fs.Bfree.
        free := fs.Bavail * uint64(fs.Bsize)
        used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
-       return &VolumeStatus{v.root, devnum, free, used}
+       return &VolumeStatus{v.Root, devnum, free, used}
 }
 
 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
@@ -307,7 +335,7 @@ var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 //
 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
        var lastErr error
-       rootdir, err := os.Open(v.root)
+       rootdir, err := os.Open(v.Root)
        if err != nil {
                return err
        }
@@ -326,7 +354,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                if !blockDirRe.MatchString(names[0]) {
                        continue
                }
-               blockdirpath := filepath.Join(v.root, names[0])
+               blockdirpath := filepath.Join(v.Root, names[0])
                blockdir, err := os.Open(blockdirpath)
                if err != nil {
                        log.Print("Error reading ", blockdirpath, ": ", err)
@@ -360,9 +388,9 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
 }
 
 // Trash trashes the block data from the unix storage
-// If trashLifetime == 0, the block is deleted
+// If TrashLifetime == 0, the block is deleted
 // Else, the block is renamed as path/{loc}.trash.{deadline},
-// where deadline = now + trashLifetime
+// where deadline = now + TrashLifetime
 func (v *UnixVolume) Trash(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
@@ -372,7 +400,7 @@ func (v *UnixVolume) Trash(loc string) error {
        // Trash() will read the correct up-to-date timestamp and choose not to
        // trash the file.
 
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        if v.locker != nil {
@@ -397,21 +425,21 @@ func (v *UnixVolume) Trash(loc string) error {
        // anyway (because the permission signatures have expired).
        if fi, err := os.Stat(p); err != nil {
                return err
-       } else if time.Since(fi.ModTime()) < blobSignatureTTL {
+       } else if time.Since(fi.ModTime()) < time.Duration(theConfig.BlobSignatureTTL) {
                return nil
        }
 
-       if trashLifetime == 0 {
+       if theConfig.TrashLifetime == 0 {
                return os.Remove(p)
        }
-       return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
+       return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()))
 }
 
 // Untrash moves a block from the trash back into the store:
 // it looks for path/{loc}.trash.{deadline} in storage
 // and renames the first such file to path/{loc}
 func (v *UnixVolume) Untrash(loc string) (err error) {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
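
The rename in Trash encodes the deadline directly in the filename; a worked example with a hypothetical locator and timestamp:

    // Hypothetical values, for illustration only:
    // p        == "/mnt/local-disk/d41/d41d8cd98f00b204e9800998ecf8427e"
    // deadline == time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()
    // renamed  == p + ".trash.1477008315"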
 
@@ -446,7 +474,7 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
 // blockDir returns the fully qualified directory name for the directory
 // where loc is (or would be) stored on this volume.
 func (v *UnixVolume) blockDir(loc string) string {
-       return filepath.Join(v.root, loc[0:3])
+       return filepath.Join(v.Root, loc[0:3])
 }
 
 // blockPath returns the fully qualified pathname for the path to loc
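
Blocks are sharded across up to 4096 (16^3) subdirectories keyed by the first three hex digits of the locator; for example:

    // With v.Root == "/mnt/local-disk" and a hypothetical locator:
    // blockDir("d41d8cd9...")  == "/mnt/local-disk/d41"
    // blockPath("d41d8cd9...") == "/mnt/local-disk/d41/d41d8cd98f00b204e9800998ecf8427e"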
@@ -459,7 +487,7 @@ func (v *UnixVolume) blockPath(loc string) string {
 // MinFreeKilobytes.
 //
 func (v *UnixVolume) IsFull() (isFull bool) {
-       fullSymlink := v.root + "/full"
+       fullSymlink := v.Root + "/full"
 
        // Check if the volume has been marked as full in the last hour.
        if link, err := os.Readlink(fullSymlink); err == nil {
@@ -491,7 +519,7 @@ func (v *UnixVolume) IsFull() (isFull bool) {
 //
 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
        var fs syscall.Statfs_t
-       err = syscall.Statfs(v.root, &fs)
+       err = syscall.Statfs(v.Root, &fs)
        if err == nil {
                // Statfs output is not guaranteed to measure free
                // space in terms of 1K blocks.
@@ -501,19 +529,19 @@ func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
 }
 
 func (v *UnixVolume) String() string {
-       return fmt.Sprintf("[UnixVolume %s]", v.root)
+       return fmt.Sprintf("[UnixVolume %s]", v.Root)
 }
 
 // Writable returns false if all future Put, Mtime, and Delete calls
 // are expected to fail.
 func (v *UnixVolume) Writable() bool {
-       return !v.readonly
+       return !v.ReadOnly
 }
 
 // Replication returns the number of replicas promised by the
-// underlying device (currently assumed to be 1).
+// underlying device (as specified in configuration).
 func (v *UnixVolume) Replication() int {
-       return 1
+       return v.DirectoryReplication
 }
 
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
@@ -546,7 +574,7 @@ func (v *UnixVolume) EmptyTrash() {
        var bytesDeleted, bytesInTrash int64
        var blocksDeleted, blocksInTrash int
 
-       err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
+       err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
                if err != nil {
                        log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
                        return nil
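
The walk body is elided here, but EmptyTrash necessarily has to recover the deadline that Trash encoded in each filename; a hedged sketch of that check under the naming scheme above:

    // Hedged sketch; the real walk function continues past this hunk.
    base := filepath.Base(path)
    if parts := strings.SplitN(base, ".trash.", 2); len(parts) == 2 {
            deadline, err := strconv.ParseInt(parts[1], 10, 64)
            if err == nil && deadline <= time.Now().Unix() {
                    // Past its deadline: count it, then os.Remove(path).
            }
    }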
index c95538bc4da380f7af5561984d7a069324cea970..887247d3c3956e9475edf8437c913b3e1fc922c9 100644 (file)
@@ -30,9 +30,9 @@ func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVol
        }
        return &TestableUnixVolume{
                UnixVolume: UnixVolume{
-                       root:     d,
+                       Root:     d,
+                       ReadOnly: readonly,
                        locker:   locker,
-                       readonly: readonly,
                },
                t: t,
        }
@@ -42,9 +42,9 @@ func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVol
 // the volume is readonly.
 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
        defer func(orig bool) {
-               v.readonly = orig
-       }(v.readonly)
-       v.readonly = false
+               v.ReadOnly = orig
+       }(v.ReadOnly)
+       v.ReadOnly = false
        err := v.Put(locator, data)
        if err != nil {
                v.t.Fatal(err)
@@ -59,7 +59,7 @@ func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
 }
 
 func (v *TestableUnixVolume) Teardown() {
-       if err := os.RemoveAll(v.root); err != nil {
+       if err := os.RemoveAll(v.Root); err != nil {
                v.t.Fatal(err)
        }
 }
@@ -101,6 +101,19 @@ func TestUnixVolumeHandlersWithGenericVolumeTests(t *testing.T) {
        })
 }
 
+func TestReplicationDefault1(t *testing.T) {
+       v := &UnixVolume{
+               Root:     "/",
+               ReadOnly: true,
+       }
+       if err := v.Start(); err != nil {
+               t.Error(err)
+       }
+       if got := v.Replication(); got != 1 {
+               t.Errorf("Replication() returned %d, expected 1 if no config given", got)
+       }
+}
+
 func TestGetNotFound(t *testing.T) {
        v := NewTestableUnixVolume(t, false, false)
        defer v.Teardown()
@@ -126,7 +139,7 @@ func TestPut(t *testing.T) {
        if err != nil {
                t.Error(err)
        }
-       p := fmt.Sprintf("%s/%s/%s", v.root, TestHash[:3], TestHash)
+       p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
        if buf, err := ioutil.ReadFile(p); err != nil {
                t.Error(err)
        } else if bytes.Compare(buf, TestBlock) != 0 {
@@ -139,7 +152,7 @@ func TestPutBadVolume(t *testing.T) {
        v := NewTestableUnixVolume(t, false, false)
        defer v.Teardown()
 
-       os.Chmod(v.root, 000)
+       os.Chmod(v.Root, 000)
        err := v.Put(TestHash, TestBlock)
        if err == nil {
                t.Error("Write should have failed")
@@ -178,7 +191,7 @@ func TestIsFull(t *testing.T) {
        v := NewTestableUnixVolume(t, false, false)
        defer v.Teardown()
 
-       fullPath := v.root + "/full"
+       fullPath := v.Root + "/full"
        now := fmt.Sprintf("%d", time.Now().Unix())
        os.Symlink(now, fullPath)
        if !v.IsFull() {
@@ -200,8 +213,8 @@ func TestNodeStatus(t *testing.T) {
 
        // Get node status and make a basic sanity check.
        volinfo := v.Status()
-       if volinfo.MountPoint != v.root {
-               t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.root)
+       if volinfo.MountPoint != v.Root {
+               t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
        }
        if volinfo.DeviceNum == 0 {
                t.Errorf("uninitialized device_num in %v", volinfo)
@@ -301,7 +314,7 @@ func TestUnixVolumeCompare(t *testing.T) {
                t.Errorf("Got err %q, expected %q", err, DiskHashError)
        }
 
-       p := fmt.Sprintf("%s/%s/%s", v.root, TestHash[:3], TestHash)
+       p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
        os.Chmod(p, 000)
        err = v.Compare(TestHash, TestBlock)
        if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
index b25ed942fb350751d5d476889ca164897feca160..720c6364b58be7aa53d9273be2db07fdb7fe5a66 100755 (executable)
@@ -14,6 +14,13 @@ req_envs.each do |k|
   end
 end
 
+exclusive_mode = ARGV.index("--exclusive")
+exclusive_banner = "#######################################################################################
+#  THIS FILE IS MANAGED BY #{$0} -- CHANGES WILL BE OVERWRITTEN  #
+#######################################################################################\n\n"
+start_banner = "### BEGIN Arvados-managed keys -- changes between markers will be overwritten\n"
+end_banner = "### END Arvados-managed keys -- changes between markers will be overwritten\n"
+
 keys = ''
 
 seen = Hash.new
@@ -87,20 +94,36 @@ begin
     @homedir = Etc.getpwnam(l[:username]).dir
     userdotssh = File.join(@homedir, ".ssh")
     Dir.mkdir(userdotssh) if !File.exists?(userdotssh)
-    @key = "#######################################################################################
-#  THIS FILE IS MANAGED BY #{$0} -- CHANGES WILL BE OVERWRITTEN  #
-#######################################################################################\n\n"
-    @key += keys[l[:username]].join("\n") + "\n"
-    userauthkeys = File.join(userdotssh, "authorized_keys")
-    if !File.exists?(userauthkeys) or IO::read(userauthkeys) != @key then
-      f = File.new(userauthkeys, 'w')
-      f.write(@key)
+
+    newkeys = "###\n###\n" + keys[l[:username]].join("\n") + "\n###\n###\n"
+
+    keysfile = File.join(userdotssh, "authorized_keys")
+
+    if File.exists?(keysfile)
+      oldkeys = IO::read(keysfile)
+    else
+      oldkeys = ""
+    end
+
+    if exclusive_mode
+      newkeys = exclusive_banner + newkeys
+    elsif oldkeys.start_with?(exclusive_banner)
+      newkeys = start_banner + newkeys + end_banner
+    elsif (m = /^(.*?\n|)#{start_banner}(.*?\n|)#{end_banner}(.*)/m.match(oldkeys))
+      newkeys = m[1] + start_banner + newkeys + end_banner + m[3]
+    else
+      newkeys = start_banner + newkeys + end_banner + oldkeys
+    end
+
+    if oldkeys != newkeys then
+      f = File.new(keysfile, 'w')
+      f.write(newkeys)
       f.close()
     end
     FileUtils.chown_R(l[:username], nil, userdotssh)
     File.chmod(0700, userdotssh)
     File.chmod(0750, @homedir)
-    File.chmod(0600, userauthkeys)
+    File.chmod(0600, keysfile)
   end
 
   devnull.close
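
The four-way branch above keeps anything the user wrote outside the managed markers intact. A sketch of the core merge, written in Go for consistency with the rest of this changeset (mergeKeys is hypothetical and ignores --exclusive mode):

    // Hypothetical sketch of the marker merge; --exclusive handling omitted.
    func mergeKeys(old, managed, begin, end string) string {
            if i := strings.Index(old, begin); i >= 0 {
                    if j := strings.Index(old[i:], end); j >= 0 {
                            // Replace the existing managed section in place.
                            return old[:i] + begin + managed + end + old[i+j+len(end):]
                    }
            }
            // No markers yet: prepend the managed section, keep old keys.
            return begin + managed + end + old
    }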