page.assert_selector 'a,button', text: 'Pause'
# Wait for pipeline run to complete
- wait_until_page_has 'Complete', pipeline_config['max_wait_seconds']
+ wait_until_page_has 'completed', pipeline_config['max_wait_seconds']
end
end
# Looks for the text_to_look_for for up to the max_time provided
def wait_until_page_has text_to_look_for, max_time=30
+ # Fall back to 30 seconds when max_time is missing or not an integer-like value.
max_time = 30 if (!max_time || (max_time.to_s != max_time.to_i.to_s))
+ text_found = false
+ # Timeout.timeout raises Timeout::Error if the text never shows up within max_time.
Timeout.timeout(max_time) do
- loop until page.has_text?(text_to_look_for)
+ until text_found do
+ # Re-visit the current path as the 'active' user on every pass, presumably
+ # because the page does not refresh this status on its own (TODO confirm).
+ visit_page_with_token 'active', current_path
+ text_found = has_text?(text_to_look_for)
+ end
end
end
-
end
#!/usr/bin/env python
-# Crunch script integration for running arvados-cwl-runner (importing
-# arvados_cwl module) inside a crunch job.
-#
+# Crunch script integration for running arvados-cwl-runner inside a crunch job.
+
+import arvados_cwl
+import sys
+
+try:
+ # Use the crunch script defined in the arvados_cwl package. This helps
+ # prevent the crunch script from going out of sync with the rest of the
+ # arvados_cwl package.
+ import arvados_cwl.crunch_script
+ arvados_cwl.crunch_script.run()
+ sys.exit()
+except ImportError:
+ pass
+
+# When running against an older arvados-cwl-runner package without
+# arvados_cwl.crunch_script, fall back to the old code.
+
+
# This gets the job record, transforms the script parameters into a valid CWL
# input object, then executes the CWL runner to run the underlying workflow or
# tool. When the workflow completes, record the output object in an output
# collection for this runner job.
import arvados
-import arvados_cwl
import arvados.collection
import arvados.util
import cwltool.main
@components.each do |componentname, component|
component[:script_parameters].each do |parametername, parameter|
parameter = { :value => parameter } unless parameter.is_a? Hash
- value =
- (params["#{componentname}::#{parametername}"] ||
- parameter[:value] ||
- (parameter[:output_of].nil? &&
- (params[parametername.to_s] ||
- parameter[:default])) ||
- nil)
- if value.nil? and
- ![false,'false',0,'0'].index parameter[:required]
- if parameter[:output_of]
- if not @components[parameter[:output_of].intern]
- errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
- end
- next
+ if params.has_key?("#{componentname}::#{parametername}")
+ value = params["#{componentname}::#{parametername}"]
+ elsif parameter.has_key?(:value)
+ value = parameter[:value]
+ elsif parameter.has_key?(:output_of)
+ if !@components[parameter[:output_of].intern]
+ errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
+ else
+ # value will be filled in later when the upstream
+ # component's output becomes known
end
+ next
+ elsif params.has_key?(parametername.to_s)
+ value = params[parametername.to_s]
+ elsif parameter.has_key?(:default)
+ value = parameter[:default]
+ else
errors << [componentname, parametername, "required parameter is missing"]
+ next
end
debuglog "parameter #{componentname}::#{parametername} == #{value}"
self.uploaded = {}
self.num_retries = 4
self.uuid = None
- self.work_api = work_api
self.stop_polling = threading.Event()
self.poll_api = None
self.pipeline = None
else:
self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
- if self.work_api is None:
- # todo: autodetect API to use.
- self.work_api = "jobs"
-
- if self.work_api not in ("containers", "jobs"):
- raise Exception("Unsupported API '%s'" % self.work_api)
+ for api in ["jobs", "containers"]:
+ try:
+ methods = self.api._rootDesc.get('resources')[api]['methods']
+ if ('httpMethod' in methods['create'] and
+ (work_api == api or work_api is None)):
+ self.work_api = api
+ break
+ except KeyError:
+ pass
+ if not self.work_api:
+ if work_api is None:
+ raise Exception("No supported APIs")
+ else:
+ raise Exception("Unsupported API '%s'" % work_api)
def arv_make_tool(self, toolpath_object, **kwargs):
kwargs["work_api"] = self.work_api
job_spec = self.arvados_job_spec(*args, **kwargs)
for k,v in job_spec["script_parameters"].items():
- if isinstance(v, dict):
+ if v is False or v is None or isinstance(v, dict):
job_spec["script_parameters"][k] = {"value": v}
self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
--- /dev/null
+# Crunch script integration for running arvados-cwl-runner (importing
+# arvados_cwl module) inside a crunch job.
+#
+# This gets the job record, transforms the script parameters into a valid CWL
+# input object, then executes the CWL runner to run the underlying workflow or
+# tool. When the workflow completes, record the output object in an output
+# collection for this runner job.
+
+import arvados
+import arvados_cwl
+import arvados.collection
+import arvados.util
+import cwltool.main
+import logging
+import os
+import json
+import argparse
+import re
+import functools
+
+from arvados.api import OrderedJsonModel
+from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs
+from cwltool.load_tool import load_tool
+
+logger = logging.getLogger('arvados.cwl-runner')
+
+def run():
+ # Print package versions
+ logger.info(arvados_cwl.versionstring())
+
+ api = arvados.api("v1")
+
+ arvados_cwl.add_arv_hints()
+
+ try:
+ job_order_object = arvados.current_job()['script_parameters']
+
+ pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')
+
+ def keeppath(v):
+ if pdh_path.match(v):
+ return "keep:%s" % v
+ else:
+ return v
+
+ def keeppathObj(v):
+ v["location"] = keeppath(v["location"])
+
+ job_order_object["cwl:tool"] = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object["cwl:tool"])
+
+ for k,v in job_order_object.items():
+ if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
+ job_order_object[k] = {
+ "class": "File",
+ "location": "keep:%s" % v
+ }
+
+ adjustFileObjs(job_order_object, keeppathObj)
+ adjustDirObjs(job_order_object, keeppathObj)
+ normalizeFilesDirs(job_order_object)
+ adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api)))
+
+ output_name = None
+ if "arv:output_name" in job_order_object:
+ output_name = job_order_object["arv:output_name"]
+ del job_order_object["arv:output_name"]
+
+ runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
+ output_name=output_name)
+
+ t = load_tool(job_order_object, runner.arv_make_tool)
+
+ args = argparse.Namespace()
+ args.project_uuid = arvados.current_job()["owner_uuid"]
+ args.enable_reuse = True
+ args.submit = False
+ args.debug = True
+ args.quiet = False
+ args.ignore_docker_for_reuse = False
+ args.basedir = os.getcwd()
+ args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
+ outputObj = runner.arv_executor(t, job_order_object, **vars(args))
+
+ if runner.final_output_collection:
+ outputCollection = runner.final_output_collection.portable_data_hash()
+ else:
+ outputCollection = None
+
+ api.job_tasks().update(uuid=arvados.current_task()['uuid'],
+ body={
+ 'output': outputCollection,
+ 'success': True,
+ 'progress':1.0
+ }).execute()
+ except Exception as e:
+ logging.exception("Unhandled exception")
+ api.job_tasks().update(uuid=arvados.current_task()['uuid'],
+ body={
+ 'output': None,
+ 'success': False,
+ 'progress':1.0
+ }).execute()
job_order.get("id", "#"),
False)
- adjustDirObjs(job_order, trim_listing)
-
if "id" in job_order:
del job_order["id"]
def arvados_job_spec(self, *args, **kwargs):
self.name = os.path.basename(self.tool.tool["id"])
- return upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
+ workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
+ # Trim directory listings in the job order only after upload_instance has
+ # run; presumably the upload needs the untrimmed listings (TODO confirm).
+ adjustDirObjs(self.job_order, trim_listing)
+ return workflowmapper
def done(self, record):
if record["state"] == "Complete":
+import functools
+import json
import logging
import mock
-import unittest
import os
-import functools
-import json
+import unittest
+import arvados
import arvados_cwl
import cwltool.process
from schema_salad.ref_resolver import Loader
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch("arvados.collection.Collection")
def test_run(self, mockcollection):
- try:
- arvados_cwl.add_arv_hints()
+ arvados_cwl.add_arv_hints()
- runner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
- runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- runner.ignore_docker_for_reuse = False
- runner.num_retries = 0
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+ api = mock.MagicMock()
+ api._rootDesc = arvados.api('v1')._rootDesc
+ runner = arvados_cwl.ArvCwlRunner(api)
+ self.assertEqual(runner.work_api, 'jobs')
- tool, metadata = document_loader.resolve_ref("tests/wf/scatter2.cwl")
- metadata["cwlVersion"] = tool["cwlVersion"]
+ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ runner.ignore_docker_for_reuse = False
+ runner.num_retries = 0
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
- mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
+ tool, metadata = document_loader.resolve_ref("tests/wf/scatter2.cwl")
+ metadata["cwlVersion"] = tool["cwlVersion"]
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
- arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=document_loader,
- makeTool=runner.arv_make_tool, metadata=metadata)
- arvtool.formatgraph = None
- it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access)
- it.next().run()
- it.next().run()
+ mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
- with open("tests/wf/scatter2_subwf.cwl") as f:
- subwf = f.read()
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
+ basedir="", make_fs_access=make_fs_access, loader=document_loader,
+ makeTool=runner.arv_make_tool, metadata=metadata)
+ arvtool.formatgraph = None
+ it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access)
+ it.next().run()
+ it.next().run()
- mockcollection().open().__enter__().write.assert_has_calls([mock.call(subwf)])
- mockcollection().open().__enter__().write.assert_has_calls([mock.call('{sleeptime: 5}')])
+ with open("tests/wf/scatter2_subwf.cwl") as f:
+ subwf = f.read()
- runner.api.jobs().create.assert_called_with(
- body={
- 'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
- 'repository': 'arvados',
- 'script_version': 'master',
- 'script': 'crunchrunner',
- 'script_parameters': {
- 'tasks': [{'task.env': {
- 'HOME': '$(task.outdir)',
- 'TMPDIR': '$(task.tmpdir)'},
- 'task.vwd': {
- 'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
- 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
- },
- 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
- 'task.stdout': 'cwl.output.json'}]},
- 'runtime_constraints': {
- 'min_scratch_mb_per_node': 2048,
- 'min_cores_per_node': 1,
- 'docker_image': 'arvados/jobs',
- 'min_ram_mb_per_node': 1024
- },
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'},
- filters=[['repository', '=', 'arvados'],
- ['script', '=', 'crunchrunner'],
- ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
- ['docker_image_locator', 'in docker', 'arvados/jobs']],
- find_or_create=True)
- except:
- logging.exception("")
+ mockcollection().open().__enter__().write.assert_has_calls([mock.call(subwf)])
+ mockcollection().open().__enter__().write.assert_has_calls([mock.call('{sleeptime: 5}')])
+
+ runner.api.jobs().create.assert_called_with(
+ body={
+ 'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
+ 'repository': 'arvados',
+ 'script_version': 'master',
+ 'script': 'crunchrunner',
+ 'script_parameters': {
+ 'tasks': [{'task.env': {
+ 'HOME': '$(task.outdir)',
+ 'TMPDIR': '$(task.tmpdir)'},
+ 'task.vwd': {
+ 'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
+ 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
+ },
+ 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
+ 'task.stdout': 'cwl.output.json'}]},
+ 'runtime_constraints': {
+ 'min_scratch_mb_per_node': 2048,
+ 'min_cores_per_node': 1,
+ 'docker_image': 'arvados/jobs',
+ 'min_ram_mb_per_node': 1024
+ },
+ 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'},
+ filters=[['repository', '=', 'arvados'],
+ ['script', '=', 'crunchrunner'],
+ ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
+ ['docker_image_locator', 'in docker', 'arvados/jobs']],
+ find_or_create=True)
+
+ def test_default_work_api(self):
+ # With "jobs.create" removed from the discovery document, the runner
+ # must fall back to the "containers" API.
+ arvados_cwl.add_arv_hints()
+
+ api = mock.MagicMock()
+ api._rootDesc = arvados.api('v1')._rootDesc
+ # NOTE(review): this deletes from the discovery dict in place; confirm
+ # arvados.api('v1') does not share that dict with other tests' clients.
+ del api._rootDesc.get('resources')['jobs']['methods']['create']
+ runner = arvados_cwl.ArvCwlRunner(api)
+ self.assertEqual(runner.work_api, 'containers')
+import functools
+import json
import logging
import mock
-import unittest
import os
-import functools
-import json
import StringIO
+import unittest
+import arvados
import arvados_cwl
class TestMakeOutput(unittest.TestCase):
+ def setUp(self):
+ self.api = mock.MagicMock()
+ self.api._rootDesc = arvados.api('v1')._rootDesc
+
@mock.patch("arvados.collection.Collection")
@mock.patch("arvados.collection.CollectionReader")
def test_make_output_collection(self, reader, col):
- api = mock.MagicMock()
keep_client = mock.MagicMock()
- runner = arvados_cwl.ArvCwlRunner(api, keep_client=keep_client)
+ runner = arvados_cwl.ArvCwlRunner(self.api, keep_client=keep_client)
runner.project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
final = mock.MagicMock()
c.fn = fnPattern % (pdh, os.path.basename(c.fn))
class TestPathmap(unittest.TestCase):
+ def setUp(self):
+ self.api = mock.MagicMock()
+ self.api._rootDesc = arvados.api('v1')._rootDesc
+
def test_keepref(self):
"""Test direct keep references."""
- arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+ arvrunner = arvados_cwl.ArvCwlRunner(self.api)
p = ArvPathMapper(arvrunner, [{
"class": "File",
def test_upload(self, upl):
"""Test pathmapper uploading files."""
- arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+ arvrunner = arvados_cwl.ArvCwlRunner(self.api)
upl.side_effect = upload_mock
def test_prev_uploaded(self, upl):
"""Test pathmapper handling previously uploaded files."""
- arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+ arvrunner = arvados_cwl.ArvCwlRunner(self.api)
arvrunner.add_uploaded('tests/hw.py', MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='', type='File'))
upl.side_effect = upload_mock
@mock.patch("arvados.commands.run.statfile")
def test_statfile(self, statfile, upl):
"""Test pathmapper handling ArvFile references."""
- arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+ arvrunner = arvados_cwl.ArvCwlRunner(self.api)
# An ArvFile object returned from arvados.commands.run.statfile means the file is located on a
# keep mount, so we can construct a direct reference directly without upload.
-import arvados
-import arvados.keep
-import arvados.collection
-import arvados_cwl
import copy
import cStringIO
import functools
import hashlib
+import json
+import logging
import mock
import sys
import unittest
-import json
-import logging
+
+import arvados
+import arvados.collection
+import arvados_cwl
+import arvados.keep
from .matcher import JsonDiffMatcher
stubs.api = mock.MagicMock()
+ stubs.api._rootDesc = arvados.api('v1')._rootDesc
stubs.api.users().current().execute.return_value = {
"uuid": stubs.fake_user_uuid,
}
return time.Duration(d).String()
}
+// Duration returns d converted to a standard library time.Duration.
+func (d Duration) Duration() time.Duration {
+ return time.Duration(d)
+}
+
// Value implements flag.Value
func (d *Duration) Set(s string) error {
dur, err := time.ParseDuration(s)
package streamer
import (
+ "errors"
"io"
)
+var ErrAlreadyClosed = errors.New("cannot close a stream twice")
+
type AsyncStream struct {
buffer []byte
requests chan sliceRequest
add_reader chan bool
subtract_reader chan bool
wait_zero_readers chan bool
+ closed bool
}
// Reads from the buffer managed by the Transfer()
}
func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
- t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
+ t := &AsyncStream{
+ buffer: make([]byte, buffersize),
+ requests: make(chan sliceRequest),
+ add_reader: make(chan bool),
+ subtract_reader: make(chan bool),
+ wait_zero_readers: make(chan bool),
+ }
go t.transfer(source)
go t.readersMonitor()
}
func AsyncStreamFromSlice(buf []byte) *AsyncStream {
- t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
+ t := &AsyncStream{
+ buffer: buf,
+ requests: make(chan sliceRequest),
+ add_reader: make(chan bool),
+ subtract_reader: make(chan bool),
+ wait_zero_readers: make(chan bool),
+ }
go t.transfer(nil)
go t.readersMonitor()
// Close the responses channel
func (this *StreamReader) Close() error {
+ // this.stream is set to nil below, so a second Close reports
+ // ErrAlreadyClosed instead of dereferencing nil or closing
+ // this.responses twice (which would panic).
+ if this.stream == nil {
+ return ErrAlreadyClosed
+ }
this.stream.subtract_reader <- true
close(this.responses)
this.stream = nil
return nil
}
+// Close signals wait_zero_readers (presumably blocking until all readers
+// have closed; TODO confirm) and then closes the stream's internal
+// channels. A second call returns ErrAlreadyClosed.
-func (this *AsyncStream) Close() {
+func (this *AsyncStream) Close() error {
+ // Without this guard a second Close would panic sending on the
+ // already-closed wait_zero_readers channel.
+ // NOTE(review): closed is read and written without synchronization, so
+ // this only protects sequential double-Close, not concurrent calls.
+ if this.closed {
+ return ErrAlreadyClosed
+ }
+ this.closed = true
this.wait_zero_readers <- true
close(this.requests)
close(this.add_reader)
close(this.subtract_reader)
close(this.wait_zero_readers)
+ return nil
}
writer.Write([]byte("baz"))
writer.Close()
}
+
+// Closing a StreamReader or an AsyncStream a second time must return
+// ErrAlreadyClosed rather than panic.
+func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) {
+ buffer := make([]byte, 100)
+ tr := AsyncStreamFromSlice(buffer)
+ sr := tr.MakeStreamReader()
+ c.Check(sr.Close(), IsNil)
+ c.Check(sr.Close(), Equals, ErrAlreadyClosed)
+ c.Check(tr.Close(), IsNil)
+ c.Check(tr.Close(), Equals, ErrAlreadyClosed)
+}
with open(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"), "w") as f:
keep_args['-blob-signing-key-file'] = f.name
f.write(blob_signing_key)
- if enforce_permissions:
- keep_args['-enforce-permissions'] = 'true'
+ keep_args['-enforce-permissions'] = str(enforce_permissions).lower()
with open(os.path.join(TEST_TMPDIR, "keep.data-manager-token-file"), "w") as f:
keep_args['-data-manager-token-file'] = f.name
f.write(auth_token('data_manager'))
gem 'themes_for_rails'
gem 'arvados', '>= 0.1.20150615153458'
-gem 'arvados-cli', '>= 0.1.20151207150126'
+gem 'arvados-cli', '>= 0.1.20161017193526'
# pg_power lets us use partial indexes in schema.rb in Rails 3
gem 'pg_power'
i18n (~> 0)
json (~> 1.7, >= 1.7.7)
jwt (>= 0.1.5, < 2)
- arvados-cli (0.1.20160503204200)
+ arvados-cli (0.1.20161017193526)
activesupport (~> 3.2, >= 3.2.13)
andand (~> 1.3, >= 1.3.3)
arvados (~> 0.1, >= 0.1.20150128223554)
curb (~> 0.8)
- google-api-client (~> 0.6, >= 0.6.3, < 0.9)
+ google-api-client (~> 0.6, >= 0.6.3, < 0.8.9)
json (~> 1.7, >= 1.7.7)
oj (~> 2.0, >= 2.0.3)
trollop (~> 2.0)
acts_as_api
andand
arvados (>= 0.1.20150615153458)
- arvados-cli (>= 0.1.20151207150126)
+ arvados-cli (>= 0.1.20161017193526)
coffee-rails (~> 3.2.0)
database_cleaner
factory_girl_rails
ERROR_ACTIONS = [:render_error, :render_not_found]
+ before_filter :disable_api_methods
before_filter :set_cors_headers
before_filter :respond_with_json_by_default
before_filter :remote_ip
end
end
+ def disable_api_methods
+ if Rails.configuration.disable_api_methods.
+ include?(controller_name + "." + action_name)
+ send_error("Disabled", status: 404)
+ end
+ end
+
def set_cors_headers
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Methods'] = 'GET, HEAD, PUT, POST, DELETE'
end
end
end
+ Rails.configuration.disable_api_methods.each do |method|
+ ctrl, action = method.split('.', 2)
+ discovery[:resources][ctrl][:methods].delete(action.to_sym)
+ end
discovery
end
send_json discovery
:identity_url => omniauth['info']['identity_url'],
:is_active => Rails.configuration.new_users_are_active,
:owner_uuid => system_user_uuid)
+ if omniauth['info']['username']
+ user.set_initial_username(requested: omniauth['info']['username'])
+ end
act_as_system_user do
user.save or raise Exception.new(user.errors.messages)
end
# looks like a saved Docker image.
manifest = Keep::Manifest.new(coll_match.manifest_text)
if manifest.exact_file_count?(1) and
- (manifest.files[0][1] =~ /^[0-9A-Fa-f]{64}\.tar$/)
+ (manifest.files[0][1] =~ /^(sha256:)?[0-9A-Fa-f]{64}\.tar$/)
return [coll_match]
end
end
t.add :requesting_container_uuid
t.add :runtime_constraints
t.add :state
+ t.add :use_existing
end
# Supported states for a container request
container_image: c_container_image,
mounts: c_mounts,
runtime_constraints: c_runtime_constraints}
- reusable = Container.find_reusable(c_attrs)
+
+ reusable = self.use_existing ? Container.find_reusable(c_attrs) : nil
if not reusable.nil?
reusable
else
:container_image, :cwd, :description, :environment,
:filters, :mounts, :name, :output_path, :priority,
:properties, :requesting_container_uuid, :runtime_constraints,
- :state, :container_uuid
+ :state, :container_uuid, :use_existing
when Committed
if container_uuid.nil?
self.save!
end
+ def set_initial_username(requested: false)
+ if !requested.is_a?(String) || requested.empty?
+ email_parts = email.partition("@")
+ local_parts = email_parts.first.partition("+")
+ if email_parts.any?(&:empty?)
+ return
+ elsif not local_parts.first.empty?
+ requested = local_parts.first
+ else
+ requested = email_parts.first
+ end
+ end
+ requested.sub!(/^[^A-Za-z]+/, "")
+ requested.gsub!(/[^A-Za-z0-9]/, "")
+ unless requested.empty?
+ self.username = find_usable_username_from(requested)
+ end
+ end
+
protected
def ensure_ownership_path_leads_to_user
nil
end
- def set_initial_username
- email_parts = email.partition("@")
- local_parts = email_parts.first.partition("+")
- if email_parts.any?(&:empty?)
- return
- elsif not local_parts.first.empty?
- base_username = local_parts.first
- else
- base_username = email_parts.first
- end
- base_username.sub!(/^[^A-Za-z]+/, "")
- base_username.gsub!(/[^A-Za-z0-9]/, "")
- unless base_username.empty?
- self.username = find_usable_username_from(base_username)
- end
- end
-
def prevent_privilege_escalation
if current_user.andand.is_admin
return true
# Use at your own risk.
unlogged_attributes: []
+ # API methods to disable. Disabled methods are not listed in the
+ # discovery document, and respond 404 to all requests.
+ # Example: ["jobs.create", "pipeline_instances.create"]
+ disable_api_methods: []
+
###
### Crunch, DNS & compute node management
###
--- /dev/null
+class AddUseExistingToContainerRequests < ActiveRecord::Migration
+ def up
+ add_column :container_requests, :use_existing, :boolean, :default => true
+ end
+
+ def down
+ remove_column :container_requests, :use_existing
+ end
+end
expires_at timestamp without time zone,
filters text,
updated_at timestamp without time zone NOT NULL,
- container_count integer DEFAULT 0
+ container_count integer DEFAULT 0,
+ use_existing boolean DEFAULT true
);
INSERT INTO schema_migrations (version) VALUES ('20160909181442');
-INSERT INTO schema_migrations (version) VALUES ('20160926194129');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20160926194129');
+
+INSERT INTO schema_migrations (version) VALUES ('20161019171346');
\ No newline at end of file
:last_name => raw_info['info']['last_name'],
:email => raw_info['info']['email'],
:identity_url => raw_info['info']['identity_url'],
+ :username => raw_info['info']['username'],
}
end
uuid: zzzzz-4zz18-1v45jub259sjjgb
# This Collection has links with Docker image metadata.
portable_data_hash: fa3c1a9cb6783f85f2ecda037e07b8c3+167
- owner_uuid: qr1hi-tpzed-000000000000000
+ owner_uuid: zzzzz-tpzed-000000000000000
created_at: 2014-06-11T17:22:54Z
modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
modified_by_user_uuid: zzzzz-tpzed-d9tiejq69daie8f
manifest_text: ". d21353cfe035e3e384563ee55eadbb2f+67108864 5c77a43e329b9838cbec18ff42790e57+55605760 0:122714624:d8309758b8fe2c81034ffc8a10c36460b77db7bc5e7b448c4e5b684f9d95a678.tar\n"
name: docker_image
+# untagged docker image with sha256:{hash}.tar filename
+docker_image_1_12:
+ uuid: zzzzz-4zz18-1g4g0vhpjn9wq7i
+ portable_data_hash: d740a57097711e08eb9b2a93518f20ab+174
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: 2016-10-19 08:50:45.653552268 Z
+ modified_by_client_uuid: zzzzz-ozdt8-teyxzyd8qllg11h
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ modified_at: 2016-10-19 08:50:45.652930000 Z
+ updated_at: 2016-10-19 08:50:45.652930000 Z
+ manifest_text: ". d21353cfe035e3e384563ee55eadbb2f+67108864 5c77a43e329b9838cbec18ff42790e57+55605760 0:122714624:sha256:d8309758b8fe2c81034ffc8a10c36460b77db7bc5e7b448c4e5b684f9d95a678.tar\n"
+ name: docker_image_1_12
+
unlinked_docker_image:
uuid: zzzzz-4zz18-d0d8z5wofvfgwad
# This Collection contains a file that looks like a Docker image,
# but has no Docker metadata links pointing to it.
portable_data_hash: 9ae44d5792468c58bcf85ce7353c7027+124
- owner_uuid: qr1hi-tpzed-000000000000000
+ owner_uuid: zzzzz-tpzed-000000000000000
created_at: 2014-06-11T17:22:54Z
modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
modified_by_user_uuid: zzzzz-tpzed-d9tiejq69daie8f
assert_not_nil json_response["components"]
assert_equal [], json_response["components"].keys
end
+
+ test 'jobs.create disabled in config' do
+ Rails.configuration.disable_api_methods = ["jobs.create",
+ "pipeline_instances.create"]
+ authorize_with :active
+ post :create, job: {
+ script: "hash",
+ script_version: "master",
+ repository: "active/foo",
+ script_parameters: {}
+ }
+ assert_response 404
+ end
end
discovery_doc = JSON.parse(@response.body)
assert_equal 'aaa888fff', discovery_doc['source_version']
end
+
+ test "empty disable_api_methods" do
+ get :index
+ assert_response :success
+ discovery_doc = JSON.parse(@response.body)
+ assert_equal('POST',
+ discovery_doc['resources']['jobs']['methods']['create']['httpMethod'])
+ end
+
+ test "non-empty disable_api_methods" do
+ Rails.configuration.disable_api_methods =
+ ['jobs.create', 'pipeline_instances.create', 'pipeline_templates.create']
+ get :index
+ assert_response :success
+ discovery_doc = JSON.parse(@response.body)
+ ['jobs', 'pipeline_instances', 'pipeline_templates'].each do |r|
+ refute_includes(discovery_doc['resources'][r]['methods'].keys(), 'create')
+ end
+ end
end
'https://wb.example.com'
end
- def mock_auth_with_email email
+ def mock_auth_with(email: nil, username: nil)
mock = {
'provider' => 'josh_id',
'uid' => 'https://edward.example.com',
'name' => 'Edward Example',
'first_name' => 'Edward',
'last_name' => 'Example',
- 'email' => email,
},
}
+ mock['info']['email'] = email unless email.nil?
+ mock['info']['username'] = username unless username.nil?
post('/auth/josh_id/callback',
{return_to: client_url},
{'omniauth.auth' => mock})
assert_response :redirect, 'Did not redirect to client with token'
end
+ test 'assign username from sso' do
+ mock_auth_with(email: 'foo@example.com', username: 'bar')
+ u = assigns(:user)
+ assert_equal 'bar', u.username
+ end
+
+ test 'no assign username from sso' do
+ mock_auth_with(email: 'foo@example.com')
+ u = assigns(:user)
+ assert_equal 'foo', u.username
+ end
+
test 'create new user during omniauth callback' do
- mock_auth_with_email 'edward@example.com'
+ mock_auth_with(email: 'edward@example.com')
assert_equal(0, @response.redirect_url.index(client_url),
'Redirected to wrong address after succesful login: was ' +
@response.redirect_url + ', expected ' + client_url + '[...]')
Rails.configuration.auto_setup_new_users_with_repository =
testcase[:cfg][:repo]
- mock_auth_with_email testcase[:email]
+ mock_auth_with(email: testcase[:email])
u = assigns(:user)
vm_links = Link.where('link_class=? and tail_uuid=? and head_uuid like ?',
'permission', u.uuid,
test "container_image_for_container(pdh)" do
set_user_from_auth :active
- pdh = collections(:docker_image).portable_data_hash
- cr = ContainerRequest.new(container_image: pdh)
- resolved = cr.send :container_image_for_container
- assert_equal resolved, pdh
+ [:docker_image, :docker_image_1_12].each do |coll|
+ pdh = collections(coll).portable_data_hash
+ cr = ContainerRequest.new(container_image: pdh)
+ resolved = cr.send :container_image_for_container
+ assert_equal resolved, pdh
+ end
end
['acbd18db4cc2f85cedef654fccc4a4d8+3',
end
[
- [{"var" => "value1"}, {"var" => "value1"}],
- [{"var" => "value1"}, {"var" => "value2"}]
- ].each do |env1, env2|
- test "Container request #{(env1 == env2) ? 'does' : 'does not'} reuse container when committed" do
+ [{"var" => "value1"}, {"var" => "value1"}, nil],
+ [{"var" => "value1"}, {"var" => "value1"}, true],
+ [{"var" => "value1"}, {"var" => "value1"}, false],
+ [{"var" => "value1"}, {"var" => "value2"}, nil],
+ ].each do |env1, env2, use_existing|
+ test "Container request #{((env1 == env2) and (use_existing.nil? or use_existing == true)) ? 'does' : 'does not'} reuse container when committed#{use_existing.nil? ? '' : use_existing ? ' and use_existing == true' : ' and use_existing == false'}" do
common_attrs = {cwd: "test",
priority: 1,
command: ["echo", "hello"],
set_user_from_auth :active
cr1 = create_minimal_req!(common_attrs.merge({state: ContainerRequest::Committed,
environment: env1}))
- cr2 = create_minimal_req!(common_attrs.merge({state: ContainerRequest::Uncommitted,
- environment: env2}))
+ if use_existing.nil?
+ # Testing with use_existing default value
+ cr2 = create_minimal_req!(common_attrs.merge({state: ContainerRequest::Uncommitted,
+ environment: env2}))
+ else
+
+ cr2 = create_minimal_req!(common_attrs.merge({state: ContainerRequest::Uncommitted,
+ environment: env2,
+ use_existing: use_existing}))
+ end
assert_not_nil cr1.container_uuid
assert_nil cr2.container_uuid
- # Update cr2 to commited state and check for container equality on both cases,
- # when env1 and env2 are equal the same container should be assigned, and
- # when env1 and env2 are different, cr2 container should be different.
+ # Update cr2 to commited state and check for container equality on different cases:
+ # * When env1 and env2 are equal and use_existing is true, the same container
+ # should be assigned.
+ # * When use_existing is false, a different container should be assigned.
+ # * When env1 and env2 are different, a different container should be assigned.
cr2.update_attributes!({state: ContainerRequest::Committed})
- assert_equal (env1 == env2), (cr1.container_uuid == cr2.container_uuid)
+ assert_equal (cr2.use_existing == true and (env1 == env2)),
+ (cr1.container_uuid == cr2.container_uuid)
end
end
// propagated to crunch-run via SLURM.
os.Setenv("ARVADOS_API_HOST", theConfig.Client.APIHost)
os.Setenv("ARVADOS_API_TOKEN", theConfig.Client.AuthToken)
- os.Setenv("ARVADOS_API_INSECURE", "")
+ os.Setenv("ARVADOS_API_HOST_INSECURE", "")
if theConfig.Client.Insecure {
- os.Setenv("ARVADOS_API_INSECURE", "1")
+ os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
}
os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(theConfig.Client.KeepServiceURIs, " "))
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
squeueUpdater.SlurmLock.Lock()
defer squeueUpdater.SlurmLock.Unlock()
+ log.Printf("sbatch starting: %+q", cmd.Args)
err := cmd.Start()
if err != nil {
- submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
+ submitErr = fmt.Errorf("Error starting sbatch: %v", err)
return
}
log.Printf("Error creating stdout pipe for squeue: %v", err)
return
}
- cmd.Start()
+ err = cmd.Start()
+ if err != nil {
+ log.Printf("Error running squeue: %v", err)
+ return
+ }
scanner := bufio.NewScanner(sq)
for scanner.Scan() {
newSqueueContents = append(newSqueueContents, scanner.Text())
}
+// azureVolumeAdder implements flag.Value. Each time the
+// -azure-storage-container-volume flag is given, it appends a new
+// AzureBlobVolume to the wrapped Config's Volumes list.
type azureVolumeAdder struct {
- *volumeSet
+ *Config
}
-func (s *azureVolumeAdder) Set(containerName string) error {
- if trashLifetime != 0 {
- return ErrNotImplemented
- }
+// String implements flag.Value. The adder has no single current value
+// to report, so it always returns a placeholder.
+func (s *azureVolumeAdder) String() string {
+ return "-"
+}
- if containerName == "" {
- return errors.New("no container name given")
- }
- if azureStorageAccountName == "" || azureStorageAccountKeyFile == "" {
- return errors.New("-azure-storage-account-name and -azure-storage-account-key-file arguments must given before -azure-storage-container-volume")
- }
- accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
- if err != nil {
- return err
- }
- azClient, err := storage.NewBasicClient(azureStorageAccountName, accountKey)
- if err != nil {
- return errors.New("creating Azure storage client: " + err.Error())
- }
- if flagSerializeIO {
- log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
- }
- v := NewAzureBlobVolume(azClient, containerName, flagReadonly, azureStorageReplication)
- if err := v.Check(); err != nil {
- return err
- }
- *s.volumeSet = append(*s.volumeSet, v)
+// Set implements flag.Value. It records containerName together with the
+// -azure-storage-account-name, -azure-storage-account-key-file,
+// -azure-storage-replication and -readonly values in effect at this
+// point on the command line, so those flags must come first. Nothing is
+// validated or contacted here; that happens in AzureBlobVolume.Start.
+func (s *azureVolumeAdder) Set(containerName string) error {
+ s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
+ ContainerName: containerName,
+ StorageAccountName: azureStorageAccountName,
+ StorageAccountKeyFile: azureStorageAccountKeyFile,
+ AzureReplication: azureStorageReplication,
+ ReadOnly: deprecated.flagReadonly,
+ })
return nil
}
func init() {
- flag.Var(&azureVolumeAdder{&volumes},
+ VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
+
+ flag.Var(&azureVolumeAdder{theConfig},
"azure-storage-container-volume",
"Use the given container as a storage volume. Can be given multiple times.")
flag.StringVar(
&azureStorageAccountKeyFile,
"azure-storage-account-key-file",
"",
- "File containing the account key used for subsequent --azure-storage-container-volume arguments.")
+ "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
flag.IntVar(
&azureStorageReplication,
"azure-storage-replication",
// An AzureBlobVolume stores and retrieves blocks in an Azure Blob
// container.
type AzureBlobVolume struct {
- azClient storage.Client
- bsClient storage.BlobStorageClient
- containerName string
- readonly bool
- replication int
+ // Azure storage account name. Required; checked by Start.
+ StorageAccountName string
+ // Path of a file containing the account key. Required; read by Start.
+ StorageAccountKeyFile string
+ // Name of the blob container to use. Start fails if it does not exist.
+ ContainerName string
+ // Replication level reported by Replication.
+ AzureReplication int
+ // When true, Put, Touch and Trash return MethodDisabledError.
+ ReadOnly bool
+
+ // Azure SDK clients, populated by Start (tests may set them directly).
+ azClient storage.Client
+ bsClient storage.BlobStorageClient
}
-// NewAzureBlobVolume returns a new AzureBlobVolume using the given
-// client and container name. The replication argument specifies the
-// replication level to report when writing data.
-func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
- return &AzureBlobVolume{
- azClient: client,
- bsClient: client.GetBlobService(),
- containerName: containerName,
- readonly: readonly,
- replication: replication,
+// Examples implements VolumeWithExamples. It returns a single sample
+// configuration filled with placeholder values; the returned volume
+// has not been started and has no Azure clients.
+func (*AzureBlobVolume) Examples() []Volume {
+ return []Volume{
+ &AzureBlobVolume{
+ StorageAccountName: "example-account-name",
+ StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
+ ContainerName: "example-container-name",
+ AzureReplication: 3,
+ },
}
}
-// Check returns nil if the volume is usable.
-func (v *AzureBlobVolume) Check() error {
- ok, err := v.bsClient.ContainerExists(v.containerName)
+// Type implements Volume. The returned name is what a volume entry's
+// "Type" field must contain in the config file's Volumes list.
+func (v *AzureBlobVolume) Type() string {
+ return "Azure"
+}
+
+// Start implements Volume. It checks that ContainerName,
+// StorageAccountName and StorageAccountKeyFile are set, reads the
+// account key from StorageAccountKeyFile, creates the Azure storage
+// clients, and returns an error if the container does not exist.
+func (v *AzureBlobVolume) Start() error {
+ if v.ContainerName == "" {
+ return errors.New("no container name given")
+ }
+ if v.StorageAccountName == "" || v.StorageAccountKeyFile == "" {
+ return errors.New("StorageAccountName and StorageAccountKeyFile must be given")
+ }
+ accountKey, err := readKeyFromFile(v.StorageAccountKeyFile)
+ if err != nil {
+ return err
+ }
+ v.azClient, err = storage.NewBasicClient(v.StorageAccountName, accountKey)
+ if err != nil {
+ return fmt.Errorf("creating Azure storage client: %s", err)
+ }
+ v.bsClient = v.azClient.GetBlobService()
+
+ ok, err := v.bsClient.ContainerExists(v.ContainerName)
if err != nil {
return err
}
if !ok {
- return errors.New("container does not exist")
+ return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
}
return nil
}
// Return true if expires_at metadata attribute is found on the block
func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
- metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+ metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
if err != nil {
return false, metadata, v.translateError(err)
}
if azureMaxGetBytes < BlockSize {
// Unfortunately the handler doesn't tell us how long the blob
// is expected to be, so we have to ask Azure.
- props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+ props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
if err != nil {
return 0, v.translateError(err)
}
var rdr io.ReadCloser
var err error
if startPos == 0 && endPos == expectSize {
- rdr, err = v.bsClient.GetBlob(v.containerName, loc)
+ rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
} else {
- rdr, err = v.bsClient.GetBlobRange(v.containerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
+ rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
}
if err != nil {
errors[p] = err
if trashed {
return os.ErrNotExist
}
- rdr, err := v.bsClient.GetBlob(v.containerName, loc)
+ rdr, err := v.bsClient.GetBlob(v.ContainerName, loc)
if err != nil {
return v.translateError(err)
}
// Put stores a Keep block as a block blob in the container.
+// If the volume is ReadOnly, it returns MethodDisabledError without
+// writing anything.
func (v *AzureBlobVolume) Put(loc string, block []byte) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
- return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block), nil)
+ return v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), bytes.NewReader(block), nil)
}
// Touch updates the last-modified property of a block blob.
func (v *AzureBlobVolume) Touch(loc string) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
trashed, metadata, err := v.checkTrashed(loc)
}
metadata["touch"] = fmt.Sprintf("%d", time.Now())
- return v.bsClient.SetBlobMetadata(v.containerName, loc, metadata, nil)
+ return v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
}
// Mtime returns the last-modified property of a block blob.
return time.Time{}, os.ErrNotExist
}
- props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+ props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
if err != nil {
return time.Time{}, err
}
Include: "metadata",
}
for {
- resp, err := v.bsClient.ListBlobs(v.containerName, params)
+ resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
if err != nil {
return err
}
// Trash a Keep block.
func (v *AzureBlobVolume) Trash(loc string) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
// we get the Etag before checking Mtime, and use If-Match to
// ensure we don't delete data if Put() or Touch() happens
// between our calls to Mtime() and DeleteBlob().
- props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+ props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
if err != nil {
return err
}
if t, err := v.Mtime(loc); err != nil {
return err
- } else if time.Since(t) < blobSignatureTTL {
+ } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
return nil
}
- // If trashLifetime == 0, just delete it
- if trashLifetime == 0 {
- return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
+ // If TrashLifetime == 0, just delete it
+ if theConfig.TrashLifetime == 0 {
+ return v.bsClient.DeleteBlob(v.ContainerName, loc, map[string]string{
"If-Match": props.Etag,
})
}
// Otherwise, mark as trash
- return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
- "expires_at": fmt.Sprintf("%d", time.Now().Add(trashLifetime).Unix()),
+ return v.bsClient.SetBlobMetadata(v.ContainerName, loc, map[string]string{
+ "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
}, map[string]string{
"If-Match": props.Etag,
})
// Delete the expires_at metadata attribute
func (v *AzureBlobVolume) Untrash(loc string) error {
// if expires_at does not exist, return NotFoundError
- metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+ metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
if err != nil {
return v.translateError(err)
}
// reset expires_at metadata attribute
metadata["expires_at"] = ""
- err = v.bsClient.SetBlobMetadata(v.containerName, loc, metadata, nil)
+ err = v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
return v.translateError(err)
}
// String returns a volume label, including the container name.
+// The name is quoted with %+q, so non-ASCII characters are escaped.
func (v *AzureBlobVolume) String() string {
- return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
+ return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
}
-// Writable returns true, unless the -readonly flag was on when the
-// volume was added.
+// Writable returns true unless the volume is configured ReadOnly,
+// either by its config file entry or by a -readonly flag preceding
+// -azure-storage-container-volume on the command line.
func (v *AzureBlobVolume) Writable() bool {
- return !v.readonly
+ return !v.ReadOnly
}
-// Replication returns the replication level of the container, as
-// specified by the -azure-storage-replication argument.
+// Replication returns the replication level of the container, as
+// specified by the AzureReplication config field (or the
+// -azure-storage-replication argument).
func (v *AzureBlobVolume) Replication() int {
- return v.replication
+ return v.AzureReplication
}
// If possible, translate an Azure SDK error to a recognizable error
return keepBlockRegexp.MatchString(s)
}
-// EmptyTrash looks for trashed blocks that exceeded trashLifetime
+// EmptyTrash looks for trashed blocks that exceeded TrashLifetime
// and deletes them from the volume.
func (v *AzureBlobVolume) EmptyTrash() {
var bytesDeleted, bytesInTrash int64
params := storage.ListBlobsParameters{Include: "metadata"}
for {
- resp, err := v.bsClient.ListBlobs(v.containerName, params)
+ resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
if err != nil {
log.Printf("EmptyTrash: ListBlobs: %v", err)
break
continue
}
- err = v.bsClient.DeleteBlob(v.containerName, b.Name, map[string]string{
+ err = v.bsClient.DeleteBlob(v.ContainerName, b.Name, map[string]string{
"If-Match": b.Properties.Etag,
})
if err != nil {
}
}
- v := NewAzureBlobVolume(azClient, container, readonly, replication)
+ v := &AzureBlobVolume{
+ ContainerName: container,
+ ReadOnly: readonly,
+ AzureReplication: replication,
+ azClient: azClient,
+ bsClient: azClient.GetBlobService(),
+ }
return &TestableAzureBlobVolume{
AzureBlobVolume: v,
}
+// PutRaw stores data under locator directly through the test handler,
+// bypassing Put (and therefore the ReadOnly check).
func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
- v.azHandler.PutRaw(v.containerName, locator, data)
+ v.azHandler.PutRaw(v.ContainerName, locator, data)
}
+// TouchWithDate asks the test handler to record lastPut as locator's
+// modification time (presumably; see azHandler.TouchWithDate).
func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
- v.azHandler.TouchWithDate(v.containerName, locator, lastPut)
+ v.azHandler.TouchWithDate(v.ContainerName, locator, lastPut)
}
func (v *TestableAzureBlobVolume) Teardown() {
// Initialize a default-sized buffer pool for the benefit of test
// suites that don't run main().
+// theConfig is initialized (by DefaultConfig) before this runs;
+// Config.Start later replaces bufs with a pool sized from the final
+// configuration.
func init() {
- bufs = newBufferPool(maxBuffers, BlockSize)
+ bufs = newBufferPool(theConfig.MaxBuffers, BlockSize)
}
// Restore sane default after bufferpool's own tests
+// (sized from the current theConfig.MaxBuffers).
func (s *BufferPoolSuite) TearDownTest(c *C) {
- bufs = newBufferPool(maxBuffers, BlockSize)
+ bufs = newBufferPool(theConfig.MaxBuffers, BlockSize)
}
func (s *BufferPoolSuite) TestBufferPoolBufSize(c *C) {
--- /dev/null
+package main
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "strings"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Config holds keepstore's configuration. Populate the exported fields
+// (defaults, config file, deprecated flags), then call Start exactly
+// once before using it.
+type Config struct {
+ // TCP address to listen on (default ":25107").
+ Listen string
+
+ // Set by the -pid flag; presumably the file to which the process
+ // PID is written (handled outside this file — confirm).
+ PIDFile string
+
+ // Number of BlockSize buffers in the shared buffer pool (see Start).
+ MaxBuffers int
+ // Limit on concurrent requests; Start defaults it to 2*MaxBuffers
+ // when less than 1. (Enforcement is outside this file.)
+ MaxRequests int
+
+ // Lifetime of signatures on locators returned by PUT, and the
+ // minimum age a block must reach before a volume's Trash acts on it.
+ BlobSignatureTTL arvados.Duration
+ // File containing the blob signing key (whitespace-trimmed by Start).
+ BlobSigningKeyFile string
+ // If true, GET requests must carry a valid locator signature.
+ RequireSignatures bool
+ // File containing the token accepted for system-level requests
+ // (see IsSystemAuth).
+ SystemAuthTokenFile string
+ // If false, DELETE requests are refused with MethodDisabledError.
+ EnableDelete bool
+ // How long trashed blocks are kept before deletion; 0 means Trash
+ // deletes immediately (as implemented by AzureBlobVolume.Trash).
+ TrashLifetime arvados.Duration
+ // Interval between checks for expired trash (presumably — the
+ // consumer is outside this file; confirm).
+ TrashCheckInterval arvados.Duration
+
+ // Storage volumes; Start calls unixVolumeAdder.Discover if empty.
+ Volumes VolumeList
+
+ // Loaded by Start from BlobSigningKeyFile / SystemAuthTokenFile.
+ blobSigningKey []byte
+ systemAuthToken string
+}
+
+// theConfig is the process-wide configuration, seeded with defaults.
+var theConfig = DefaultConfig()
+
+// DefaultConfig returns a freshly allocated Config holding the default
+// settings; callers may adjust it before calling Start.
+func DefaultConfig() *Config {
+ const day = 24 * time.Hour
+ cfg := Config{
+ Listen: ":25107",
+ MaxBuffers: 128,
+ RequireSignatures: true,
+ BlobSignatureTTL: arvados.Duration(14 * day),
+ TrashLifetime: arvados.Duration(14 * day),
+ TrashCheckInterval: arvados.Duration(day),
+ Volumes: VolumeList{},
+ }
+ return &cfg
+}
+
+// Start should be called exactly once: after setting all public
+// fields, and before using the config.
+//
+// It validates the settings, (re)creates the global buffer pool, fills
+// in a default MaxRequests, loads the blob signing key and system auth
+// token from their files, discovers volumes if none are configured,
+// and starts every volume. The first problem found is returned.
+func (cfg *Config) Start() error {
+ // Zero is rejected as well: the error has always promised "greater
+ // than zero", and MaxRequests would otherwise default to 0 below.
+ if cfg.MaxBuffers < 1 {
+ return fmt.Errorf("MaxBuffers must be greater than zero (got %d)", cfg.MaxBuffers)
+ }
+ bufs = newBufferPool(cfg.MaxBuffers, BlockSize)
+
+ if cfg.MaxRequests < 1 {
+ cfg.MaxRequests = cfg.MaxBuffers * 2
+ log.Printf("MaxRequests <1 or not specified; defaulting to MaxBuffers * 2 == %d", cfg.MaxRequests)
+ }
+
+ if cfg.BlobSigningKeyFile != "" {
+ buf, err := ioutil.ReadFile(cfg.BlobSigningKeyFile)
+ if err != nil {
+ return fmt.Errorf("reading blob signing key file: %s", err)
+ }
+ cfg.blobSigningKey = bytes.TrimSpace(buf)
+ if len(cfg.blobSigningKey) == 0 {
+ return fmt.Errorf("blob signing key file %q is empty", cfg.BlobSigningKeyFile)
+ }
+ } else if cfg.RequireSignatures {
+ return fmt.Errorf("cannot enable RequireSignatures (-enforce-permissions) without a blob signing key")
+ } else {
+ log.Println("Running without a blob signing key. Block locators " +
+ "returned by this server will not be signed, and will be rejected " +
+ "by a server that enforces permissions.")
+ log.Println("To fix this, use the BlobSigningKeyFile config entry.")
+ }
+
+ if fn := cfg.SystemAuthTokenFile; fn != "" {
+ buf, err := ioutil.ReadFile(fn)
+ if err != nil {
+ return fmt.Errorf("cannot read system auth token file %q: %s", fn, err)
+ }
+ cfg.systemAuthToken = strings.TrimSpace(string(buf))
+ }
+
+ if cfg.EnableDelete {
+ log.Print("Trash/delete features are enabled. WARNING: this has not " +
+ "been extensively tested. You should disable this unless you can afford to lose data.")
+ }
+
+ // With no explicitly configured volumes, fall back to discovery.
+ if len(cfg.Volumes) == 0 {
+ if (&unixVolumeAdder{cfg}).Discover() == 0 {
+ return fmt.Errorf("no volumes found")
+ }
+ }
+ for _, v := range cfg.Volumes {
+ if err := v.Start(); err != nil {
+ return fmt.Errorf("volume %s: %s", v, err)
+ }
+ log.Printf("Using volume %v (writable=%v)", v, v.Writable())
+ }
+ return nil
+}
+
+// VolumeTypes is built up by init() funcs in the source files that
+// define the volume types. Each entry returns a new, empty volume of
+// its type (e.g. &AzureBlobVolume{}); VolumeList.UnmarshalJSON indexes
+// them by Type() to decode config entries.
+var VolumeTypes = []func() VolumeWithExamples{}
+
+// VolumeList is a list of volumes whose JSON form tags each entry with
+// a "Type" field naming its concrete volume type.
+type VolumeList []Volume
+
+// UnmarshalJSON, given an array of objects, deserializes each object
+// as the volume type indicated by the object's Type field.
+//
+// Decoded volumes are appended to the existing list. Each object is
+// decoded straight from its original JSON text (rather than being
+// re-encoded from a generic map), so numeric fields keep their exact
+// values instead of passing through float64.
+func (vols *VolumeList) UnmarshalJSON(data []byte) error {
+ typeMap := map[string]func() VolumeWithExamples{}
+ for _, factory := range VolumeTypes {
+ t := factory().Type()
+ if _, ok := typeMap[t]; ok {
+ log.Fatalf("volume type %+q is claimed by multiple VolumeTypes", t)
+ }
+ typeMap[t] = factory
+ }
+
+ var rawList []json.RawMessage
+ if err := json.Unmarshal(data, &rawList); err != nil {
+ return err
+ }
+ for _, raw := range rawList {
+ // Decode just enough to find the Type field.
+ var mapIn map[string]interface{}
+ if err := json.Unmarshal(raw, &mapIn); err != nil {
+ return err
+ }
+ typeIn, ok := mapIn["Type"].(string)
+ if !ok {
+ return fmt.Errorf("invalid volume type %+v", mapIn["Type"])
+ }
+ factory, ok := typeMap[typeIn]
+ if !ok {
+ return fmt.Errorf("unsupported volume type %+q", typeIn)
+ }
+ vol := factory()
+ if err := json.Unmarshal(raw, vol); err != nil {
+ return err
+ }
+ *vols = append(*vols, vol)
+ }
+ return nil
+}
+
+// MarshalJSON encodes the list as a JSON array of objects, adding a
+// "Type" field to each volume corresponding to its Type(), so that
+// UnmarshalJSON can rebuild the right concrete type.
+//
+// It uses a value receiver so the Type fields are emitted even when the
+// list is marshaled through a non-addressable value (e.g. a Config
+// passed by value); *VolumeList still has this method too.
+func (vl VolumeList) MarshalJSON() ([]byte, error) {
+ data := []byte{'['}
+ for i, vs := range vl {
+ j, err := json.Marshal(vs)
+ if err != nil {
+ return nil, err
+ }
+ // The splice below only works on a JSON object.
+ if len(j) < 2 || j[0] != '{' {
+ return nil, fmt.Errorf("volume %v did not marshal to a JSON object: %s", vs, j)
+ }
+ t, err := json.Marshal(vs.Type())
+ if err != nil {
+ return nil, err
+ }
+ if i > 0 {
+ data = append(data, ',')
+ }
+ data = append(data, '{')
+ data = append(data, `"Type":`...)
+ data = append(data, t...)
+ // Only add a separator when the volume has fields of its own:
+ // `{"Type":"X",}` would be invalid JSON.
+ if len(j) > 2 {
+ data = append(data, ',')
+ }
+ data = append(data, j[1:]...)
+ }
+ return append(data, ']'), nil
+}
--- /dev/null
+package main
+
+import (
+ "flag"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// deprecatedOptions holds command-line flag values that have no direct
+// Config field. neverDelete and signatureTTLSeconds are copied into the
+// Config by afterFlagParse. flagSerializeIO and flagReadonly are not
+// copied; flagReadonly is read directly while volume flags are parsed
+// (e.g. azureVolumeAdder.Set).
+type deprecatedOptions struct {
+ flagSerializeIO bool
+ flagReadonly bool
+ neverDelete bool
+ signatureTTLSeconds int
+}
+
+// deprecated holds the deprecated flag values. Its defaults are derived
+// from theConfig's initial (DefaultConfig) values, so omitting a flag
+// reproduces the corresponding default.
+var deprecated = deprecatedOptions{
+ neverDelete: !theConfig.EnableDelete,
+ signatureTTLSeconds: int(theConfig.BlobSignatureTTL.Duration() / time.Second),
+}
+
+// beforeFlagParse registers the legacy command-line flags on the
+// default flag set; call it once (registering a flag name twice
+// panics), before flag.Parse. Flags with a direct Config equivalent
+// write into cfg; the rest write into depr and are applied by
+// afterFlagParse. -permission-key-file and -blob-signing-key-file are
+// aliases, as are -permission-ttl and -blob-signature-ttl. Back-quoted
+// words in usage strings become the argument name shown in -help
+// output (see flag.UnquoteUsage).
+func (depr *deprecatedOptions) beforeFlagParse(cfg *Config) {
+ flag.StringVar(&cfg.Listen, "listen", cfg.Listen, "see Listen configuration")
+ flag.IntVar(&cfg.MaxBuffers, "max-buffers", cfg.MaxBuffers, "see MaxBuffers configuration")
+ flag.IntVar(&cfg.MaxRequests, "max-requests", cfg.MaxRequests, "see MaxRequests configuration")
+ flag.BoolVar(&depr.neverDelete, "never-delete", depr.neverDelete, "see EnableDelete configuration")
+ flag.BoolVar(&cfg.RequireSignatures, "enforce-permissions", cfg.RequireSignatures, "see RequireSignatures configuration")
+ flag.StringVar(&cfg.BlobSigningKeyFile, "permission-key-file", cfg.BlobSigningKeyFile, "see BlobSigningKey`File` configuration")
+ flag.StringVar(&cfg.BlobSigningKeyFile, "blob-signing-key-file", cfg.BlobSigningKeyFile, "see BlobSigningKey`File` configuration")
+ flag.StringVar(&cfg.SystemAuthTokenFile, "data-manager-token-file", cfg.SystemAuthTokenFile, "see SystemAuthToken`File` configuration")
+ flag.IntVar(&depr.signatureTTLSeconds, "permission-ttl", depr.signatureTTLSeconds, "signature TTL in seconds; see BlobSignatureTTL configuration")
+ flag.IntVar(&depr.signatureTTLSeconds, "blob-signature-ttl", depr.signatureTTLSeconds, "signature TTL in seconds; see BlobSignatureTTL configuration")
+ flag.Var(&cfg.TrashLifetime, "trash-lifetime", "see TrashLifetime configuration")
+ flag.BoolVar(&depr.flagSerializeIO, "serialize", depr.flagSerializeIO, "serialize read and write operations on the following volumes.")
+ flag.BoolVar(&depr.flagReadonly, "readonly", depr.flagReadonly, "do not write, delete, or touch anything on the following volumes.")
+ flag.StringVar(&cfg.PIDFile, "pid", cfg.PIDFile, "see `PIDFile` configuration")
+ flag.Var(&cfg.TrashCheckInterval, "trash-check-interval", "see TrashCheckInterval configuration")
+}
+
+// afterFlagParse copies the deprecated flag values that have no direct
+// Config counterpart into cfg; call it after flag.Parse. It always
+// overwrites cfg.EnableDelete and cfg.BlobSignatureTTL, whether or not
+// the corresponding flags were given.
+func (depr *deprecatedOptions) afterFlagParse(cfg *Config) {
+ cfg.EnableDelete = !depr.neverDelete
+ ttl := time.Duration(depr.signatureTTLSeconds) * time.Second
+ cfg.BlobSignatureTTL = arvados.Duration(ttl)
+}
"strings"
"testing"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
)
// A RequestTester represents the parameters for an HTTP request to
// Create locators for testing.
// Turn on permission settings so we can generate signed locators.
- enforcePermissions = true
- PermissionSecret = []byte(knownKey)
- blobSignatureTTL = 300 * time.Second
+ theConfig.RequireSignatures = true
+ theConfig.blobSigningKey = []byte(knownKey)
+ theConfig.BlobSignatureTTL.Set("5m")
var (
unsignedLocator = "/" + TestHash
- validTimestamp = time.Now().Add(blobSignatureTTL)
+ validTimestamp = time.Now().Add(theConfig.BlobSignatureTTL.Duration())
expiredTimestamp = time.Now().Add(-time.Hour)
signedLocator = "/" + SignLocator(TestHash, knownToken, validTimestamp)
expiredLocator = "/" + SignLocator(TestHash, knownToken, expiredTimestamp)
// -----------------
// Test unauthenticated request with permissions off.
- enforcePermissions = false
+ theConfig.RequireSignatures = false
// Unauthenticated request, unsigned locator
// => OK
// ----------------
// Permissions: on.
- enforcePermissions = true
+ theConfig.RequireSignatures = true
// Authenticated request, signed locator
// => OK
// ------------------
// With a server key.
- PermissionSecret = []byte(knownKey)
- blobSignatureTTL = 300 * time.Second
+ theConfig.blobSigningKey = []byte(knownKey)
+ theConfig.BlobSignatureTTL.Set("5m")
// When a permission key is available, the locator returned
// from an authenticated PUT request will be signed.
func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
defer teardown()
- dataManagerToken = "fake-data-manager-token"
+ theConfig.systemAuthToken = "fake-data-manager-token"
vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
vols[0].Readonly = true
KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
requestBody: TestBlock,
})
defer func(orig bool) {
- neverDelete = orig
- }(neverDelete)
- neverDelete = false
+ theConfig.EnableDelete = orig
+ }(theConfig.EnableDelete)
+ theConfig.EnableDelete = true
IssueRequest(
&RequestTester{
method: "DELETE",
uri: "/" + TestHash,
requestBody: TestBlock,
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
})
type expect struct {
volnum int
// - authenticated /index/prefix request | superuser
//
// The only /index requests that should succeed are those issued by the
-// superuser. They should pass regardless of the value of enforcePermissions.
+// superuser. They should pass regardless of the value of RequireSignatures.
//
func TestIndexHandler(t *testing.T) {
defer teardown()
vols[0].Put(TestHash+".meta", []byte("metadata"))
vols[1].Put(TestHash2+".meta", []byte("metadata"))
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
unauthenticatedReq := &RequestTester{
method: "GET",
superuserReq := &RequestTester{
method: "GET",
uri: "/index",
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
unauthPrefixReq := &RequestTester{
method: "GET",
superuserPrefixReq := &RequestTester{
method: "GET",
uri: "/index/" + TestHash[0:3],
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
superuserNoSuchPrefixReq := &RequestTester{
method: "GET",
uri: "/index/abcd",
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
superuserInvalidPrefixReq := &RequestTester{
method: "GET",
uri: "/index/xyz",
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
// -------------------------------------------------------------
// Only the superuser should be allowed to issue /index requests.
// ---------------------------
- // enforcePermissions enabled
+ // RequireSignatures enabled
// This setting should not affect tests passing.
- enforcePermissions = true
+ theConfig.RequireSignatures = true
// unauthenticated /index request
// => UnauthorizedError
response := IssueRequest(unauthenticatedReq)
ExpectStatusCode(t,
- "enforcePermissions on, unauthenticated request",
+ "RequireSignatures on, unauthenticated request",
UnauthorizedError.HTTPCode,
response)
response)
// ----------------------------
- // enforcePermissions disabled
+ // RequireSignatures disabled
// Valid Request should still pass.
- enforcePermissions = false
+ theConfig.RequireSignatures = false
// superuser /index request
// => OK
vols := KeepVM.AllWritable()
vols[0].Put(TestHash, TestBlock)
- // Explicitly set the blobSignatureTTL to 0 for these
+ // Explicitly set the BlobSignatureTTL to 0 for these
// tests, to ensure the MockVolume deletes the blocks
// even though they have just been created.
- blobSignatureTTL = time.Duration(0)
+ theConfig.BlobSignatureTTL = arvados.Duration(0)
var userToken = "NOT DATA MANAGER TOKEN"
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
- neverDelete = false
+ theConfig.EnableDelete = true
unauthReq := &RequestTester{
method: "DELETE",
superuserExistingBlockReq := &RequestTester{
method: "DELETE",
uri: "/" + TestHash,
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
superuserNonexistentBlockReq := &RequestTester{
method: "DELETE",
uri: "/" + TestHash2,
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
// Unauthenticated request returns PermissionError.
http.StatusNotFound,
response)
- // Authenticated admin request for existing block while neverDelete is set.
- neverDelete = true
+ // Authenticated admin request for existing block while EnableDelete is false.
+ theConfig.EnableDelete = false
response = IssueRequest(superuserExistingBlockReq)
ExpectStatusCode(t,
"authenticated request, existing block, method disabled",
MethodDisabledError.HTTPCode,
response)
- neverDelete = false
+ theConfig.EnableDelete = true
// Authenticated admin request for existing block.
response = IssueRequest(superuserExistingBlockReq)
t.Error("superuserExistingBlockReq: block not deleted")
}
- // A DELETE request on a block newer than blobSignatureTTL
+ // A DELETE request on a block newer than BlobSignatureTTL
// should return success but leave the block on the volume.
vols[0].Put(TestHash, TestBlock)
- blobSignatureTTL = time.Hour
+ theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
response = IssueRequest(superuserExistingBlockReq)
ExpectStatusCode(t,
defer teardown()
var userToken = "USER TOKEN"
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
pullq = NewWorkQueue()
},
{
"Valid pull request from the data manager",
- RequestTester{"/pull", dataManagerToken, "PUT", goodJSON},
+ RequestTester{"/pull", theConfig.systemAuthToken, "PUT", goodJSON},
http.StatusOK,
"Received 3 pull requests\n",
},
{
"Invalid pull request from the data manager",
- RequestTester{"/pull", dataManagerToken, "PUT", badJSON},
+ RequestTester{"/pull", theConfig.systemAuthToken, "PUT", badJSON},
http.StatusBadRequest,
"",
},
defer teardown()
var userToken = "USER TOKEN"
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
trashq = NewWorkQueue()
},
{
"Valid trash list from the data manager",
- RequestTester{"/trash", dataManagerToken, "PUT", goodJSON},
+ RequestTester{"/trash", theConfig.systemAuthToken, "PUT", goodJSON},
http.StatusOK,
"Received 3 trash requests\n",
},
{
"Invalid trash list from the data manager",
- RequestTester{"/trash", dataManagerToken, "PUT", badJSON},
+ RequestTester{"/trash", theConfig.systemAuthToken, "PUT", badJSON},
http.StatusBadRequest,
"",
},
select {
case <-ok:
case <-time.After(time.Second):
- t.Fatal("PUT deadlocks with maxBuffers==1")
+ t.Fatal("PUT deadlocks with MaxBuffers==1")
}
}
ok := make(chan bool)
go func() {
- for i := 0; i < maxBuffers+1; i++ {
+ for i := 0; i < theConfig.MaxBuffers+1; i++ {
// Unauthenticated request, no server key
// => OK (unsigned response)
unsignedLocator := "/" + TestHash
func TestGetHandlerClientDisconnect(t *testing.T) {
defer func(was bool) {
- enforcePermissions = was
- }(enforcePermissions)
- enforcePermissions = false
+ theConfig.RequireSignatures = was
+ }(theConfig.RequireSignatures)
+ theConfig.RequireSignatures = false
defer func(orig *bufferPool) {
bufs = orig
// Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
// leak.
-func TestGetHandlerNoBufferleak(t *testing.T) {
+func TestGetHandlerNoBufferLeak(t *testing.T) {
defer teardown()
// Prepare two test Keep volumes. Our block is stored on the second volume.
ok := make(chan bool)
go func() {
- for i := 0; i < maxBuffers+1; i++ {
+ for i := 0; i < theConfig.MaxBuffers+1; i++ {
// Unauthenticated request, unsigned locator
// => OK
unsignedLocator := "/" + TestHash
vols := KeepVM.AllWritable()
vols[0].Put(TestHash, TestBlock)
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
// unauthenticatedReq => UnauthorizedError
unauthenticatedReq := &RequestTester{
datamanagerWithBadHashReq := &RequestTester{
method: "PUT",
uri: "/untrash/thisisnotalocator",
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
response = IssueRequest(datamanagerWithBadHashReq)
ExpectStatusCode(t,
datamanagerWrongMethodReq := &RequestTester{
method: "GET",
uri: "/untrash/" + TestHash,
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
response = IssueRequest(datamanagerWrongMethodReq)
ExpectStatusCode(t,
datamanagerReq := &RequestTester{
method: "PUT",
uri: "/untrash/" + TestHash,
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
response = IssueRequest(datamanagerReq)
ExpectStatusCode(t,
KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
defer KeepVM.Close()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
// datamanagerReq => StatusOK
datamanagerReq := &RequestTester{
method: "PUT",
uri: "/untrash/" + TestHash,
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
response := IssueRequest(datamanagerReq)
ExpectStatusCode(t,
// GetBlockHandler is a HandleFunc to address Get block requests.
func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
- if enforcePermissions {
+ if theConfig.RequireSignatures {
locator := req.URL.Path[1:] // strip leading slash
if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
// return it to the client.
returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
apiToken := GetAPIToken(req)
- if PermissionSecret != nil && apiToken != "" {
- expiry := time.Now().Add(blobSignatureTTL)
+ if theConfig.blobSigningKey != nil && apiToken != "" {
+ expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
returnHash = SignLocator(returnHash, apiToken, expiry)
}
resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
// IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
func IndexHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsDataManagerToken(GetAPIToken(req)) {
+ if !IsSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
return
}
- if neverDelete {
+ if !theConfig.EnableDelete {
http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
return
}
// PullHandler processes "PUT /pull" requests for the data manager.
func PullHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsDataManagerToken(GetAPIToken(req)) {
+ if !IsSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
// TrashHandler processes /trash requests.
func TrashHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsDataManagerToken(GetAPIToken(req)) {
+ if !IsSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
// UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsDataManagerToken(GetAPIToken(req)) {
+ if !IsSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
}
// Blocks may be deleted only when Keep has been configured with a
// data manager.
- if IsDataManagerToken(apiToken) {
+ if IsSystemAuth(apiToken) {
return true
}
// TODO(twp): look up apiToken with the API server
return false
}
-// IsDataManagerToken returns true if apiToken represents the data
-// manager's token.
-func IsDataManagerToken(apiToken string) bool {
- return dataManagerToken != "" && apiToken == dataManagerToken
+// IsSystemAuth returns true if the given token is allowed to perform
+// system level actions like deleting data.
+// An empty token never matches, so these actions are unavailable when
+// no system auth token has been loaded (see SystemAuthTokenFile).
+// NOTE(review): the comparison is not constant-time; since the token
+// comes from the request, consider crypto/subtle.ConstantTimeCompare.
+func IsSystemAuth(token string) bool {
+ return token != "" && token == theConfig.systemAuthToken
}
package main
import (
- "bytes"
"flag"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "io/ioutil"
"log"
"net"
"net/http"
"os"
"os/signal"
- "strings"
"syscall"
"time"
-)
-// ======================
-// Configuration settings
-//
-// TODO(twp): make all of these configurable via command line flags
-// and/or configuration file settings.
-
-// Default TCP address on which to listen for requests.
-// Initialized by the --listen flag.
-const DefaultAddr = ":25107"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/config"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "github.com/coreos/go-systemd/daemon"
+ "github.com/ghodss/yaml"
+)
// A Keep "block" is 64MB.
const BlockSize = 64 * 1024 * 1024
// ProcMounts /proc/mounts
var ProcMounts = "/proc/mounts"
-// enforcePermissions controls whether permission signatures
-// should be enforced (affecting GET and DELETE requests).
-// Initialized by the -enforce-permissions flag.
-var enforcePermissions bool
-
-// blobSignatureTTL is the time duration for which new permission
-// signatures (returned by PUT requests) will be valid.
-// Initialized by the -permission-ttl flag.
-var blobSignatureTTL time.Duration
-
-// dataManagerToken represents the API token used by the
-// Data Manager, and is required on certain privileged operations.
-// Initialized by the -data-manager-token-file flag.
-var dataManagerToken string
-
-// neverDelete can be used to prevent the DELETE handler from
-// actually deleting anything.
-var neverDelete = true
-
-// trashLifetime is the time duration after a block is trashed
-// during which it can be recovered using an /untrash request
-// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
-var trashLifetime time.Duration
-
-// trashCheckInterval is the time duration at which the emptyTrash goroutine
-// will check and delete expired trashed blocks. Default is one day.
-// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
-var trashCheckInterval time.Duration
-
-var maxBuffers = 128
var bufs *bufferPool
// KeepError types.
var pullq *WorkQueue
var trashq *WorkQueue
-type volumeSet []Volume
-
-var (
- flagSerializeIO bool
- flagReadonly bool
- volumes volumeSet
-)
-
-func (vs *volumeSet) String() string {
- if vs == nil {
- return "[]"
- }
- return fmt.Sprintf("%+v", (*vs)[:])
-}
-
-// TODO(twp): continue moving as much code as possible out of main
-// so it can be effectively tested. Esp. handling and postprocessing
-// of command line flags (identifying Keep volumes and initializing
-// permission arguments).
-
func main() {
- log.Println("keepstore starting, pid", os.Getpid())
- defer log.Println("keepstore exiting, pid", os.Getpid())
+ deprecated.beforeFlagParse(theConfig)
- var (
- dataManagerTokenFile string
- listen string
- blobSigningKeyFile string
- permissionTTLSec int
- pidfile string
- maxRequests int
- )
- flag.StringVar(
- &dataManagerTokenFile,
- "data-manager-token-file",
- "",
- "File with the API token used by the Data Manager. All DELETE "+
- "requests or GET /index requests must carry this token.")
- flag.BoolVar(
- &enforcePermissions,
- "enforce-permissions",
- false,
- "Enforce permission signatures on requests.")
- flag.StringVar(
- &listen,
- "listen",
- DefaultAddr,
- "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
- flag.IntVar(
- &maxRequests,
- "max-requests",
- 0,
- "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)")
- flag.BoolVar(
- &neverDelete,
- "never-delete",
- true,
- "If true, nothing will be deleted. "+
- "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
- "You should leave this option alone unless you can afford to lose data.")
- flag.StringVar(
- &blobSigningKeyFile,
- "permission-key-file",
- "",
- "Synonym for -blob-signing-key-file.")
- flag.StringVar(
- &blobSigningKeyFile,
- "blob-signing-key-file",
- "",
- "File containing the secret key for generating and verifying "+
- "blob permission signatures.")
- flag.IntVar(
- &permissionTTLSec,
- "permission-ttl",
- 0,
- "Synonym for -blob-signature-ttl.")
- flag.IntVar(
- &permissionTTLSec,
- "blob-signature-ttl",
- 2*7*24*3600,
- "Lifetime of blob permission signatures in seconds. Modifying the ttl will invalidate all existing signatures. "+
- "See services/api/config/application.default.yml.")
- flag.BoolVar(
- &flagSerializeIO,
- "serialize",
- false,
- "Serialize read and write operations on the following volumes.")
- flag.BoolVar(
- &flagReadonly,
- "readonly",
- false,
- "Do not write, delete, or touch anything on the following volumes.")
- flag.StringVar(
- &pidfile,
- "pid",
- "",
- "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
- flag.IntVar(
- &maxBuffers,
- "max-buffers",
- maxBuffers,
- fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
- flag.DurationVar(
- &trashLifetime,
- "trash-lifetime",
- 0,
- "Time duration after a block is trashed during which it can be recovered using an /untrash request")
- flag.DurationVar(
- &trashCheckInterval,
- "trash-check-interval",
- 24*time.Hour,
- "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
+ dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
+ defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
+ var configPath string
+ flag.StringVar(
+ &configPath,
+ "config",
+ defaultConfigPath,
+ "YAML or JSON configuration file `path`")
+ flag.Usage = usage
flag.Parse()
- if maxBuffers < 0 {
- log.Fatal("-max-buffers must be greater than zero.")
+ deprecated.afterFlagParse(theConfig)
+
+ err := config.LoadFile(theConfig, configPath)
+ if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
+ log.Fatal(err)
+ }
+
+ if *dumpConfig {
+ y, err := yaml.Marshal(theConfig)
+ if err != nil {
+ log.Fatal(err)
+ }
+ os.Stdout.Write(y)
+ os.Exit(0)
}
- bufs = newBufferPool(maxBuffers, BlockSize)
- if pidfile != "" {
+ err = theConfig.Start()
+
+ if pidfile := theConfig.PIDFile; pidfile != "" {
f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
if err != nil {
log.Fatalf("open pidfile (%s): %s", pidfile, err)
}
+ defer f.Close()
err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
if err != nil {
log.Fatalf("flock pidfile (%s): %s", pidfile, err)
}
+ defer os.Remove(pidfile)
err = f.Truncate(0)
if err != nil {
log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
if err != nil {
log.Fatalf("sync pidfile (%s): %s", pidfile, err)
}
- defer f.Close()
- defer os.Remove(pidfile)
- }
-
- if len(volumes) == 0 {
- if (&unixVolumeAdder{&volumes}).Discover() == 0 {
- log.Fatal("No volumes found.")
- }
- }
-
- for _, v := range volumes {
- log.Printf("Using volume %v (writable=%v)", v, v.Writable())
}
- // Initialize data manager token and permission key.
- // If these tokens are specified but cannot be read,
- // raise a fatal error.
- if dataManagerTokenFile != "" {
- if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
- dataManagerToken = strings.TrimSpace(string(buf))
- } else {
- log.Fatalf("reading data manager token: %s\n", err)
- }
- }
-
- if neverDelete != true {
- log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
- "been extensively tested. You should leave this option alone unless you can afford to lose data.")
- }
-
- if blobSigningKeyFile != "" {
- if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
- PermissionSecret = bytes.TrimSpace(buf)
- } else {
- log.Fatalf("reading permission key: %s\n", err)
- }
- }
-
- blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
-
- if PermissionSecret == nil {
- if enforcePermissions {
- log.Fatal("-enforce-permissions requires a permission key")
- } else {
- log.Println("Running without a PermissionSecret. Block locators " +
- "returned by this server will not be signed, and will be rejected " +
- "by a server that enforces permissions.")
- log.Println("To fix this, use the -blob-signing-key-file flag " +
- "to specify the file containing the permission key.")
- }
- }
-
- if maxRequests <= 0 {
- maxRequests = maxBuffers * 2
- log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests)
- }
+ log.Println("keepstore starting, pid", os.Getpid())
+ defer log.Println("keepstore exiting, pid", os.Getpid())
// Start a round-robin VolumeManager with the volumes we have found.
- KeepVM = MakeRRVolumeManager(volumes)
+ KeepVM = MakeRRVolumeManager(theConfig.Volumes)
- // Middleware stack: logger, maxRequests limiter, method handlers
+ // Middleware stack: logger, MaxRequests limiter, method handlers
http.Handle("/", &LoggingRESTRouter{
- httpserver.NewRequestLimiter(maxRequests,
+ httpserver.NewRequestLimiter(theConfig.MaxRequests,
MakeRESTRouter()),
})
// Set up a TCP listener.
- listener, err := net.Listen("tcp", listen)
+ listener, err := net.Listen("tcp", theConfig.Listen)
if err != nil {
log.Fatal(err)
}
// Start emptyTrash goroutine
doneEmptyingTrash := make(chan bool)
- go emptyTrash(doneEmptyingTrash, trashCheckInterval)
+ go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
// Shut down the server gracefully (by closing the listener)
// if SIGTERM is received.
signal.Notify(term, syscall.SIGTERM)
signal.Notify(term, syscall.SIGINT)
- log.Println("listening at", listen)
- srv := &http.Server{Addr: listen}
+ if _, err := daemon.SdNotify("READY=1"); err != nil {
+ log.Printf("Error notifying init daemon: %v", err)
+ }
+ log.Println("listening at", listener.Addr())
+ srv := &http.Server{}
srv.Serve(listener)
}
-// At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
-func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
- ticker := time.NewTicker(trashCheckInterval)
+// Periodically (once per interval) invoke EmptyTrash on all volumes.
+func emptyTrash(done <-chan bool, interval time.Duration) {
+ ticker := time.NewTicker(interval)
for {
select {
case <-ticker.C:
- for _, v := range volumes {
+ for _, v := range theConfig.Volumes {
if v.Writable() {
v.EmptyTrash()
}
}
- case <-doneEmptyingTrash:
+ case <-done:
ticker.Stop()
return
}
--- /dev/null
+[Unit]
+Description=Arvados Keep Storage Daemon
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/keepstore/keepstore.yml
+
+[Service]
+Type=notify
+ExecStart=/usr/bin/keepstore
+Restart=always
+
+[Install]
+WantedBy=multi-user.target
f.Close()
ProcMounts = f.Name()
- resultVols := volumeSet{}
- added := (&unixVolumeAdder{&resultVols}).Discover()
+ cfg := &Config{}
+ added := (&unixVolumeAdder{cfg}).Discover()
- if added != len(resultVols) {
+ if added != len(cfg.Volumes) {
t.Errorf("Discover returned %d, but added %d volumes",
- added, len(resultVols))
+ added, len(cfg.Volumes))
}
if added != len(tempVols) {
t.Errorf("Discover returned %d but we set up %d volumes",
added, len(tempVols))
}
for i, tmpdir := range tempVols {
- if tmpdir != resultVols[i].(*UnixVolume).root {
+ if tmpdir != cfg.Volumes[i].(*UnixVolume).Root {
t.Errorf("Discover returned %s, expected %s\n",
- resultVols[i].(*UnixVolume).root, tmpdir)
+ cfg.Volumes[i].(*UnixVolume).Root, tmpdir)
}
- if expectReadonly := i%2 == 1; expectReadonly != resultVols[i].(*UnixVolume).readonly {
+ if expectReadonly := i%2 == 1; expectReadonly != cfg.Volumes[i].(*UnixVolume).ReadOnly {
t.Errorf("Discover added %s with readonly=%v, should be %v",
tmpdir, !expectReadonly, expectReadonly)
}
f.Close()
ProcMounts = f.Name()
- resultVols := volumeSet{}
- added := (&unixVolumeAdder{&resultVols}).Discover()
- if added != 0 || len(resultVols) != 0 {
- t.Fatalf("got %d, %v; expected 0, []", added, resultVols)
+ cfg := &Config{}
+ added := (&unixVolumeAdder{cfg}).Discover()
+ if added != 0 || len(cfg.Volumes) != 0 {
+ t.Fatalf("got %d, %v; expected 0, []", added, cfg.Volumes)
}
}
// teardown cleans up after each test.
func teardown() {
- dataManagerToken = ""
- enforcePermissions = false
- PermissionSecret = nil
+ theConfig.systemAuthToken = ""
+ theConfig.RequireSignatures = false
+ theConfig.blobSigningKey = nil
KeepVM = nil
}
"time"
)
-// The PermissionSecret is the secret key used to generate SHA1
-// digests for permission hints. apiserver and Keep must use the same
-// key.
-var PermissionSecret []byte
-
// SignLocator takes a blobLocator, an apiToken and an expiry time, and
// returns a signed locator string.
func SignLocator(blobLocator, apiToken string, expiry time.Time) string {
- return keepclient.SignLocator(blobLocator, apiToken, expiry, blobSignatureTTL, PermissionSecret)
+ return keepclient.SignLocator(blobLocator, apiToken, expiry, theConfig.BlobSignatureTTL.Duration(), theConfig.blobSigningKey)
}
// VerifySignature returns nil if the signature on the signedLocator
// something the client could have figured out independently) or
// PermissionError.
func VerifySignature(signedLocator, apiToken string) error {
- err := keepclient.VerifySignature(signedLocator, apiToken, blobSignatureTTL, PermissionSecret)
+ err := keepclient.VerifySignature(signedLocator, apiToken, theConfig.BlobSignatureTTL.Duration(), theConfig.blobSigningKey)
if err == keepclient.ErrSignatureExpired {
return ExpiredError
} else if err != nil {
"strconv"
"testing"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
)
const (
"gokee3eamvjy8qq1fvy238838enjmy5wzy2md7yvsitp5vztft6j4q866efym7e6" +
"vu5wm9fpnwjyxfldw3vbo01mgjs75rgo7qioh8z8ij7jpyp8508okhgbbex3ceei" +
"786u5rw2a9gx743dj3fgq2irk"
- knownSignatureTTL = 1209600 * time.Second
+ knownSignatureTTL = arvados.Duration(24 * 14 * time.Hour)
knownSignature = "89118b78732c33104a4d6231e8b5a5fa1e4301e3"
knownTimestamp = "7fffffff"
knownSigHint = "+A" + knownSignature + "@" + knownTimestamp
func TestSignLocator(t *testing.T) {
defer func(b []byte) {
- PermissionSecret = b
- }(PermissionSecret)
+ theConfig.blobSigningKey = b
+ }(theConfig.blobSigningKey)
tsInt, err := strconv.ParseInt(knownTimestamp, 16, 0)
if err != nil {
}
t0 := time.Unix(tsInt, 0)
- blobSignatureTTL = knownSignatureTTL
+ theConfig.BlobSignatureTTL = knownSignatureTTL
- PermissionSecret = []byte(knownKey)
+ theConfig.blobSigningKey = []byte(knownKey)
if x := SignLocator(knownLocator, knownToken, t0); x != knownSignedLocator {
t.Fatalf("Got %+q, expected %+q", x, knownSignedLocator)
}
- PermissionSecret = []byte("arbitrarykey")
+ theConfig.blobSigningKey = []byte("arbitrarykey")
if x := SignLocator(knownLocator, knownToken, t0); x == knownSignedLocator {
- t.Fatalf("Got same signature %+q, even though PermissionSecret changed", x)
+ t.Fatalf("Got same signature %+q, even though blobSigningKey changed", x)
}
}
func TestVerifyLocator(t *testing.T) {
defer func(b []byte) {
- PermissionSecret = b
- }(PermissionSecret)
+ theConfig.blobSigningKey = b
+ }(theConfig.blobSigningKey)
- blobSignatureTTL = knownSignatureTTL
+ theConfig.BlobSignatureTTL = knownSignatureTTL
- PermissionSecret = []byte(knownKey)
+ theConfig.blobSigningKey = []byte(knownKey)
if err := VerifySignature(knownSignedLocator, knownToken); err != nil {
t.Fatal(err)
}
- PermissionSecret = []byte("arbitrarykey")
+ theConfig.blobSigningKey = []byte("arbitrarykey")
if err := VerifySignature(knownSignedLocator, knownToken); err == nil {
- t.Fatal("Verified signature even with wrong PermissionSecret")
+ t.Fatal("Verified signature even with wrong blobSigningKey")
}
}
func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_two_locators",
- req: RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "hello",
func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_one_locator",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "hola",
func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_get_one_locator",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "unused",
func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_get_two_locators",
- req: RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "unused",
func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_put_one_locator",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "hello hello",
func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_put_two_locators",
- req: RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "hello again",
pullq.ReplaceQueue(makeTestWorkList(firstInput))
testPullLists["Added_before_actual_test_item"] = string(1)
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_two_items_latest_replacing_old",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "hola de nuevo",
}
// In this case, the item will not be placed on pullq
-func (s *PullWorkerTestSuite) TestPullWorker_invalid_dataManagerToken(c *C) {
+func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_two_locators",
- req: RequestTester{"/pull", "invalid_dataManagerToken", "PUT", firstPullList},
+ req: RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
responseCode: http.StatusUnauthorized,
responseBody: "Unauthorized\n",
readContent: "hello",
"os"
"regexp"
"strings"
+ "sync"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/AdRoll/goamz/aws"
"github.com/AdRoll/goamz/s3"
)
)
type s3VolumeAdder struct {
- *volumeSet
+ *Config
+}
+
+// String implements flag.Value
+func (s *s3VolumeAdder) String() string {
+ return "-"
}
func (s *s3VolumeAdder) Set(bucketName string) error {
if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
}
- region, ok := aws.Regions[s3RegionName]
- if s3Endpoint == "" {
- if !ok {
- return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", s3RegionName)
- }
- } else {
- if ok {
- return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
- "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", s3RegionName, s3Endpoint)
- }
- region = aws.Region{
- Name: s3RegionName,
- S3Endpoint: s3Endpoint,
- }
- }
- var err error
- var auth aws.Auth
- auth.AccessKey, err = readKeyFromFile(s3AccessKeyFile)
- if err != nil {
- return err
- }
- auth.SecretKey, err = readKeyFromFile(s3SecretKeyFile)
- if err != nil {
- return err
- }
- if flagSerializeIO {
+ if deprecated.flagSerializeIO {
log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
}
- v := NewS3Volume(auth, region, bucketName, s3RaceWindow, flagReadonly, s3Replication)
- if err := v.Check(); err != nil {
- return err
- }
- *s.volumeSet = append(*s.volumeSet, v)
+ s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
+ Bucket: bucketName,
+ AccessKeyFile: s3AccessKeyFile,
+ SecretKeyFile: s3SecretKeyFile,
+ Endpoint: s3Endpoint,
+ Region: s3RegionName,
+ RaceWindow: arvados.Duration(s3RaceWindow),
+ S3Replication: s3Replication,
+ UnsafeDelete: s3UnsafeDelete,
+ ReadOnly: deprecated.flagReadonly,
+ IndexPageSize: 1000,
+ })
return nil
}
}
func init() {
- flag.Var(&s3VolumeAdder{&volumes},
+ VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
+
+ flag.Var(&s3VolumeAdder{theConfig},
"s3-bucket-volume",
"Use the given bucket as a storage volume. Can be given multiple times.")
flag.StringVar(
&s3AccessKeyFile,
"s3-access-key-file",
"",
- "File containing the access key used for subsequent -s3-bucket-volume arguments.")
+ "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
flag.StringVar(
&s3SecretKeyFile,
"s3-secret-key-file",
"",
- "File containing the secret key used for subsequent -s3-bucket-volume arguments.")
+ "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
flag.DurationVar(
&s3RaceWindow,
"s3-race-window",
// S3Volume implements Volume using an S3 bucket.
type S3Volume struct {
- *s3.Bucket
- raceWindow time.Duration
- readonly bool
- replication int
- indexPageSize int
-}
-
-// NewS3Volume returns a new S3Volume using the given auth, region,
-// and bucket name. The replication argument specifies the replication
-// level to report when writing data.
-func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, raceWindow time.Duration, readonly bool, replication int) *S3Volume {
- return &S3Volume{
- Bucket: &s3.Bucket{
- S3: s3.New(auth, region),
- Name: bucket,
+ AccessKeyFile string
+ SecretKeyFile string
+ Endpoint string
+ Region string
+ Bucket string
+ LocationConstraint bool
+ IndexPageSize int
+ S3Replication int
+ RaceWindow arvados.Duration
+ ReadOnly bool
+ UnsafeDelete bool
+
+ bucket *s3.Bucket
+
+ startOnce sync.Once
+}
+
+// Examples implements VolumeWithExamples.
+func (*S3Volume) Examples() []Volume {
+ return []Volume{
+ &S3Volume{
+ AccessKeyFile: "/etc/aws_s3_access_key.txt",
+ SecretKeyFile: "/etc/aws_s3_secret_key.txt",
+ Endpoint: "",
+ Region: "us-east-1",
+ Bucket: "example-bucket-name",
+ IndexPageSize: 1000,
+ S3Replication: 2,
+ RaceWindow: arvados.Duration(24 * time.Hour),
+ },
+ &S3Volume{
+ AccessKeyFile: "/etc/gce_s3_access_key.txt",
+ SecretKeyFile: "/etc/gce_s3_secret_key.txt",
+ Endpoint: "https://storage.googleapis.com",
+ Region: "",
+ Bucket: "example-bucket-name",
+ IndexPageSize: 1000,
+ S3Replication: 2,
+ RaceWindow: arvados.Duration(24 * time.Hour),
},
- raceWindow: raceWindow,
- readonly: readonly,
- replication: replication,
- indexPageSize: 1000,
}
}
-// Check returns an error if the volume is inaccessible (e.g., config
-// error).
-func (v *S3Volume) Check() error {
+// Type implements Volume.
+func (*S3Volume) Type() string {
+ return "S3"
+}
+
+// Start populates private fields and verifies the configuration is
+// valid.
+func (v *S3Volume) Start() error {
+ region, ok := aws.Regions[v.Region]
+ if v.Endpoint == "" {
+ if !ok {
+ return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
+ }
+ } else if ok {
+ return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
+ "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
+ } else {
+ region = aws.Region{
+ Name: v.Region,
+ S3Endpoint: v.Endpoint,
+ S3LocationConstraint: v.LocationConstraint,
+ }
+ }
+
+ var err error
+ var auth aws.Auth
+ auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
+ if err != nil {
+ return err
+ }
+ auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
+ if err != nil {
+ return err
+ }
+ v.bucket = &s3.Bucket{
+ S3: s3.New(auth, region),
+ Name: v.Bucket,
+ }
return nil
}
// disappeared in a Trash race, getReader calls fixRace to recover the
// data, and tries again.
func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
- rdr, err = v.Bucket.GetReader(loc)
+ rdr, err = v.bucket.GetReader(loc)
err = v.translateError(err)
if err == nil || !os.IsNotExist(err) {
return
}
- _, err = v.Bucket.Head("recent/"+loc, nil)
+ _, err = v.bucket.Head("recent/"+loc, nil)
err = v.translateError(err)
if err != nil {
// If we can't read recent/X, there's no point in
err = os.ErrNotExist
return
}
- rdr, err = v.Bucket.GetReader(loc)
+ rdr, err = v.bucket.GetReader(loc)
if err != nil {
log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
err = v.translateError(err)
// Put writes a block.
func (v *S3Volume) Put(loc string, block []byte) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
var opts s3.Options
}
opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
}
- err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
+ err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
if err != nil {
return v.translateError(err)
}
- err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
return v.translateError(err)
}
// Touch sets the timestamp for the given locator to the current time.
func (v *S3Volume) Touch(loc string) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
- _, err := v.Bucket.Head(loc, nil)
+ _, err := v.bucket.Head(loc, nil)
err = v.translateError(err)
if os.IsNotExist(err) && v.fixRace(loc) {
// The data object got trashed in a race, but fixRace
} else if err != nil {
return err
}
- err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
return v.translateError(err)
}
// Mtime returns the stored timestamp for the given locator.
func (v *S3Volume) Mtime(loc string) (time.Time, error) {
- _, err := v.Bucket.Head(loc, nil)
+ _, err := v.bucket.Head(loc, nil)
if err != nil {
return zeroTime, v.translateError(err)
}
- resp, err := v.Bucket.Head("recent/"+loc, nil)
+ resp, err := v.bucket.Head("recent/"+loc, nil)
err = v.translateError(err)
if os.IsNotExist(err) {
// The data object X exists, but recent/X is missing.
- err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
log.Printf("error: creating %q: %s", "recent/"+loc, err)
return zeroTime, v.translateError(err)
}
log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
- resp, err = v.Bucket.Head("recent/"+loc, nil)
+ resp, err = v.bucket.Head("recent/"+loc, nil)
if err != nil {
log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
return zeroTime, v.translateError(err)
func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
// Use a merge sort to find matching sets of X and recent/X.
dataL := s3Lister{
- Bucket: v.Bucket,
+ Bucket: v.bucket,
Prefix: prefix,
- PageSize: v.indexPageSize,
+ PageSize: v.IndexPageSize,
}
recentL := s3Lister{
- Bucket: v.Bucket,
+ Bucket: v.bucket,
Prefix: "recent/" + prefix,
- PageSize: v.indexPageSize,
+ PageSize: v.IndexPageSize,
}
for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
if data.Key >= "g" {
// Trash a Keep block.
func (v *S3Volume) Trash(loc string) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
if t, err := v.Mtime(loc); err != nil {
return err
- } else if time.Since(t) < blobSignatureTTL {
+ } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
return nil
}
- if trashLifetime == 0 {
+ if theConfig.TrashLifetime == 0 {
if !s3UnsafeDelete {
return ErrS3TrashDisabled
}
- return v.Bucket.Del(loc)
+ return v.bucket.Del(loc)
}
err := v.checkRaceWindow(loc)
if err != nil {
if err != nil {
return err
}
- return v.translateError(v.Bucket.Del(loc))
+ return v.translateError(v.bucket.Del(loc))
}
// checkRaceWindow returns a non-nil error if trash/loc is, or might
// be, in the race window (i.e., it's not safe to trash loc).
func (v *S3Volume) checkRaceWindow(loc string) error {
- resp, err := v.Bucket.Head("trash/"+loc, nil)
+ resp, err := v.bucket.Head("trash/"+loc, nil)
err = v.translateError(err)
if os.IsNotExist(err) {
// OK, trash/X doesn't exist so we're not in the race
// Can't parse timestamp
return err
}
- safeWindow := t.Add(trashLifetime).Sub(time.Now().Add(v.raceWindow))
+ safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
if safeWindow <= 0 {
// We can't count on "touch trash/X" to prolong
// trash/X's lifetime. The new timestamp might not
// (PutCopy returns 200 OK if the request was received, even if the
// copy failed).
func (v *S3Volume) safeCopy(dst, src string) error {
- resp, err := v.Bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
+ resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
ContentType: "application/octet-stream",
MetadataDirective: "REPLACE",
- }, v.Bucket.Name+"/"+src)
+ }, v.bucket.Name+"/"+src)
err = v.translateError(err)
if err != nil {
return err
if err != nil {
return err
}
- err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
return v.translateError(err)
}
// String implements fmt.Stringer.
func (v *S3Volume) String() string {
- return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
+ return fmt.Sprintf("s3-bucket:%+q", v.bucket.Name)
}
// Writable returns false if all future Put, Mtime, and Delete calls
// are expected to fail.
+//
+// It reports the inverse of the ReadOnly configuration field.
func (v *S3Volume) Writable() bool {
- return !v.readonly
+ return !v.ReadOnly
}
// Replication returns the storage redundancy of the underlying
-// device. Configured via command line flag.
+// device, as specified in configuration (the S3Replication field).
func (v *S3Volume) Replication() int {
- return v.replication
+ return v.S3Replication
}
+// s3KeepBlockRegexp matches an object key consisting of exactly 32
+// lowercase hex digits (a bare block hash). Because it is anchored,
+// prefixed keys such as "trash/X" and "recent/X" do not match.
var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
// there was a race between Put and Trash, fixRace recovers from the
// race by Untrashing the block.
func (v *S3Volume) fixRace(loc string) bool {
- trash, err := v.Bucket.Head("trash/"+loc, nil)
+ trash, err := v.bucket.Head("trash/"+loc, nil)
if err != nil {
if !os.IsNotExist(v.translateError(err)) {
log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
return false
}
- recent, err := v.Bucket.Head("recent/"+loc, nil)
+ recent, err := v.bucket.Head("recent/"+loc, nil)
if err != nil {
log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
return false
}
ageWhenTrashed := trashTime.Sub(recentTime)
- if ageWhenTrashed >= blobSignatureTTL {
+ if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
// No evidence of a race: block hasn't been written
// since it became eligible for Trash. No fix needed.
return false
}
- log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, blobSignatureTTL)
+ log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
err = v.safeCopy(loc, "trash/"+loc)
if err != nil {
return err
}
-// EmptyTrash looks for trashed blocks that exceeded trashLifetime
+// EmptyTrash looks for trashed blocks that exceeded TrashLifetime
// and deletes them from the volume.
func (v *S3Volume) EmptyTrash() {
var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
// Use a merge sort to find matching sets of trash/X and recent/X.
trashL := s3Lister{
- Bucket: v.Bucket,
+ Bucket: v.bucket,
Prefix: "trash/",
- PageSize: v.indexPageSize,
+ PageSize: v.IndexPageSize,
}
// Define "ready to delete" as "...when EmptyTrash started".
startT := time.Now()
log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
continue
}
- recent, err := v.Bucket.Head("recent/"+loc, nil)
+ recent, err := v.bucket.Head("recent/"+loc, nil)
if err != nil && os.IsNotExist(v.translateError(err)) {
log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
err = v.Untrash(loc)
log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
continue
}
- if trashT.Sub(recentT) < blobSignatureTTL {
- if age := startT.Sub(recentT); age >= blobSignatureTTL-v.raceWindow {
+ if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
+ if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
// recent/loc is too old to protect
// loc from being Trashed again during
// the raceWindow that starts if we
// delete trash/X now.
//
- // Note this means (trashCheckInterval
- // < blobSignatureTTL - raceWindow) is
+ // Note this means (TrashCheckInterval
+ // < BlobSignatureTTL - raceWindow) is
// necessary to avoid starvation.
log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
v.fixRace(loc)
v.Touch(loc)
continue
- } else if _, err := v.Bucket.Head(loc, nil); os.IsNotExist(err) {
+ } else if _, err := v.bucket.Head(loc, nil); os.IsNotExist(err) {
log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
v.fixRace(loc)
continue
continue
}
}
- if startT.Sub(trashT) < trashLifetime {
+ if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
continue
}
- err = v.Bucket.Del(trash.Key)
+ err = v.bucket.Del(trash.Key)
if err != nil {
log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
continue
bytesDeleted += trash.Size
blocksDeleted++
- _, err = v.Bucket.Head(loc, nil)
+ _, err = v.bucket.Head(loc, nil)
if os.IsNotExist(err) {
- err = v.Bucket.Del("recent/" + loc)
+ err = v.bucket.Del("recent/" + loc)
if err != nil {
log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
}
"bytes"
"crypto/md5"
"fmt"
+ "io/ioutil"
"log"
"os"
"time"
- "github.com/AdRoll/goamz/aws"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/AdRoll/goamz/s3"
"github.com/AdRoll/goamz/s3/s3test"
check "gopkg.in/check.v1"
)
-type TestableS3Volume struct {
- *S3Volume
- server *s3test.Server
- c *check.C
- serverClock *fakeClock
-}
-
const (
TestBucketName = "testbucket"
)
s3UnsafeDelete = true
}
-func NewTestableS3Volume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
- clock := &fakeClock{}
- srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
- c.Assert(err, check.IsNil)
- auth := aws.Auth{}
- region := aws.Region{
- Name: "test-region-1",
- S3Endpoint: srv.URL(),
- S3LocationConstraint: true,
- }
- bucket := &s3.Bucket{
- S3: s3.New(auth, region),
- Name: TestBucketName,
- }
- err = bucket.PutBucket(s3.ACL("private"))
- c.Assert(err, check.IsNil)
-
- return &TestableS3Volume{
- S3Volume: NewS3Volume(auth, region, TestBucketName, raceWindow, readonly, replication),
- server: srv,
- serverClock: clock,
- }
-}
-
var _ = check.Suite(&StubbedS3Suite{})
type StubbedS3Suite struct {
DoGenericVolumeTests(c, func(t TB) TestableVolume {
// Use a negative raceWindow so s3test's 1-second
// timestamp precision doesn't confuse fixRace.
- return NewTestableS3Volume(c, -2*time.Second, false, 2)
+ return s.newTestableVolume(c, -2*time.Second, false, 2)
})
}
func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
DoGenericVolumeTests(c, func(t TB) TestableVolume {
- return NewTestableS3Volume(c, -2*time.Second, true, 2)
+ return s.newTestableVolume(c, -2*time.Second, true, 2)
})
}
func (s *StubbedS3Suite) TestIndex(c *check.C) {
- v := NewTestableS3Volume(c, 0, false, 2)
- v.indexPageSize = 3
+ v := s.newTestableVolume(c, 0, false, 2)
+ v.IndexPageSize = 3
for i := 0; i < 256; i++ {
v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
}
}
func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
- defer func(tl, bs time.Duration) {
- trashLifetime = tl
- blobSignatureTTL = bs
- }(trashLifetime, blobSignatureTTL)
- trashLifetime = time.Hour
- blobSignatureTTL = time.Hour
+ defer func(tl, bs arvados.Duration) {
+ theConfig.TrashLifetime = tl
+ theConfig.BlobSignatureTTL = bs
+ }(theConfig.TrashLifetime, theConfig.BlobSignatureTTL)
+ theConfig.TrashLifetime.Set("1h")
+ theConfig.BlobSignatureTTL.Set("1h")
- v := NewTestableS3Volume(c, 5*time.Minute, false, 2)
+ v := s.newTestableVolume(c, 5*time.Minute, false, 2)
var none time.Time
putS3Obj := func(t time.Time, key string, data []byte) {
return
}
v.serverClock.now = &t
- v.Bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
+ v.bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
}
t0 := time.Now()
false, false, false, true, false, false,
},
{
- "Erroneously trashed during a race, detected before trashLifetime",
+ "Erroneously trashed during a race, detected before TrashLifetime",
none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
true, false, true, true, true, false,
},
{
- "Erroneously trashed during a race, rescue during EmptyTrash despite reaching trashLifetime",
+ "Erroneously trashed during a race, rescue during EmptyTrash despite reaching TrashLifetime",
none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
true, false, true, true, true, false,
},
// freshAfterEmpty
loc, blk = setupScenario()
v.EmptyTrash()
- _, err = v.Bucket.Head("trash/"+loc, nil)
+ _, err = v.bucket.Head("trash/"+loc, nil)
c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
if scenario.freshAfterEmpty {
t, err := v.Mtime(loc)
}
}
+// TestableS3Volume wraps an S3Volume that talks to a fake s3test
+// server whose clock the test controls through serverClock.
+type TestableS3Volume struct {
+ *S3Volume
+ server *s3test.Server // fake S3 endpoint used by the volume
+ c *check.C // gocheck context of the owning test, if set
+ serverClock *fakeClock // server's clock; tests set serverClock.now to backdate writes
+}
+
+// newTestableVolume returns a started TestableS3Volume backed by a
+// fresh s3test server (with a test-controlled clock) and a newly
+// created private bucket named TestBucketName.
+//
+// raceWindow, readonly, and replication populate the RaceWindow,
+// ReadOnly, and S3Replication configuration fields. Any setup
+// failure aborts the test via c.Assert.
+func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
+ clock := &fakeClock{}
+ srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
+ c.Assert(err, check.IsNil)
+
+ // S3Volume takes its credentials from files, so write a throwaway
+ // file and use it for both keys. It is removed when this function
+ // returns; Start() below is expected to have read it by then.
+ tmp, err := ioutil.TempFile("", "keepstore")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(tmp.Name())
+ _, err = tmp.Write([]byte("xxx\n"))
+ c.Assert(err, check.IsNil)
+ c.Assert(tmp.Close(), check.IsNil)
+
+ v := &TestableS3Volume{
+ S3Volume: &S3Volume{
+ Bucket: TestBucketName,
+ AccessKeyFile: tmp.Name(),
+ SecretKeyFile: tmp.Name(),
+ Endpoint: srv.URL(),
+ Region: "test-region-1",
+ LocationConstraint: true,
+ RaceWindow: arvados.Duration(raceWindow),
+ S3Replication: replication,
+ UnsafeDelete: s3UnsafeDelete,
+ ReadOnly: readonly,
+ IndexPageSize: 1000,
+ },
+ server: srv,
+ // Populate the test context so methods on TestableS3Volume
+ // never see a nil *check.C.
+ c: c,
+ serverClock: clock,
+ }
+ c.Assert(v.Start(), check.IsNil)
+ // The server was just created, so the bucket must be created too.
+ err = v.bucket.PutBucket(s3.ACL("private"))
+ c.Assert(err, check.IsNil)
+ return v
+}
+
// PutRaw skips the ContentMD5 test
func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
- err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
+ err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
log.Printf("PutRaw: %+v", err)
}
// while we do this.
func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
v.serverClock.now = &lastPut
- err := v.Bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err := v.bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
panic(err)
}
"errors"
"log"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
)
// RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
// TrashItem deletes the indicated block from every writable volume.
func TrashItem(trashRequest TrashRequest) {
reqMtime := time.Unix(0, trashRequest.BlockMtime)
- if time.Since(reqMtime) < blobSignatureTTL {
+ if time.Since(reqMtime) < theConfig.BlobSignatureTTL.Duration() {
log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
- time.Since(reqMtime),
+ arvados.Duration(time.Since(reqMtime)),
trashRequest.Locator,
trashRequest.BlockMtime,
reqMtime,
- blobSignatureTTL)
+ theConfig.BlobSignatureTTL)
return
}
continue
}
- if neverDelete {
- err = errors.New("did not delete block because neverDelete is true")
+ if !theConfig.EnableDelete {
+ err = errors.New("did not delete block because EnableDelete is false")
} else {
err = volume.Trash(trashRequest.Locator)
}
Expect no errors.
*/
func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: "5d41402abc4b2a76b9719d911017c592",
Block1: []byte("hello"),
Expect the second locator in volume 2 to be unaffected.
*/
func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
Expect the first locator in volume 1 to be unaffected.
*/
func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
Expect locator to be deleted from both volumes.
*/
func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
Delete the second and expect the first to be still around.
*/
func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
Expect the other unaffected.
*/
func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
will not be deleted because its Mtime is within the trash life time.
*/
func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
performTrashWorkerTest(testData, t)
}
-/* Delete a block with matching mtime for locator in both volumes, but neverDelete is true,
+/* Delete a block with matching mtime for locator in both volumes, but EnableDelete is false,
so block won't be deleted.
*/
-func TestTrashWorkerIntegration_NeverDelete(t *testing.T) {
- neverDelete = true
+func TestTrashWorkerIntegration_DisabledDelete(t *testing.T) {
+ theConfig.EnableDelete = false
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
}
}
- oldBlockTime := time.Now().Add(-blobSignatureTTL - time.Minute)
+ oldBlockTime := time.Now().Add(-theConfig.BlobSignatureTTL.Duration() - time.Minute)
// Create TrashRequest for the test
trashRequest := TrashRequest{
--- /dev/null
+package main
+
+import (
+ "flag"
+ "fmt"
+ "os"
+ "sort"
+ "strings"
+
+ "github.com/ghodss/yaml"
+)
+
+// usage writes the keepstore help message to stderr: a synopsis, the
+// (deprecated) command-line flags, an example YAML config containing
+// the default settings plus sample volumes from every registered
+// volume type, and a description of each config entry.
+func usage() {
+ c := DefaultConfig()
+ knownTypes := []string{}
+ for _, vt := range VolumeTypes {
+ // Construct each volume type once, so its examples and its
+ // type name come from the same instance.
+ vol := vt()
+ c.Volumes = append(c.Volumes, vol.Examples()...)
+ knownTypes = append(knownTypes, vol.Type())
+ }
+ exampleConfigFile, err := yaml.Marshal(c)
+ if err != nil {
+ panic(err)
+ }
+ sort.Strings(knownTypes)
+ knownTypeList := strings.Join(knownTypes, ", ")
+ fmt.Fprintf(os.Stderr, `
+
+keepstore provides a content-addressed data store backed by a local filesystem or networked storage.
+
+Usage: keepstore -config path/to/keepstore.yml
+ keepstore [OPTIONS] -dump-config
+
+NOTE: All options (other than -config) are deprecated in favor of YAML
+ configuration. Use -dump-config to translate existing
+ configurations to YAML format.
+
+Options:
+`)
+ flag.PrintDefaults()
+ fmt.Fprintf(os.Stderr, `
+Example config file:
+
+%s
+
+Listen:
+
+ Local port to listen on. Can be "address:port" or ":port", where
+ "address" is a host IP address or name and "port" is a port number
+ or name.
+
+PIDFile:
+
+ Path to write PID file during startup. This file is kept open and
+ locked with LOCK_EX until keepstore exits, so "fuser -k pidfile" is
+ one way to shut down. Exit immediately if there is an error
+ opening, locking, or writing the PID file.
+
+MaxBuffers:
+
+ Maximum RAM to use for data buffers, given in multiples of block
+ size (64 MiB). When this limit is reached, HTTP requests requiring
+ buffers (like GET and PUT) will wait for buffer space to be
+ released.
+
+MaxRequests:
+
+ Maximum concurrent requests. When this limit is reached, new
+ requests will receive 503 responses. Note: this limit does not
+ include idle connections from clients using HTTP keepalive, so it
+ does not strictly limit the number of concurrent connections. If
+ omitted or zero, the default is 2 * MaxBuffers.
+
+BlobSigningKeyFile:
+
+ Local file containing the secret blob signing key (used to
+ generate and verify blob signatures). This key should be
+ identical to the API server's blob_signing_key configuration
+ entry.
+
+RequireSignatures:
+
+ Honor read requests only if a valid signature is provided. This
+ should be true, except for development use and when migrating from
+ a very old version.
+
+BlobSignatureTTL:
+
+ Duration for which new permission signatures (returned in PUT
+ responses) will be valid. This should be equal to the API
+ server's blob_signature_ttl configuration entry.
+
+SystemAuthTokenFile:
+
+ Local file containing the Arvados API token used by keep-balance
+ or data manager. Delete, trash, and index requests are honored
+ only for this token.
+
+EnableDelete:
+
+ Enable trash and delete features. If false, trash lists will be
+ accepted but blocks will not be trashed or deleted.
+
+TrashLifetime:
+
+ Time duration after a block is trashed during which it can be
+ recovered using an /untrash request.
+
+TrashCheckInterval:
+
+ How often to check for (and delete) trashed blocks whose
+ TrashLifetime has expired.
+
+Volumes:
+
+ List of storage volumes. If omitted or empty, the default is to
+ use all directories named "keep" that exist in the top level
+ directory of a mount point at startup time.
+
+ Volume types: %s
+
+ (See volume configuration examples above.)
+
+`, exampleConfigFile, knownTypeList)
+}
// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
// etc.
type Volume interface {
+ // Volume type as specified in config file. Examples: "S3",
+ // "Directory".
+ Type() string
+
+ // Do whatever private setup tasks and configuration checks
+ // are needed. Return non-nil if the volume is unusable (e.g.,
+ // invalid config).
+ Start() error
+
// Get a block: copy the block data into buf, and return the
// number of bytes copied.
//
// loc is as described in Get.
//
// If the timestamp for the given locator is newer than
- // blobSignatureTTL, Trash must not trash the data.
+ // BlobSignatureTTL, Trash must not trash the data.
//
// If a Trash operation overlaps with any Touch or Put
// operations on the same locator, the implementation must
// reliably or fail outright.
//
// Corollary: A successful Touch or Put guarantees a block
- // will not be trashed for at least blobSignatureTTL
+ // will not be trashed for at least BlobSignatureTTL
// seconds.
Trash(loc string) error
// responses to PUT requests.
Replication() int
- // EmptyTrash looks for trashed blocks that exceeded trashLifetime
+ // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
// and deletes them from the volume.
EmptyTrash()
}
+// A VolumeWithExamples provides example configs to display in the
+// -help message.
+type VolumeWithExamples interface {
+ Volume
+ // Examples returns sample configurations of this volume type;
+ // usage() marshals them to YAML as part of the example config.
+ Examples() []Volume
+}
+
// A VolumeManager tells callers which volumes can read, which volumes
// can write, and on which volume the next write should be attempted.
type VolumeManager interface {
"strings"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
)
func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
- blobSignatureTTL = 300 * time.Second
+ theConfig.BlobSignatureTTL.Set("5m")
if v.Writable() == false {
return
}
// Calling Delete() for a block with a timestamp older than
-// blobSignatureTTL seconds in the past should delete the data.
+// BlobSignatureTTL seconds in the past should delete the data.
// Test is intended for only writable volumes
func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
- blobSignatureTTL = 300 * time.Second
+ theConfig.BlobSignatureTTL.Set("5m")
if v.Writable() == false {
return
}
v.Put(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
}
-// With trashLifetime != 0, perform:
+// With TrashLifetime != 0, perform:
// Trash an old block - which either raises ErrNotImplemented or succeeds
// Untrash - which either raises ErrNotImplemented or succeeds
// Get - which must succeed
v := factory(t)
defer v.Teardown()
defer func() {
- trashLifetime = 0
+ theConfig.TrashLifetime = 0
}()
- trashLifetime = 3600 * time.Second
+ theConfig.TrashLifetime.Set("1h")
// put block and backdate it
v.PutRaw(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
buf := make([]byte, BlockSize)
n, err := v.Get(TestHash, buf)
func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
- defer func(orig time.Duration) {
- trashLifetime = orig
- }(trashLifetime)
+ defer func(orig arvados.Duration) {
+ theConfig.TrashLifetime = orig
+ }(theConfig.TrashLifetime)
checkGet := func() error {
buf := make([]byte, BlockSize)
// First set: EmptyTrash before reaching the trash deadline.
- trashLifetime = time.Hour
+ theConfig.TrashLifetime.Set("1h")
v.PutRaw(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
err := checkGet()
if err != nil {
err = v.Trash(TestHash)
if err == MethodDisabledError || err == ErrNotImplemented {
// Skip the trash tests for read-only volumes, and
- // volume types that don't support trashLifetime>0.
+ // volume types that don't support TrashLifetime>0.
return
}
}
// Because we Touch'ed, need to backdate again for next set of tests
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
// If the only block in the trash has already been untrashed,
// most volumes will fail a subsequent Untrash with a 404, but
}
// Untrash might have updated the timestamp, so backdate again
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
// Second set: EmptyTrash after the trash deadline has passed.
- trashLifetime = time.Nanosecond
+ theConfig.TrashLifetime.Set("1ns")
err = v.Trash(TestHash)
if err != nil {
// Trash it again, and this time call EmptyTrash so it really
// goes away.
// (In Azure volumes, un/trash changes Mtime, so first backdate again)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
err = v.Trash(TestHash)
err = checkGet()
if err == nil || !os.IsNotExist(err) {
// un-trashed copy doesn't get deleted along with it.
v.PutRaw(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
- trashLifetime = time.Nanosecond
+ theConfig.TrashLifetime.Set("1ns")
err = v.Trash(TestHash)
if err != nil {
t.Fatal(err)
}
v.PutRaw(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
// EmptyTrash should not delete the untrashed copy.
v.EmptyTrash()
// untrash the block whose deadline is "C".
v.PutRaw(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
- trashLifetime = time.Nanosecond
+ theConfig.TrashLifetime.Set("1ns")
err = v.Trash(TestHash)
if err != nil {
t.Fatal(err)
}
v.PutRaw(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
- trashLifetime = time.Hour
+ theConfig.TrashLifetime.Set("1h")
err = v.Trash(TestHash)
if err != nil {
t.Fatal(err)
return MethodDisabledError
}
if _, ok := v.Store[loc]; ok {
- if time.Since(v.Timestamps[loc]) < blobSignatureTTL {
+ if time.Since(v.Timestamps[loc]) < time.Duration(theConfig.BlobSignatureTTL) {
return nil
}
delete(v.Store, loc)
return os.ErrNotExist
}
-// TBD
+// Type implements Volume.
+func (v *MockVolume) Type() string {
+ return "Mock"
+}
+
+// Start implements Volume. MockVolume has no configuration to check,
+// so Start always succeeds.
+func (v *MockVolume) Start() error {
+ return nil
+}
+
+// Untrash implements Volume. It is a no-op that always reports success.
func (v *MockVolume) Untrash(loc string) error {
return nil
}
import (
"bufio"
- "errors"
"flag"
"fmt"
"io"
)
+// unixVolumeAdder is the flag.Value behind the deprecated -volume and
+// -volumes flags: each Set call appends UnixVolume entries to the
+// embedded Config's Volumes list.
type unixVolumeAdder struct {
- *volumeSet
+ *Config
}
-func (vs *unixVolumeAdder) Set(value string) error {
- if dirs := strings.Split(value, ","); len(dirs) > 1 {
+// String implements flag.Value. It returns a fixed placeholder rather
+// than the list of volumes added so far.
+func (vs *unixVolumeAdder) String() string {
+ return "-"
+}
+
+func (vs *unixVolumeAdder) Set(path string) error {
+ if dirs := strings.Split(path, ","); len(dirs) > 1 {
log.Print("DEPRECATED: using comma-separated volume list.")
for _, dir := range dirs {
if err := vs.Set(dir); err != nil {
}
return nil
}
- if len(value) == 0 || value[0] != '/' {
- return errors.New("Invalid volume: must begin with '/'.")
- }
- if _, err := os.Stat(value); err != nil {
- return err
- }
- var locker sync.Locker
- if flagSerializeIO {
- locker = &sync.Mutex{}
- }
- *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
- root: value,
- locker: locker,
- readonly: flagReadonly,
+ vs.Config.Volumes = append(vs.Config.Volumes, &UnixVolume{
+ Root: path,
+ ReadOnly: deprecated.flagReadonly,
+ Serialize: deprecated.flagSerializeIO,
})
return nil
}
func init() {
- flag.Var(
- &unixVolumeAdder{&volumes},
- "volumes",
- "Deprecated synonym for -volume.")
- flag.Var(
- &unixVolumeAdder{&volumes},
- "volume",
- "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
+ // Register the "Directory" volume type, e.g. so usage() lists it
+ // and includes its Examples() in the example config.
+ VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &UnixVolume{} })
+
+ // Deprecated command-line equivalents of the Volumes config entry:
+ // each value is appended to theConfig.Volumes by unixVolumeAdder.Set.
+ flag.Var(&unixVolumeAdder{theConfig}, "volumes", "see Volumes configuration")
+ flag.Var(&unixVolumeAdder{theConfig}, "volume", "see Volumes configuration")
}
// Discover adds a UnixVolume for every directory named "keep" that is
}
// Set the -readonly flag (but only for this volume)
// if the filesystem is mounted readonly.
- flagReadonlyWas := flagReadonly
+ flagReadonlyWas := deprecated.flagReadonly
for _, fsopt := range strings.Split(args[3], ",") {
if fsopt == "ro" {
- flagReadonly = true
+ deprecated.flagReadonly = true
break
}
if fsopt == "rw" {
} else {
added++
}
- flagReadonly = flagReadonlyWas
+ deprecated.flagReadonly = flagReadonlyWas
}
return added
}
// A UnixVolume stores and retrieves blocks in a local directory.
type UnixVolume struct {
- // path to the volume's root directory
- root string
+ Root string // path to the volume's root directory; Start requires a leading "/"
+ ReadOnly bool // if true, Touch, Put, Trash, and Untrash return MethodDisabledError
+ Serialize bool // if true, Start sets locker to a new sync.Mutex
+ DirectoryReplication int // value reported by Replication(); Start replaces 0 with 1
+
// something to lock during IO, typically a sync.Mutex (or nil
// to skip locking)
- locker sync.Locker
- readonly bool
+ locker sync.Locker
+}
+
+// Examples implements VolumeWithExamples. The samples illustrate a
+// serialized local disk reporting 1 replica and an unserialized
+// network disk reporting 2 replicas.
+func (*UnixVolume) Examples() []Volume {
+ return []Volume{
+ &UnixVolume{
+ Root: "/mnt/local-disk",
+ Serialize: true,
+ DirectoryReplication: 1,
+ },
+ &UnixVolume{
+ Root: "/mnt/network-disk",
+ Serialize: false,
+ DirectoryReplication: 2,
+ },
+ }
+}
+
+// Type implements Volume. "Directory" is the type name used for this
+// volume in config files.
+func (v *UnixVolume) Type() string {
+ return "Directory"
+}
+
+// Start implements Volume. It creates the IO lock when Serialize is
+// set, rejects a Root that does not begin with "/", replaces an unset
+// (zero) DirectoryReplication with 1, and returns any error from
+// os.Stat(Root), e.g. when the directory does not exist.
+func (v *UnixVolume) Start() error {
+ if v.Serialize {
+ v.locker = &sync.Mutex{}
+ }
+ if !strings.HasPrefix(v.Root, "/") {
+ return fmt.Errorf("volume root does not start with '/': %q", v.Root)
+ }
+ if v.DirectoryReplication == 0 {
+ v.DirectoryReplication = 1
+ }
+ _, err := os.Stat(v.Root)
+ return err
}
// Touch sets the timestamp for the given locator to the current time
func (v *UnixVolume) Touch(loc string) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
p := v.blockPath(loc)
// returns a FullError. If the write fails due to some other error,
// that error is returned.
func (v *UnixVolume) Put(loc string, block []byte) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
if v.IsFull() {
var fs syscall.Statfs_t
var devnum uint64
- if fi, err := os.Stat(v.root); err == nil {
+ if fi, err := os.Stat(v.Root); err == nil {
devnum = fi.Sys().(*syscall.Stat_t).Dev
} else {
log.Printf("%s: os.Stat: %s\n", v, err)
return nil
}
- err := syscall.Statfs(v.root, &fs)
+ err := syscall.Statfs(v.Root, &fs)
if err != nil {
log.Printf("%s: statfs: %s\n", v, err)
return nil
// uses fs.Blocks - fs.Bfree.
free := fs.Bavail * uint64(fs.Bsize)
used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
- return &VolumeStatus{v.root, devnum, free, used}
+ return &VolumeStatus{v.Root, devnum, free, used}
}
var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
//
func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
var lastErr error
- rootdir, err := os.Open(v.root)
+ rootdir, err := os.Open(v.Root)
if err != nil {
return err
}
if !blockDirRe.MatchString(names[0]) {
continue
}
- blockdirpath := filepath.Join(v.root, names[0])
+ blockdirpath := filepath.Join(v.Root, names[0])
blockdir, err := os.Open(blockdirpath)
if err != nil {
log.Print("Error reading ", blockdirpath, ": ", err)
}
// Trash trashes the block data from the unix storage
-// If trashLifetime == 0, the block is deleted
+// If TrashLifetime == 0, the block is deleted
// Else, the block is renamed as path/{loc}.trash.{deadline},
-// where deadline = now + trashLifetime
+// where deadline = now + TrashLifetime
func (v *UnixVolume) Trash(loc string) error {
// Touch() must be called before calling Write() on a block. Touch()
// also uses lockfile(). This avoids a race condition between Write()
// Trash() will read the correct up-to-date timestamp and choose not to
// trash the file.
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
if v.locker != nil {
// anyway (because the permission signatures have expired).
if fi, err := os.Stat(p); err != nil {
return err
- } else if time.Since(fi.ModTime()) < blobSignatureTTL {
+ } else if time.Since(fi.ModTime()) < time.Duration(theConfig.BlobSignatureTTL) {
return nil
}
- if trashLifetime == 0 {
+ if theConfig.TrashLifetime == 0 {
return os.Remove(p)
}
- return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
+ return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()))
}
// Untrash moves block from trash back into store
// Look for path/{loc}.trash.{deadline} in storage,
// and rename the first such file as path/{loc}
func (v *UnixVolume) Untrash(loc string) (err error) {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
// blockDir returns the fully qualified directory name for the directory
// where loc is (or would be) stored on this volume.
func (v *UnixVolume) blockDir(loc string) string {
- return filepath.Join(v.root, loc[0:3])
+ return filepath.Join(v.Root, loc[0:3])
}
// blockPath returns the fully qualified pathname for the path to loc
// MinFreeKilobytes.
//
func (v *UnixVolume) IsFull() (isFull bool) {
- fullSymlink := v.root + "/full"
+ fullSymlink := v.Root + "/full"
// Check if the volume has been marked as full in the last hour.
if link, err := os.Readlink(fullSymlink); err == nil {
//
func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
var fs syscall.Statfs_t
- err = syscall.Statfs(v.root, &fs)
+ err = syscall.Statfs(v.Root, &fs)
if err == nil {
// Statfs output is not guaranteed to measure free
// space in terms of 1K blocks.
}
+// String implements fmt.Stringer, identifying the volume by its Root.
func (v *UnixVolume) String() string {
- return fmt.Sprintf("[UnixVolume %s]", v.root)
+ return fmt.Sprintf("[UnixVolume %s]", v.Root)
}
// Writable returns false if all future Put, Mtime, and Delete calls
// are expected to fail.
+//
+// It reports the inverse of the ReadOnly configuration field.
func (v *UnixVolume) Writable() bool {
- return !v.readonly
+ return !v.ReadOnly
}
// Replication returns the number of replicas promised by the
-// underlying device (currently assumed to be 1).
+// underlying device (as specified in configuration).
+// Start() replaces an unset (zero) DirectoryReplication with 1.
func (v *UnixVolume) Replication() int {
- return 1
+ return v.DirectoryReplication
}
// lockfile and unlockfile use flock(2) to manage kernel file locks.
var bytesDeleted, bytesInTrash int64
var blocksDeleted, blocksInTrash int
- err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
+ err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
if err != nil {
log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
return nil
}
return &TestableUnixVolume{
UnixVolume: UnixVolume{
- root: d,
+ Root: d,
+ ReadOnly: readonly,
locker: locker,
- readonly: readonly,
},
t: t,
}
// the volume is readonly.
func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
defer func(orig bool) {
- v.readonly = orig
- }(v.readonly)
- v.readonly = false
+ v.ReadOnly = orig
+ }(v.ReadOnly)
+ v.ReadOnly = false
err := v.Put(locator, data)
if err != nil {
v.t.Fatal(err)
}
func (v *TestableUnixVolume) Teardown() {
- if err := os.RemoveAll(v.root); err != nil {
+ if err := os.RemoveAll(v.Root); err != nil {
v.t.Fatal(err)
}
}
})
}
+func TestReplicationDefault1(t *testing.T) {
+ v := &UnixVolume{
+ Root: "/",
+ ReadOnly: true,
+ }
+ if err := v.Start(); err != nil {
+ t.Error(err)
+ }
+ if got := v.Replication(); got != 1 {
+ t.Errorf("Replication() returned %d, expected 1 if no config given", got)
+ }
+}
+
func TestGetNotFound(t *testing.T) {
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
if err != nil {
t.Error(err)
}
- p := fmt.Sprintf("%s/%s/%s", v.root, TestHash[:3], TestHash)
+ p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
if buf, err := ioutil.ReadFile(p); err != nil {
t.Error(err)
} else if bytes.Compare(buf, TestBlock) != 0 {
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- os.Chmod(v.root, 000)
+ os.Chmod(v.Root, 000)
err := v.Put(TestHash, TestBlock)
if err == nil {
t.Error("Write should have failed")
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- fullPath := v.root + "/full"
+ fullPath := v.Root + "/full"
now := fmt.Sprintf("%d", time.Now().Unix())
os.Symlink(now, fullPath)
if !v.IsFull() {
// Get node status and make a basic sanity check.
volinfo := v.Status()
- if volinfo.MountPoint != v.root {
- t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.root)
+ if volinfo.MountPoint != v.Root {
+ t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
}
if volinfo.DeviceNum == 0 {
t.Errorf("uninitialized device_num in %v", volinfo)
t.Errorf("Got err %q, expected %q", err, DiskHashError)
}
- p := fmt.Sprintf("%s/%s/%s", v.root, TestHash[:3], TestHash)
+ p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
os.Chmod(p, 000)
err = v.Compare(TestHash, TestBlock)
if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
end
end
+exclusive_mode = ARGV.index("--exclusive")
+exclusive_banner = "#######################################################################################
+# THIS FILE IS MANAGED BY #{$0} -- CHANGES WILL BE OVERWRITTEN #
+#######################################################################################\n\n"
+start_banner = "### BEGIN Arvados-managed keys -- changes between markers will be overwritten\n"
+end_banner = "### END Arvados-managed keys -- changes between markers will be overwritten\n"
+
keys = ''
seen = Hash.new
@homedir = Etc.getpwnam(l[:username]).dir
userdotssh = File.join(@homedir, ".ssh")
Dir.mkdir(userdotssh) if !File.exists?(userdotssh)
- @key = "#######################################################################################
-# THIS FILE IS MANAGED BY #{$0} -- CHANGES WILL BE OVERWRITTEN #
-#######################################################################################\n\n"
- @key += keys[l[:username]].join("\n") + "\n"
- userauthkeys = File.join(userdotssh, "authorized_keys")
- if !File.exists?(userauthkeys) or IO::read(userauthkeys) != @key then
- f = File.new(userauthkeys, 'w')
- f.write(@key)
+
+ newkeys = "###\n###\n" + keys[l[:username]].join("\n") + "\n###\n###\n"
+
+ keysfile = File.join(userdotssh, "authorized_keys")
+
+ if File.exists?(keysfile)
+ oldkeys = IO::read(keysfile)
+ else
+ oldkeys = ""
+ end
+
+ if exclusive_mode
+ newkeys = exclusive_banner + newkeys
+ elsif oldkeys.start_with?(exclusive_banner)
+ newkeys = start_banner + newkeys + end_banner
+ elsif (m = /^(.*?\n|)#{start_banner}(.*?\n|)#{end_banner}(.*)/m.match(oldkeys))
+ newkeys = m[1] + start_banner + newkeys + end_banner + m[3]
+ else
+ newkeys = start_banner + newkeys + end_banner + oldkeys
+ end
+
+ if oldkeys != newkeys then
+ f = File.new(keysfile, 'w')
+ f.write(newkeys)
f.close()
end
FileUtils.chown_R(l[:username], nil, userdotssh)
File.chmod(0700, userdotssh)
File.chmod(0750, @homedir)
- File.chmod(0600, userauthkeys)
+ File.chmod(0600, keysfile)
end
devnull.close