begin
if not model_class
@object = nil
+ elsif params[:uuid].nil? or params[:uuid].empty?
+ @object = nil
elsif not params[:uuid].is_a?(String)
@object = model_class.where(uuid: params[:uuid]).first
- elsif params[:uuid].empty?
- @object = nil
elsif (model_class != Link and
resource_class_for_uuid(params[:uuid]) == Link)
@name_link = Link.find(params[:uuid])
@object = model_class.find(@name_link.head_uuid)
else
@object = model_class.find(params[:uuid])
+ load_preloaded_objects [@object]
end
rescue ArvadosApiClient::NotFoundException, ArvadosApiClient::NotLoggedInException, RuntimeError => error
if error.is_a?(RuntimeError) and (error.message !~ /^argument to find\(/)
# helper method to get object of a given dataclass and uuid
helper_method :object_for_dataclass
- def object_for_dataclass dataclass, uuid
+ def object_for_dataclass dataclass, uuid, by_attr=nil
raise ArgumentError, 'No input argument dataclass' unless (dataclass && uuid)
- preload_objects_for_dataclass(dataclass, [uuid])
+ preload_objects_for_dataclass(dataclass, [uuid], by_attr)
@objects_for[uuid]
end
# helper method to preload objects for given dataclass and uuids
helper_method :preload_objects_for_dataclass
- def preload_objects_for_dataclass dataclass, uuids
+ def preload_objects_for_dataclass dataclass, uuids, by_attr=nil
@objects_for ||= {}
raise ArgumentError, 'Argument is not a data class' unless dataclass.is_a? Class
uuids.each do |x|
@objects_for[x] = nil
end
- dataclass.where(uuid: uuids).each do |obj|
- @objects_for[obj.uuid] = obj
+ if by_attr and ![:uuid, :name].include?(by_attr)
+ raise ArgumentError, "Preloading only using lookups by uuid or name are supported: #{by_attr}"
+ elsif by_attr and by_attr == :name
+ dataclass.where(name: uuids).each do |obj|
+ @objects_for[obj.name] = obj
+ end
+ else
+ dataclass.where(uuid: uuids).each do |obj|
+ @objects_for[obj.uuid] = obj
+ end
end
@objects_for
end
+ # helper method to insert already-fetched objects into the preload cache
+ helper_method :load_preloaded_objects
+ def load_preloaded_objects objs
+ @objects_for ||= {}
+ objs.each do |obj|
+ @objects_for[obj.uuid] = obj
+ end
+ end
+
def wiselinks_layout
'body'
end
else
begin
if resource_class.name == 'Collection'
- link_name = collections_for_object(link_uuid).andand.first.andand.friendly_link_name
+ if CollectionsHelper.match(link_uuid)
+ link_name = collection_for_pdh(link_uuid).andand.first.andand.portable_data_hash
+ else
+ link_name = collections_for_object(link_uuid).andand.first.andand.friendly_link_name
+ end
else
link_name = object_for_dataclass(resource_class, link_uuid).andand.friendly_link_name
end
@unreadable_children
end
- def readable?
- resource_class = ArvadosBase::resource_class_for_uuid(uuid)
- resource_class.where(uuid: [uuid]).first rescue nil
- end
-
- def link_to_log
- if state_label.in? ["Complete", "Failed", "Cancelled"]
- lc = log_collection
- if lc
- logCollection = Collection.find? lc
- if logCollection
- ApplicationController.helpers.link_to("Log", "#{uri}#Log")
- else
- "Log unavailable"
- end
- end
- elsif state_label == "Running"
- if readable?
- ApplicationController.helpers.link_to("Log", "#{uri}#Log")
- else
- "Log unavailable"
- end
- end
- end
-
def walltime
if state_label != "Queued"
if started_at
# returns true if this work unit can be canceled
end
- def readable?
- # is the proxied object readable by current user?
- end
-
def uri
# returns the uri for this work unit
end
end
# view helper methods
- def link_to_log
- # display a link to log if present
- end
-
def walltime
# return walltime for a running or completed work unit
end
</div>
<div class="col-md-6">
<table>
- <% # link to repo tree/file only if the repo is readable
- # and the commit is a sha1...
- repo =
- (/^[0-9a-f]{40}$/ =~ current_obj.script_version and
- Repository.where(name: current_obj.repository).first)
-
- # ...and the api server provides an http:// or https:// url
+ <% # link to repo tree/file only if the repo is readable and the commit is a sha1
+ repo = (/^[0-9a-f]{40}$/ =~ current_obj.script_version and
+ current_obj.repository and
+ object_for_dataclass(Repository, current_obj.repository, :name))
repo = nil unless repo.andand.http_fetch_url
%>
<% [:script, :repository, :script_version, :supplied_script_version, :nondeterministic,
<div class="col-md-8"></div>
<% else %>
<div class="col-md-1">
- <%= current_obj.link_to_log %>
+ <%= render partial: 'work_units/show_log_link', locals: {wu: current_obj} %>
</div>
<% walltime = current_obj.walltime %>
uuids = wu.children.collect {|c| c.uuid}.compact
if uuids.any?
resource_class = resource_class_for_uuid(uuids.first, friendly_name: true)
- preload_objects_for_dataclass resource_class, uuids
+
+ start = 0; inc = 200
+ while start < uuids.length
+ preload_objects_for_dataclass resource_class, uuids[start, inc]
+ start += inc
+ end
end
+ collections = wu.outputs.flatten.uniq
+ collections << wu.log_collection if wu.log_collection
+ collections << wu.docker_image if wu.docker_image
collections = wu.children.collect {|j| j.outputs}.compact
collections = collections.flatten.uniq
collections.concat wu.children.collect {|j| j.docker_image}.uniq.compact
+ collections.concat wu.children.collect {|j| j.log_collection}.uniq.compact
collections_pdhs = collections.select {|x| !(m = CollectionsHelper.match(x)).nil?}.uniq.compact
collections_uuids = collections - collections_pdhs
- preload_collections_for_objects collections_uuids if collections_uuids.any?
- preload_for_pdhs collections_pdhs if collections_pdhs.any?
+
+ if collections_uuids.any?
+ start = 0; inc = 200
+ while start < collections_uuids.length
+ preload_collections_for_objects collections_uuids[start, inc]
+ start += inc
+ end
+ end
+
+ if collections_pdhs.any?
+ start = 0; inc = 200
+ while start < collections_pdhs.length
+ preload_for_pdhs collections_pdhs[start, inc]
+ start += inc
+ end
+ end
+
+ repos = wu.children.collect {|c| c.repository}.uniq.compact
+ preload_objects_for_dataclass(Repository, repos, :name) if repos.any?
%>
<% if wu.has_unreadable_children %>
--- /dev/null
+<% if wu.state_label.in? ["Complete", "Failed", "Cancelled"] %>
+ <% lc = wu.log_collection %>
+ <% if lc and object_readable(lc, Collection) and object_readable(wu.uuid) %>
+ <%= link_to("Log", "#{wu.uri}#Log") %>
+ <% else %>
+ Log unavailable
+ <% end %>
+<% elsif wu.state_label == "Running" %>
+ <% if object_readable(wu.uuid) %>
+ <%= link_to("Log", "#{wu.uri}#Log") %>
+ <% else %>
+ Log unavailable
+ <% end %>
+<% end %>
}]
get :index, encoded_params, session_for(:active)
end
+
+ [
+ [Job, 'active', 'running_job_with_components', '/jobs/zzzzz-8i9sb-jyq01m7in1jlofj#Log'],
+ [PipelineInstance, 'active', 'pipeline_in_running_state', '/jobs/zzzzz-8i9sb-pshmckwoma9plh7#Log'],
+ [PipelineInstance, nil, 'pipeline_in_publicly_accessible_project_but_other_objects_elsewhere', 'Log unavailable'],
+ ].each do |type, token, fixture, log_link|
+ test "link_to_log for #{fixture} for #{token}" do
+ use_token 'admin'
+ obj = find_fixture(type, fixture)
+
+ @controller = if type == Job then JobsController.new else PipelineInstancesController.new end
+
+ if token
+ get :show, {id: obj['uuid']}, session_for(token)
+ else
+ Rails.configuration.anonymous_user_token =
+ api_fixture("api_client_authorizations", "anonymous", "api_token")
+ get :show, {id: obj['uuid']}
+ end
+
+ assert_includes @response.body, log_link
+ end
+ end
end
page.assert_selector 'a,button', text: 'Pause'
# Wait for pipeline run to complete
- wait_until_page_has 'Complete', pipeline_config['max_wait_seconds']
+ wait_until_page_has 'completed', pipeline_config['max_wait_seconds']
end
end
# Looks for the text_to_look_for for up to the max_time provided
def wait_until_page_has text_to_look_for, max_time=30
max_time = 30 if (!max_time || (max_time.to_s != max_time.to_i.to_s))
+ text_found = false
Timeout.timeout(max_time) do
- loop until page.has_text?(text_to_look_for)
+ until text_found do
+ visit_page_with_token 'active', current_path
+ text_found = has_text?(text_to_look_for)
+ end
end
end
-
end
end
end
- [
- [Job, 'active', 'running_job_with_components', true],
- [Job, 'active', 'queued', false],
- [Job, nil, 'completed_job_in_publicly_accessible_project', true],
- [Job, 'active', 'completed_job_in_publicly_accessible_project', true],
- [PipelineInstance, 'active', 'pipeline_in_running_state', true], # no log, but while running the log link points to pi Log tab
- [PipelineInstance, nil, 'pipeline_in_publicly_accessible_project_but_other_objects_elsewhere', false],
- [PipelineInstance, 'active', 'pipeline_in_publicly_accessible_project_but_other_objects_elsewhere', false], #no log for completed pi
- [Job, nil, 'job_in_publicly_accessible_project_but_other_objects_elsewhere', false, "Log unavailable"],
- ].each do |type, token, fixture, has_log, log_link|
- test "link_to_log for #{fixture} for #{token}" do
- use_token token if token
- obj = find_fixture(type, fixture)
- wu = obj.work_unit
-
- link = "#{wu.uri}#Log" if has_log
- link_to_log = wu.link_to_log
-
- if has_log
- assert_includes link_to_log, link
- else
- assert_equal log_link, link_to_log
- end
- end
- end
-
test 'can_cancel?' do
use_token 'active' do
assert find_fixture(Job, 'running').work_unit.can_cancel?
<notextile>
<pre><code>~$ <span class="userinput">sudo yum install git arvados-git-httpd</span>
+~$ <span class="userinput">sudo systemctl enable arvados-git-httpd</span>
</code></pre>
</notextile>
<notextile>
<pre><code>~$ <span class="userinput">arvados-git-httpd -h</span>
-Usage of arvados-git-httpd:
- -address="0.0.0.0:80": Address to listen on, "host:port".
- -git-command="/usr/bin/git": Path to git executable. Each authenticated request will execute this program with a single argument, "http-backend".
- -repo-root="/path/to/cwd": Path to git repositories.
+[...]
+Usage: arvados-git-httpd [-config path/to/arvados/git-httpd.yml]
+[...]
~$ <span class="userinput">git http-backend</span>
Status: 500 Internal Server Error
Expires: Fri, 01 Jan 1980 00:00:00 GMT
h3. Enable arvados-git-httpd
-Install runit to supervise the arvados-git-httpd daemon. {% include 'install_runit' %}
+{% include 'notebox_begin' %}
+
+The arvados-git-httpd package includes configuration files for systemd. If you're using a different init system, you'll need to configure a service to start and stop an @arvados-git-httpd@ process as desired.
+
+{% include 'notebox_end' %}
-Configure runit to run arvados-git-httpd, making sure to update the API host to match your site:
+Create the configuration file @/etc/arvados/git-httpd/git-httpd.yml@. Run @arvados-git-httpd -h@ to learn more about configuration entries.
<notextile>
-<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/sv</span>
-~$ <span class="userinput">cd /etc/sv</span>
-/etc/sv$ <span class="userinput">sudo mkdir arvados-git-httpd; cd arvados-git-httpd</span>
-/etc/sv/arvados-git-httpd$ <span class="userinput">sudo mkdir log</span>
-/etc/sv/arvados-git-httpd$ <span class="userinput">sudo sh -c 'cat >log/run' <<'EOF'
-#!/bin/sh
-mkdir -p main
-chown git:git main
-exec chpst -u git:git svlogd -tt main
-EOF</span>
-/etc/sv/arvados-git-httpd$ <span class="userinput">sudo sh -c 'cat >run' <<'EOF'
-#!/bin/sh
-export ARVADOS_API_HOST=<b>uuid_prefix.your.domain</b>
-export GITOLITE_HTTP_HOME=/var/lib/arvados/git
-export GL_BYPASS_ACCESS_CHECKS=1
-export PATH="$PATH:/var/lib/arvados/git/bin"
-exec chpst -u git:git arvados-git-httpd -address=:9001 -git-command=/var/lib/arvados/git/gitolite/src/gitolite-shell -repo-root=<b>/var/lib/arvados/git</b>/repositories 2>&1
-EOF</span>
-/etc/sv/arvados-git-httpd$ <span class="userinput">sudo chmod +x run log/run</span>
-/etc/sv/arvados-git-httpd$ <span class="userinput">sudo ln -s "$(pwd)" /etc/service/</span>
+<pre><code>Client:
+ APIHost: <b>uuid_prefix.your.domain</b>
+ Insecure: false
+GitCommand: /var/lib/arvados/git/gitolite/src/gitolite-shell
+GitoliteHome: /var/lib/arvados/git
+Listen: :9001
+RepoRoot: /var/lib/arvados/git/repositories
</code></pre>
</notextile>
-If you are using a different daemon supervisor, or if you want to test the daemon in a terminal window, an equivalent shell command to run arvados-git-httpd is:
+Restart the systemd service to ensure the new configuration is used.
<notextile>
-<pre><code>sudo -u git \
- ARVADOS_API_HOST=<span class="userinput">uuid_prefix.your.domain</span> \
- GITOLITE_HTTP_HOME=/var/lib/arvados/git \
- GL_BYPASS_ACCESS_CHECKS=1 \
- PATH="$PATH:/var/lib/arvados/git/bin" \
- arvados-git-httpd -address=:9001 -git-command=/var/lib/arvados/git/gitolite/src/gitolite-shell -repo-root=/var/lib/arvados/git/repositories 2>&1
+<pre><code>~$ <span class="userinput">sudo systemctl restart arvados-git-httpd</span>
</code></pre>
</notextile>
end
assert_equal "#{Digest::MD5.hexdigest('foo')} ./foo\n", err
assert_equal '', out
- assert_equal false, File.exists?('tmp/foo')
+ assert_equal false, File.exist?('tmp/foo')
end
def test_sha1_nowrite
end
assert_equal "#{Digest::SHA1.hexdigest('foo')} ./foo\n", err
assert_equal '', out
- assert_equal false, File.exists?('tmp/foo')
+ assert_equal false, File.exist?('tmp/foo')
end
def test_block_to_file
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner
+from. runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
self.pipeline = None
self.final_output_collection = None
self.output_name = output_name
+ self.project_uuid = None
+
if keep_client is not None:
self.keep_client = keep_client
else:
kwargs["docker_outdir"] = "$(task.outdir)"
kwargs["tmpdir"] = "$(task.tmpdir)"
+ upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
+
runnerjob = None
if kwargs.get("submit"):
if self.work_api == "containers":
logger.info("Pipeline instance %s", self.pipeline["uuid"])
if runnerjob and not kwargs.get("wait"):
- runnerjob.run()
+ runnerjob.run(wait=kwargs.get("wait"))
return runnerjob.uuid
self.poll_api = arvados.api('v1')
import re
import copy
import json
+import time
from cwltool.process import get_feature, shortname
from cwltool.errors import WorkflowException
workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+ # Need to filter this out; it gets added by cwltool when parameters
+ # are provided on the command line, and arv-run-pipeline-instance
+ # doesn't accept it.
+ if "job_order" in self.job_order:
+ del self.job_order["job_order"]
+
self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"]).target[5:]
if self.output_name:
self.job_order["arv:output_name"] = self.output_name
def run(self, *args, **kwargs):
job_spec = self.arvados_job_spec(*args, **kwargs)
- job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
-
- response = self.arvrunner.api.jobs().create(
- body=job_spec,
- find_or_create=self.enable_reuse
- ).execute(num_retries=self.arvrunner.num_retries)
- self.uuid = response["uuid"]
+ for k,v in job_spec["script_parameters"].items():
+ if isinstance(v, dict):
+ job_spec["script_parameters"][k] = {"value": v}
+
+ self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
+ body={
+ "owner_uuid": self.arvrunner.project_uuid,
+ "name": shortname(self.tool.tool["id"]),
+ "components": {"cwl-runner": job_spec },
+ "state": "RunningOnServer"}).execute(num_retries=self.arvrunner.num_retries)
+ logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"])
+
+ if kwargs.get("wait") is False:
+ self.uuid = self.arvrunner.pipeline["uuid"]
+ return
+
+ job = None
+ while not job:
+ time.sleep(2)
+ self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().get(
+ uuid=self.arvrunner.pipeline["uuid"]).execute(
+ num_retries=self.arvrunner.num_retries)
+ job = self.arvrunner.pipeline["components"]["cwl-runner"].get("job")
+ if not job and self.arvrunner.pipeline["state"] != "RunningOnServer":
+ raise WorkflowException("Submitted pipeline is %s" % (self.arvrunner.pipeline["state"]))
+
+ self.uuid = job["uuid"]
self.arvrunner.processes[self.uuid] = self
- logger.info("Submitted job %s", response["uuid"])
-
- if kwargs.get("submit"):
- self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
- body={
- "owner_uuid": self.arvrunner.project_uuid,
- "name": shortname(self.tool.tool["id"]),
- "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
- "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
-
- if response["state"] in ("Complete", "Failed", "Cancelled"):
- self.done(response)
+ if job["state"] in ("Complete", "Failed", "Cancelled"):
+ self.done(job)
class RunnerTemplate(object):
# Local FS ref, may need to be uploaded or may be on keep
# mount.
ab = abspath(src, self.input_basedir)
- st = arvados.commands.run.statfile("", ab, fnPattern=self.file_pattern)
+ st = arvados.commands.run.statfile("", ab, fnPattern="keep:%s/%s")
if isinstance(st, arvados.commands.run.UploadFile):
uploadfiles.add((src, ab, st))
elif isinstance(st, arvados.commands.run.ArvFile):
- self._pathmap[src] = MapperEnt(ab, st.fn, "File")
+ self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % st.fn[5:], "File")
elif src.startswith("_:"):
if "contents" in srcobj:
pass
def setup(self, referenced_files, basedir):
# type: (List[Any], unicode) -> None
- self._pathmap = self.arvrunner.get_uploaded()
uploadfiles = set()
+ for k,v in self.arvrunner.get_uploaded().iteritems():
+ self._pathmap[k] = MapperEnt(v.resolved, self.collection_pattern % v.resolved[5:], "File")
+
for srcobj in referenced_files:
self.visit(srcobj, uploadfiles)
self.arvrunner.api,
dry_run=False,
num_retries=self.arvrunner.num_retries,
- fnPattern=self.file_pattern,
+ fnPattern="keep:%s/%s",
name=self.name,
project=self.arvrunner.project_uuid)
for src, ab, st in uploadfiles:
- self._pathmap[src] = MapperEnt("keep:" + st.keepref, st.fn, "File")
+ self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % st.fn[5:], "File")
self.arvrunner.add_uploaded(src, self._pathmap[src])
for srcobj in referenced_files:
for s in tool.steps:
upload_docker(arvrunner, s.embedded_tool)
+def upload_instance(arvrunner, name, tool, job_order):
+ upload_docker(arvrunner, tool)
+
+ workflowmapper = upload_dependencies(arvrunner,
+ name,
+ tool.doc_loader,
+ tool.tool,
+ tool.tool["id"],
+ True)
+
+ jobmapper = upload_dependencies(arvrunner,
+ os.path.basename(job_order.get("id", "#")),
+ tool.doc_loader,
+ job_order,
+ job_order.get("id", "#"),
+ False)
+
+ adjustDirObjs(job_order, trim_listing)
+
+ if "id" in job_order:
+ del job_order["id"]
+
+ return workflowmapper
+
class Runner(object):
def __init__(self, runner, tool, job_order, enable_reuse, output_name):
pass
def arvados_job_spec(self, *args, **kwargs):
- upload_docker(self.arvrunner, self.tool)
-
self.name = os.path.basename(self.tool.tool["id"])
-
- workflowmapper = upload_dependencies(self.arvrunner,
- self.name,
- self.tool.doc_loader,
- self.tool.tool,
- self.tool.tool["id"],
- True)
-
- jobmapper = upload_dependencies(self.arvrunner,
- os.path.basename(self.job_order.get("id", "#")),
- self.tool.doc_loader,
- self.job_order,
- self.job_order.get("id", "#"),
- False)
-
- adjustDirObjs(self.job_order, trim_listing)
-
- if "id" in self.job_order:
- del self.job_order["id"]
-
- return workflowmapper
-
+ return upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
def done(self, record):
if record["state"] == "Complete":
--- /dev/null
+print "Hello world"
--- /dev/null
+import functools
+import mock
+import sys
+import unittest
+import json
+import logging
+import os
+
+import arvados
+import arvados.keep
+import arvados.collection
+import arvados_cwl
+
+from cwltool.pathmapper import MapperEnt
+
+from arvados_cwl.pathmapper import ArvPathMapper
+
+def upload_mock(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)", name=None):
+ pdh = "99999999999999999999999999999991+99"
+ for c in files:
+ c.fn = fnPattern % (pdh, os.path.basename(c.fn))
+
+class TestPathmap(unittest.TestCase):
+ def test_keepref(self):
+ """Test direct keep references."""
+
+ arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+
+ p = ArvPathMapper(arvrunner, [{
+ "class": "File",
+ "location": "keep:99999999999999999999999999999991+99/hw.py"
+ }], "", "/test/%s", "/test/%s/%s")
+
+ self.assertEqual({'keep:99999999999999999999999999999991+99/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+ p._pathmap)
+
+ @mock.patch("arvados.commands.run.uploadfiles")
+ def test_upload(self, upl):
+ """Test pathmapper uploading files."""
+
+ arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+
+ upl.side_effect = upload_mock
+
+ p = ArvPathMapper(arvrunner, [{
+ "class": "File",
+ "location": "tests/hw.py"
+ }], "", "/test/%s", "/test/%s/%s")
+
+ self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+ p._pathmap)
+
+ @mock.patch("arvados.commands.run.uploadfiles")
+ def test_prev_uploaded(self, upl):
+ """Test pathmapper handling previously uploaded files."""
+
+ arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+ arvrunner.add_uploaded('tests/hw.py', MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='', type='File'))
+
+ upl.side_effect = upload_mock
+
+ p = ArvPathMapper(arvrunner, [{
+ "class": "File",
+ "location": "tests/hw.py"
+ }], "", "/test/%s", "/test/%s/%s")
+
+ self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+ p._pathmap)
+
+ @mock.patch("arvados.commands.run.uploadfiles")
+ @mock.patch("arvados.commands.run.statfile")
+ def test_statfile(self, statfile, upl):
+ """Test pathmapper handling ArvFile references."""
+ arvrunner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+
+ # An ArvFile object returned from arvados.commands.run.statfile means the file is located on a
+ # keep mount, so we can construct a direct keep reference without uploading.
+ def statfile_mock(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)"):
+ st = arvados.commands.run.ArvFile("", fnPattern % ("99999999999999999999999999999991+99", "hw.py"))
+ return st
+
+ upl.side_effect = upload_mock
+ statfile.side_effect = statfile_mock
+
+ p = ArvPathMapper(arvrunner, [{
+ "class": "File",
+ "location": "tests/hw.py"
+ }], "", "/test/%s", "/test/%s/%s")
+
+ self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+ p._pathmap)
"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz5",
"portable_data_hash": "99999999999999999999999999999995+99",
"manifest_text": ""
- } )
+ },
+ {
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz6",
+ "portable_data_hash": "99999999999999999999999999999996+99",
+ "manifest_text": ""
+ }
+ )
stubs.api.collections().get().execute.return_value = {
"portable_data_hash": "99999999999999999999999999999993+99", "manifest_text": "./tool 00000000000000000000000000000000+0 0:0:submit_tool.cwl 0:0:blub.txt"}
'script_version': 'master',
'script': 'cwl-runner'
}
+ stubs.pipeline_component = stubs.expect_job_spec.copy()
+ stubs.expect_pipeline_instance = {
+ 'name': 'submit_wf.cwl',
+ 'state': 'RunningOnServer',
+ "components": {
+ "cwl-runner": {
+ 'runtime_constraints': {'docker_image': 'arvados/jobs'},
+ 'script_parameters': {
+ 'y': {"value": {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}},
+ 'x': {"value": {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999994+99/blorp.txt'}},
+ 'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
+ 'listing': [
+ {'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
+ ]}},
+ 'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+ },
+ 'repository': 'arvados',
+ 'script_version': 'master',
+ 'script': 'cwl-runner'
+ }
+ }
+ }
+ stubs.pipeline_create = copy.deepcopy(stubs.expect_pipeline_instance)
+ stubs.expect_pipeline_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
+ stubs.pipeline_create["uuid"] = stubs.expect_pipeline_uuid
+ stubs.pipeline_with_job = copy.deepcopy(stubs.pipeline_create)
+ stubs.pipeline_with_job["components"]["cwl-runner"]["job"] = {
+ "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
+ "state": "Queued"
+ }
+ stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
+ stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
stubs.expect_container_spec = {
'priority': 1,
class TestSubmit(unittest.TestCase):
+ @mock.patch("time.sleep")
@stubs
- def test_submit(self, stubs):
+ def test_submit(self, stubs, tm):
capture_stdout = cStringIO.StringIO()
exited = arvados_cwl.main(
- ["--submit", "--no-wait",
+ ["--submit", "--no-wait", "--debug",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
}, ensure_unique_name=True),
mock.call().execute()])
- expect_job = copy.deepcopy(stubs.expect_job_spec)
- expect_job["owner_uuid"] = stubs.fake_user_uuid
- stubs.api.jobs().create.assert_called_with(
- body=expect_job,
- find_or_create=True)
+ expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
+ stubs.api.pipeline_instances().create.assert_called_with(
+ body=expect_pipeline)
self.assertEqual(capture_stdout.getvalue(),
- stubs.expect_job_uuid + '\n')
+ stubs.expect_pipeline_uuid + '\n')
+ @mock.patch("time.sleep")
@stubs
- def test_submit_with_project_uuid(self, stubs):
+ def test_submit_with_project_uuid(self, stubs, tm):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
exited = arvados_cwl.main(
sys.stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- expect_body = copy.deepcopy(stubs.expect_job_spec)
- expect_body["owner_uuid"] = project_uuid
- stubs.api.jobs().create.assert_called_with(
- body=expect_body,
- find_or_create=True)
+ expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["owner_uuid"] = project_uuid
+ stubs.api.pipeline_instances().create.assert_called_with(
+ body=expect_pipeline)
@stubs
def test_submit_container(self, stubs):
return time.Duration(d).String()
}
+// Duration returns a time.Duration
+func (d Duration) Duration() time.Duration {
+ return time.Duration(d)
+}
+
// Value implements flag.Value
func (d *Duration) Set(s string) error {
dur, err := time.ParseDuration(s)
package streamer
import (
+ "errors"
"io"
)
+var ErrAlreadyClosed = errors.New("cannot close a stream twice")
+
type AsyncStream struct {
buffer []byte
requests chan sliceRequest
add_reader chan bool
subtract_reader chan bool
wait_zero_readers chan bool
+ closed bool
}
// Reads from the buffer managed by the Transfer()
}
func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
- t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
+ t := &AsyncStream{
+ buffer: make([]byte, buffersize),
+ requests: make(chan sliceRequest),
+ add_reader: make(chan bool),
+ subtract_reader: make(chan bool),
+ wait_zero_readers: make(chan bool),
+ }
go t.transfer(source)
go t.readersMonitor()
}
func AsyncStreamFromSlice(buf []byte) *AsyncStream {
- t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
+ t := &AsyncStream{
+ buffer: buf,
+ requests: make(chan sliceRequest),
+ add_reader: make(chan bool),
+ subtract_reader: make(chan bool),
+ wait_zero_readers: make(chan bool),
+ }
go t.transfer(nil)
go t.readersMonitor()
// Close the responses channel
func (this *StreamReader) Close() error {
+ if this.stream == nil {
+ return ErrAlreadyClosed
+ }
this.stream.subtract_reader <- true
close(this.responses)
this.stream = nil
return nil
}
-func (this *AsyncStream) Close() {
+func (this *AsyncStream) Close() error {
+ if this.closed {
+ return ErrAlreadyClosed
+ }
+ this.closed = true
this.wait_zero_readers <- true
close(this.requests)
close(this.add_reader)
close(this.subtract_reader)
close(this.wait_zero_readers)
+ return nil
}
writer.Write([]byte("baz"))
writer.Close()
}
+
+func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) {
+ buffer := make([]byte, 100)
+ tr := AsyncStreamFromSlice(buffer)
+ sr := tr.MakeStreamReader()
+ c.Check(sr.Close(), IsNil)
+ c.Check(sr.Close(), Equals, ErrAlreadyClosed)
+ c.Check(tr.Close(), IsNil)
+ c.Check(tr.Close(), Equals, ErrAlreadyClosed)
+}
import errno
import re
import logging
+import collections
+import uuid
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
def __init__(self, keep, copies=None):
"""keep: KeepClient object to use"""
self._keep = keep
- self._bufferblocks = {}
+ self._bufferblocks = collections.OrderedDict()
self._put_queue = None
self._put_threads = None
self._prefetch_queue = None
ArvadosFile that owns this block
"""
+ return self._alloc_bufferblock(blockid, starting_capacity, owner)
+
+ def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
if blockid is None:
- blockid = "bufferblock%i" % len(self._bufferblocks)
+ blockid = "%s" % uuid.uuid4()
bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
self._bufferblocks[bufferblock.blockid] = bufferblock
return bufferblock
def __exit__(self, exc_type, exc_value, traceback):
self.stop_threads()
+ @synchronized
+ def repack_small_blocks(self, force=False, sync=False):
+ """Packs small blocks together before uploading"""
+ # Search for blocks that are ready to be packed together before being committed to Keep.
+ # A WRITABLE block always has an owner.
+ # A WRITABLE block with its owner.closed() implies that its
+ # size is <= KEEP_BLOCK_SIZE/2.
+ small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+ if len(small_blocks) <= 1:
+ # Not enough small blocks for repacking
+ return
+
+ # Check if there are enough small blocks for filling up one in full
+ pending_write_size = sum([b.size() for b in small_blocks])
+ if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
+ new_bb = self._alloc_bufferblock()
+ while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+ bb = small_blocks.pop(0)
+ arvfile = bb.owner
+ new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+ arvfile.set_segments([Range(new_bb.blockid,
+ 0,
+ bb.size(),
+ new_bb.write_pointer - bb.size())])
+ self._delete_bufferblock(bb.blockid)
+ self.commit_bufferblock(new_bb, sync=sync)
+
def commit_bufferblock(self, block, sync):
"""Initiate a background upload of a bufferblock.
which case it will wait on an upload queue slot.
"""
-
try:
# Mark the block as PENDING so to disallow any more appends.
block.set_state(_BufferBlock.PENDING)
@synchronized
def delete_bufferblock(self, locator):
+ self._delete_bufferblock(locator)
+
+ def _delete_bufferblock(self, locator):
bb = self._bufferblocks[locator]
bb.clear()
del self._bufferblocks[locator]
are uploaded. Raises KeepWriteError() if any blocks failed to upload.
"""
+ self.repack_small_blocks(force=True, sync=True)
+
with self.lock:
items = self._bufferblocks.items()
for k,v in items:
- if v.state() != _BufferBlock.COMMITTED:
+ if v.state() != _BufferBlock.COMMITTED and v.owner:
v.owner.flush(sync=False)
with self.lock:
"""
self.parent = parent
self.name = name
+ self._writers = set()
self._committed = False
self._segments = []
self.lock = parent.root_collection().lock
def __ne__(self, other):
return not self.__eq__(other)
+ @synchronized
+ def set_segments(self, segs):
+ self._segments = segs
+
@synchronized
def set_committed(self):
- """Set committed flag to False"""
+ """Set committed flag to True"""
self._committed = True
@synchronized
"""Get whether this is committed or not."""
return self._committed
+ @synchronized
+ def add_writer(self, writer):
+ """Add an ArvadosFileWriter reference to the list of writers"""
+ if isinstance(writer, ArvadosFileWriter):
+ self._writers.add(writer)
+
+ @synchronized
+ def remove_writer(self, writer, flush):
+ """
+ Called from ArvadosFileWriter.close(). Remove a writer reference from the list
+ and do some block maintenance tasks.
+ """
+ self._writers.remove(writer)
+
+ if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
+ # File writer closed, not small enough for repacking
+ self.flush()
+ elif self.closed():
+ # All writers closed and size is adequate for repacking
+ self.parent._my_block_manager().repack_small_blocks()
+
+ def closed(self):
+ """
+ Get whether this is closed or not. When the writers list is empty, the file
+ is supposed to be closed.
+ """
+ return len(self._writers) == 0
+
@must_be_writable
@synchronized
def truncate(self, size):
def __init__(self, arvadosfile, mode, num_retries=None):
super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
self.mode = mode
+ self.arvadosfile.add_writer(self)
@_FileLikeObjectBase._before_close
@retry_method
def flush(self):
self.arvadosfile.flush()
- def close(self):
+ def close(self, flush=True):
if not self.closed:
- self.flush()
+ self.arvadosfile.remove_writer(self, flush)
super(ArvadosFileWriter, self).close()
with open(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"), "w") as f:
keep_args['-blob-signing-key-file'] = f.name
f.write(blob_signing_key)
- if enforce_permissions:
- keep_args['-enforce-permissions'] = 'true'
+ keep_args['-enforce-permissions'] = str(enforce_permissions).lower()
with open(os.path.join(TEST_TMPDIR, "keep.data-manager-token-file"), "w") as f:
keep_args['-data-manager-token-file'] = f.name
f.write(auth_token('data_manager'))
# One file committed
with c.open("foo.txt", "w") as foo:
foo.write("foo")
+ foo.flush() # Force block commit
f.write("0123456789")
# Other file not committed. Block not written to keep yet.
self.assertEqual(
normalize=False,
only_committed=True),
'. acbd18db4cc2f85cedef654fccc4a4d8+3 0:0:count.txt 0:3:foo.txt\n')
- # And now with the file closed...
+ # And now with the file closed...
+ f.flush() # Force block commit
self.assertEqual(
c._get_manifest_text(".",
strip=False,
only_committed=True),
". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:10:count.txt 10:3:foo.txt\n")
+ def test_only_small_blocks_are_packed_together(self):
+ c = Collection()
+ # Write a couple of small files,
+ f = c.open("count.txt", "w")
+ f.write("0123456789")
+ f.close(flush=False)
+ foo = c.open("foo.txt", "w")
+ foo.write("foo")
+ foo.close(flush=False)
+ # Then, write a big file, it shouldn't be packed with the ones above
+ big = c.open("bigfile.txt", "w")
+ big.write("x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2
+ big.close(flush=False)
+ self.assertEqual(
+ c.manifest_text("."),
+ '. 2d303c138c118af809f39319e5d507e9+34603008 a8430a058b8fbf408e1931b794dbd6fb+13 0:34603008:bigfile.txt 34603008:10:count.txt 34603018:3:foo.txt\n')
+
class CollectionCreateUpdateTest(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
uuids: uuid_list)
end
+ def final?
+ [Complete, Cancelled].include?(self.state)
+ end
+
protected
def fill_field_defaults
def handle_completed
# This container is finished so finalize any associated container requests
# that are associated with this container.
- if self.state_changed? and [Complete, Cancelled].include? self.state
+ if self.state_changed? and self.final?
act_as_system_user do
if self.state == Cancelled
# Notify container requests associated with this container
ContainerRequest.where(container_uuid: uuid,
state: ContainerRequest::Committed).each do |cr|
- cr.container_completed!
+ cr.finalize!
end
# Try to cancel any outstanding container requests made by this container.
validate :validate_change
validate :validate_runtime_constraints
after_save :update_priority
+ after_save :finalize_if_needed
before_create :set_requesting_container_uuid
api_accessible :user, extend: :common do |t|
%w(modified_by_client_uuid container_uuid requesting_container_uuid)
end
+ def finalize_if_needed
+ if state == Committed && Container.find_by_uuid(container_uuid).final?
+ reload
+ act_as_system_user do
+ finalize!
+ end
+ end
+ end
+
# Finalize the container request after the container has
# finished/cancelled.
- def container_completed!
- update_attributes!(state: ContainerRequest::Final)
+ def finalize!
+ update_attributes!(state: Final)
c = Container.find_by_uuid(container_uuid)
['output', 'log'].each do |out_type|
pdh = c.send(out_type)
assert_equal prev_container_uuid, cr.container_uuid
end
+ test "Finalize committed request when reusing a finished container" do
+ set_user_from_auth :active
+ cr = create_minimal_req!(priority: 1, state: ContainerRequest::Committed)
+ cr.reload
+ assert_equal ContainerRequest::Committed, cr.state
+ act_as_system_user do
+ c = Container.find_by_uuid(cr.container_uuid)
+ c.update_attributes!(state: Container::Locked)
+ c.update_attributes!(state: Container::Running)
+ c.update_attributes!(state: Container::Complete,
+ exit_code: 0,
+ output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
+ log: 'fa7aeb5140e2848d39b416daeef4ffc5+45')
+ end
+ cr.reload
+ assert_equal ContainerRequest::Final, cr.state
+
+ cr2 = create_minimal_req!(priority: 1, state: ContainerRequest::Committed)
+ assert_equal cr.container_uuid, cr2.container_uuid
+ assert_equal ContainerRequest::Final, cr2.state
+
+ cr3 = create_minimal_req!(priority: 1, state: ContainerRequest::Uncommitted)
+ assert_equal ContainerRequest::Uncommitted, cr3.state
+ cr3.update_attributes!(state: ContainerRequest::Committed)
+ assert_equal cr.container_uuid, cr3.container_uuid
+ assert_equal ContainerRequest::Final, cr3.state
+ end
end
Description=Arvados git server
Documentation=https://doc.arvados.org/
After=network.target
-AssertPathExists=/etc/arvados/arvados-git-httpd/arvados-git-httpd.yml
+AssertPathExists=/etc/arvados/git-httpd/git-httpd.yml
[Service]
Type=notify
+++ /dev/null
-/*
-arv-git-httpd provides authenticated access to Arvados-hosted git repositories.
-
-See http://doc.arvados.org/install/install-arv-git-httpd.html.
-
-Example:
-
- arv-git-httpd -address=:8000 -repo-root=/var/lib/arvados/git
-
-Options:
-
- -address [host]:[port]
-
-Listen at the given host and port.
-
-Host can be a domain name, an IP address, or empty (listen on all
-addresses).
-
-Port can be a name, a port number, or 0 (choose an available port).
-
- -repo-root path
-
-Directory containing git repositories. When a client requests either
-"foo/bar.git" or "foo/bar/.git", git-http-backend will be invoked on
-"path/foo/bar.git" or (if that doesn't exist) "path/foo/bar/.git".
-
- -git-command path
-
-Location of the CGI program to execute for each authorized request
-(normally this is gitolite-shell if repositories are controlled by
-gitolite, otherwise git). It is invoked with a single argument,
-'http-backend'. Default is /usr/bin/git.
-
-*/
-package main
"net"
"net/http"
"net/http/cgi"
+ "os"
)
// gitHandler is an http.Handler that invokes git-http-backend (or
}
func newGitHandler() http.Handler {
+ const glBypass = "GL_BYPASS_ACCESS_CHECKS"
+ const glHome = "GITOLITE_HTTP_HOME"
+ var env []string
+ path := os.Getenv("PATH")
+ if theConfig.GitoliteHome != "" {
+ env = append(env,
+ glHome+"="+theConfig.GitoliteHome,
+ glBypass+"=1")
+ path = path + ":" + theConfig.GitoliteHome + "/bin"
+ } else if home, bypass := os.Getenv(glHome), os.Getenv(glBypass); home != "" || bypass != "" {
+ env = append(env, glHome+"="+home, glBypass+"="+bypass)
+ log.Printf("DEPRECATED: Passing through %s and %s environment variables. Use GitoliteHome configuration instead.", glHome, glBypass)
+ }
+ env = append(env,
+ "GIT_PROJECT_ROOT="+theConfig.RepoRoot,
+ "GIT_HTTP_EXPORT_ALL=",
+ "SERVER_ADDR="+theConfig.Listen,
+ "PATH="+path)
return &gitHandler{
Handler: cgi.Handler{
Path: theConfig.GitCommand,
Dir: theConfig.RepoRoot,
- Env: []string{
- "GIT_PROJECT_ROOT=" + theConfig.RepoRoot,
- "GIT_HTTP_EXPORT_ALL=",
- "SERVER_ADDR=" + theConfig.Listen,
- },
- InheritEnv: []string{
- "PATH",
- // Needed if GitCommand is gitolite-shell:
- "GITOLITE_HTTP_HOME",
- "GL_BYPASS_ACCESS_CHECKS",
- },
+ Env: env,
Args: []string{"http-backend"},
},
}
"net/http"
"net/http/httptest"
"net/url"
- "os"
"regexp"
check "gopkg.in/check.v1"
type GitHandlerSuite struct{}
func (s *GitHandlerSuite) TestEnvVars(c *check.C) {
+ theConfig = defaultConfig()
+ theConfig.RepoRoot = "/"
+ theConfig.GitoliteHome = "/test/ghh"
+
u, err := url.Parse("git.zzzzz.arvadosapi.com/test")
c.Check(err, check.Equals, nil)
resp := httptest.NewRecorder()
h := newGitHandler()
h.(*gitHandler).Path = "/bin/sh"
h.(*gitHandler).Args = []string{"-c", "printf 'Content-Type: text/plain\r\n\r\n'; env"}
- os.Setenv("GITOLITE_HTTP_HOME", "/test/ghh")
- os.Setenv("GL_BYPASS_ACCESS_CHECKS", "yesplease")
h.ServeHTTP(resp, req)
c.Check(resp.Code, check.Equals, http.StatusOK)
body := resp.Body.String()
+ c.Check(body, check.Matches, `(?ms).*^PATH=.*:/test/ghh/bin$.*`)
c.Check(body, check.Matches, `(?ms).*^GITOLITE_HTTP_HOME=/test/ghh$.*`)
- c.Check(body, check.Matches, `(?ms).*^GL_BYPASS_ACCESS_CHECKS=yesplease$.*`)
+ c.Check(body, check.Matches, `(?ms).*^GL_BYPASS_ACCESS_CHECKS=1$.*`)
c.Check(body, check.Matches, `(?ms).*^REMOTE_HOST=::1$.*`)
c.Check(body, check.Matches, `(?ms).*^REMOTE_PORT=12345$.*`)
c.Check(body, check.Matches, `(?ms).*^SERVER_ADDR=`+regexp.QuoteMeta(theConfig.Listen)+`$.*`)
APIHost: arvadostest.APIHost(),
Insecure: true,
},
- Listen: ":0",
- GitCommand: "/usr/share/gitolite3/gitolite-shell",
- RepoRoot: s.tmpRepoRoot,
+ Listen: ":0",
+ GitCommand: "/usr/share/gitolite3/gitolite-shell",
+ GitoliteHome: s.gitoliteHome,
+ RepoRoot: s.tmpRepoRoot,
}
s.IntegrationSuite.SetUpTest(c)
// (*IntegrationTest)SetUpTest() -- see 2.2.4 at
// http://gitolite.com/gitolite/gitolite.html
runGitolite("gitolite", "setup")
-
- os.Setenv("GITOLITE_HTTP_HOME", s.gitoliteHome)
- os.Setenv("GL_BYPASS_ACCESS_CHECKS", "1")
}
func (s *GitoliteSuite) TearDownTest(c *check.C) {
s.tmpWorkdir = ""
s.Config = nil
+
+ theConfig = defaultConfig()
}
func (s *IntegrationSuite) RunGit(c *check.C, token, gitCmd, repo string, args ...string) error {
// Server configuration
type Config struct {
- Client arvados.Client
- Listen string
- GitCommand string
- RepoRoot string
+ Client arvados.Client
+ Listen string
+ GitCommand string
+ RepoRoot string
+ GitoliteHome string
}
var theConfig = defaultConfig()
func defaultConfig() *Config {
- cwd, err := os.Getwd()
- if err != nil {
- log.Fatalln("Getwd():", err)
- }
return &Config{
Listen: ":80",
GitCommand: "/usr/bin/git",
- RepoRoot: cwd,
+ RepoRoot: "/var/lib/arvados/git/repositories",
}
}
-func init() {
+func main() {
const defaultCfgPath = "/etc/arvados/git-httpd/git-httpd.yml"
const deprecated = " (DEPRECATED -- use config file instead)"
flag.StringVar(&theConfig.Listen, "address", theConfig.Listen,
"Path to git or gitolite-shell executable. Each authenticated request will execute this program with a single argument, \"http-backend\"."+deprecated)
flag.StringVar(&theConfig.RepoRoot, "repo-root", theConfig.RepoRoot,
"Path to git repositories."+deprecated)
+ flag.StringVar(&theConfig.GitoliteHome, "gitolite-home", theConfig.GitoliteHome,
+ "Value for GITOLITE_HTTP_HOME environment variable. If not empty, GL_BYPASS_ACCESS_CHECKS=1 will also be set."+deprecated)
cfgPath := flag.String("config", defaultCfgPath, "Configuration file `path`.")
flag.Usage = usage
log.Print("Current configuration:\n", string(j))
}
}
-}
-func main() {
srv := &server{}
if err := srv.Start(); err != nil {
log.Fatal(err)
+// arvados-git-httpd provides authenticated access to Arvados-hosted
+// git repositories.
+//
+// See http://doc.arvados.org/install/install-arv-git-httpd.html.
package main
import (
- "encoding/json"
"flag"
"fmt"
"os"
+
+ "github.com/ghodss/yaml"
)
func usage() {
c := defaultConfig()
c.Client.APIHost = "zzzzz.arvadosapi.com:443"
- exampleConfigFile, err := json.MarshalIndent(c, " ", " ")
+ exampleConfigFile, err := yaml.Marshal(c)
if err != nil {
panic(err)
}
fmt.Fprintf(os.Stderr, `
-arv-git-httpd provides authenticated access to Arvados-hosted git repositories.
+arvados-git-httpd provides authenticated access to Arvados-hosted git
+repositories.
See http://doc.arvados.org/install/install-arv-git-httpd.html.
-Usage: arv-git-httpd [-config path/to/arv-git-httpd.yml]
+Usage: arvados-git-httpd [-config path/to/arvados/git-httpd.yml]
Options:
`)
flag.PrintDefaults()
fmt.Fprintf(os.Stderr, `
Example config file:
- %s
+
+%s
Client.APIHost:
True if your Arvados API endpoint uses an unverifiable SSL/TLS
certificate.
-Listen:
-
- Local port to listen on. Can be "address:port" or ":port", where
- "address" is a host IP address or name and "port" is a port number
- or name.
-
GitCommand:
Path to git or gitolite-shell executable. Each authenticated
request will execute this program with the single argument
"http-backend".
+GitoliteHome:
+
+ Path to Gitolite's home directory. If a non-empty path is given,
+ the CGI environment will be set up to support the use of
+ gitolite-shell as a GitCommand: for example, if GitoliteHome is
+ "/gh", then the CGI environment will have GITOLITE_HTTP_HOME=/gh,
+ PATH=$PATH:/gh/bin, and GL_BYPASS_ACCESS_CHECKS=1.
+
+Listen:
+
+ Local port to listen on. Can be "address:port" or ":port", where
+ "address" is a host IP address or name and "port" is a port number
+ or name.
+
RepoRoot:
- Path to git repositories. Defaults to current working directory.
+ Path to git repositories.
`, exampleConfigFile)
}
log.Printf("Error creating stdout pipe for squeue: %v", err)
return
}
- cmd.Start()
+ err = cmd.Start()
+ if err != nil {
+ log.Printf("Error running squeue: %v", err)
+ return
+ }
scanner := bufio.NewScanner(sq)
for scanner.Scan() {
newSqueueContents = append(newSqueueContents, scanner.Text())
self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 256MiB)", default=256*1024*1024)
self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128MiB)", default=128*1024*1024)
+ self.add_argument('--disable-event-listening', action='store_true', help="Don't subscribe to events on the API server", dest="disable_event_listening", default=False)
+
self.add_argument('--read-only', action='store_false', help="Mount will be read only (default)", dest="enable_write", default=False)
self.add_argument('--read-write', action='store_true', help="Mount will be read-write", dest="enable_write", default=False)
def __enter__(self):
llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
- if self.listen_for_events:
+ if self.listen_for_events and not self.args.disable_event_listening:
self.operations.listen_for_events()
self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
self.llfuse_thread.daemon = True
self.daemon_ctx.open()
# Subscribe to change events from API server
- if self.listen_for_events:
+ if self.listen_for_events and not self.args.disable_event_listening:
self.operations.listen_for_events()
self._llfuse_main()
run_test_server.fixture('users')['active']['uuid'])
self.assertEqual(True, self.mnt.listen_for_events)
+ @noexit
+ @mock.patch('arvados.events.subscribe')
+ def test_disable_event_listening(self, mock_subscribe):
+ args = arvados_fuse.command.ArgumentParser().parse_args([
+ '--disable-event-listening',
+ '--by-id',
+ '--foreground', self.mntdir])
+ self.mnt = arvados_fuse.command.Mount(args)
+ self.assertEqual(True, self.mnt.listen_for_events)
+ self.assertEqual(True, self.mnt.args.disable_event_listening)
+ with self.mnt:
+ pass
+ self.assertEqual(0, mock_subscribe.call_count)
+
@noexit
@mock.patch('arvados.events.subscribe')
def test_custom(self, mock_subscribe):
def setUp(self):
super(TokenExpiryTest, self).setUp(local_store=False)
+ @unittest.skip("bug #10008")
@mock.patch('arvados.keep.KeepClient.get')
def runTest(self, mocked_get):
self.api._rootDesc = {"blobSignatureTtl": 2}
}
type azureVolumeAdder struct {
- *volumeSet
+ *Config
}
-func (s *azureVolumeAdder) Set(containerName string) error {
- if trashLifetime != 0 {
- return ErrNotImplemented
- }
+// String implements flag.Value
+func (s *azureVolumeAdder) String() string {
+ return "-"
+}
- if containerName == "" {
- return errors.New("no container name given")
- }
- if azureStorageAccountName == "" || azureStorageAccountKeyFile == "" {
- return errors.New("-azure-storage-account-name and -azure-storage-account-key-file arguments must given before -azure-storage-container-volume")
- }
- accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
- if err != nil {
- return err
- }
- azClient, err := storage.NewBasicClient(azureStorageAccountName, accountKey)
- if err != nil {
- return errors.New("creating Azure storage client: " + err.Error())
- }
- if flagSerializeIO {
- log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
- }
- v := NewAzureBlobVolume(azClient, containerName, flagReadonly, azureStorageReplication)
- if err := v.Check(); err != nil {
- return err
- }
- *s.volumeSet = append(*s.volumeSet, v)
+func (s *azureVolumeAdder) Set(containerName string) error {
+ s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
+ ContainerName: containerName,
+ StorageAccountName: azureStorageAccountName,
+ StorageAccountKeyFile: azureStorageAccountKeyFile,
+ AzureReplication: azureStorageReplication,
+ ReadOnly: deprecated.flagReadonly,
+ })
return nil
}
func init() {
- flag.Var(&azureVolumeAdder{&volumes},
+ VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
+
+ flag.Var(&azureVolumeAdder{theConfig},
"azure-storage-container-volume",
"Use the given container as a storage volume. Can be given multiple times.")
flag.StringVar(
&azureStorageAccountKeyFile,
"azure-storage-account-key-file",
"",
- "File containing the account key used for subsequent --azure-storage-container-volume arguments.")
+ "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
flag.IntVar(
&azureStorageReplication,
"azure-storage-replication",
// An AzureBlobVolume stores and retrieves blocks in an Azure Blob
// container.
type AzureBlobVolume struct {
- azClient storage.Client
- bsClient storage.BlobStorageClient
- containerName string
- readonly bool
- replication int
+ StorageAccountName string
+ StorageAccountKeyFile string
+ ContainerName string
+ AzureReplication int
+ ReadOnly bool
+
+ azClient storage.Client
+ bsClient storage.BlobStorageClient
}
-// NewAzureBlobVolume returns a new AzureBlobVolume using the given
-// client and container name. The replication argument specifies the
-// replication level to report when writing data.
-func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
- return &AzureBlobVolume{
- azClient: client,
- bsClient: client.GetBlobService(),
- containerName: containerName,
- readonly: readonly,
- replication: replication,
+// Examples implements VolumeWithExamples.
+func (*AzureBlobVolume) Examples() []Volume {
+ return []Volume{
+ &AzureBlobVolume{
+ StorageAccountName: "example-account-name",
+ StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
+ ContainerName: "example-container-name",
+ AzureReplication: 3,
+ },
}
}
-// Check returns nil if the volume is usable.
-func (v *AzureBlobVolume) Check() error {
- ok, err := v.bsClient.ContainerExists(v.containerName)
+// Type implements Volume.
+func (v *AzureBlobVolume) Type() string {
+ return "Azure"
+}
+
+// Start implements Volume.
+func (v *AzureBlobVolume) Start() error {
+ if v.ContainerName == "" {
+ return errors.New("no container name given")
+ }
+ if v.StorageAccountName == "" || v.StorageAccountKeyFile == "" {
+ return errors.New("StorageAccountName and StorageAccountKeyFile must be given")
+ }
+ accountKey, err := readKeyFromFile(v.StorageAccountKeyFile)
+ if err != nil {
+ return err
+ }
+ v.azClient, err = storage.NewBasicClient(v.StorageAccountName, accountKey)
+ if err != nil {
+ return fmt.Errorf("creating Azure storage client: %s", err)
+ }
+ v.bsClient = v.azClient.GetBlobService()
+
+ ok, err := v.bsClient.ContainerExists(v.ContainerName)
if err != nil {
return err
}
if !ok {
- return errors.New("container does not exist")
+ return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
}
return nil
}
// Return true if expires_at metadata attribute is found on the block
func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
- metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+ metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
if err != nil {
return false, metadata, v.translateError(err)
}
if azureMaxGetBytes < BlockSize {
// Unfortunately the handler doesn't tell us how long the blob
// is expected to be, so we have to ask Azure.
- props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+ props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
if err != nil {
return 0, v.translateError(err)
}
var rdr io.ReadCloser
var err error
if startPos == 0 && endPos == expectSize {
- rdr, err = v.bsClient.GetBlob(v.containerName, loc)
+ rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
} else {
- rdr, err = v.bsClient.GetBlobRange(v.containerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
+ rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
}
if err != nil {
errors[p] = err
if trashed {
return os.ErrNotExist
}
- rdr, err := v.bsClient.GetBlob(v.containerName, loc)
+ rdr, err := v.bsClient.GetBlob(v.ContainerName, loc)
if err != nil {
return v.translateError(err)
}
// Put stores a Keep block as a block blob in the container.
func (v *AzureBlobVolume) Put(loc string, block []byte) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
- return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block), nil)
+ return v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), bytes.NewReader(block), nil)
}
// Touch updates the last-modified property of a block blob.
func (v *AzureBlobVolume) Touch(loc string) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
trashed, metadata, err := v.checkTrashed(loc)
}
metadata["touch"] = fmt.Sprintf("%d", time.Now())
- return v.bsClient.SetBlobMetadata(v.containerName, loc, metadata, nil)
+ return v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
}
// Mtime returns the last-modified property of a block blob.
return time.Time{}, os.ErrNotExist
}
- props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+ props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
if err != nil {
return time.Time{}, err
}
Include: "metadata",
}
for {
- resp, err := v.bsClient.ListBlobs(v.containerName, params)
+ resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
if err != nil {
return err
}
// Trash a Keep block.
func (v *AzureBlobVolume) Trash(loc string) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
// we get the Etag before checking Mtime, and use If-Match to
// ensure we don't delete data if Put() or Touch() happens
// between our calls to Mtime() and DeleteBlob().
- props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+ props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
if err != nil {
return err
}
if t, err := v.Mtime(loc); err != nil {
return err
- } else if time.Since(t) < blobSignatureTTL {
+ } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
return nil
}
- // If trashLifetime == 0, just delete it
- if trashLifetime == 0 {
- return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
+ // If TrashLifetime == 0, just delete it
+ if theConfig.TrashLifetime == 0 {
+ return v.bsClient.DeleteBlob(v.ContainerName, loc, map[string]string{
"If-Match": props.Etag,
})
}
// Otherwise, mark as trash
- return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
- "expires_at": fmt.Sprintf("%d", time.Now().Add(trashLifetime).Unix()),
+ return v.bsClient.SetBlobMetadata(v.ContainerName, loc, map[string]string{
+ "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
}, map[string]string{
"If-Match": props.Etag,
})
// Delete the expires_at metadata attribute
func (v *AzureBlobVolume) Untrash(loc string) error {
// if expires_at does not exist, return NotFoundError
- metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+ metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
if err != nil {
return v.translateError(err)
}
// reset expires_at metadata attribute
metadata["expires_at"] = ""
- err = v.bsClient.SetBlobMetadata(v.containerName, loc, metadata, nil)
+ err = v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
return v.translateError(err)
}
// String returns a volume label, including the container name.
func (v *AzureBlobVolume) String() string {
- return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
+ return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
}
// Writable returns true, unless the -readonly flag was on when the
// volume was added.
func (v *AzureBlobVolume) Writable() bool {
- return !v.readonly
+ return !v.ReadOnly
}
// Replication returns the replication level of the container, as
// specified by the -azure-storage-replication argument.
func (v *AzureBlobVolume) Replication() int {
- return v.replication
+ return v.AzureReplication
}
// If possible, translate an Azure SDK error to a recognizable error
return keepBlockRegexp.MatchString(s)
}
-// EmptyTrash looks for trashed blocks that exceeded trashLifetime
+// EmptyTrash looks for trashed blocks that exceeded TrashLifetime
// and deletes them from the volume.
func (v *AzureBlobVolume) EmptyTrash() {
var bytesDeleted, bytesInTrash int64
params := storage.ListBlobsParameters{Include: "metadata"}
for {
- resp, err := v.bsClient.ListBlobs(v.containerName, params)
+ resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
if err != nil {
log.Printf("EmptyTrash: ListBlobs: %v", err)
break
continue
}
- err = v.bsClient.DeleteBlob(v.containerName, b.Name, map[string]string{
+ err = v.bsClient.DeleteBlob(v.ContainerName, b.Name, map[string]string{
"If-Match": b.Properties.Etag,
})
if err != nil {
}
}
- v := NewAzureBlobVolume(azClient, container, readonly, replication)
+ v := &AzureBlobVolume{
+ ContainerName: container,
+ ReadOnly: readonly,
+ AzureReplication: replication,
+ azClient: azClient,
+ bsClient: azClient.GetBlobService(),
+ }
return &TestableAzureBlobVolume{
AzureBlobVolume: v,
}
func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
- v.azHandler.PutRaw(v.containerName, locator, data)
+ v.azHandler.PutRaw(v.ContainerName, locator, data)
}
func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
- v.azHandler.TouchWithDate(v.containerName, locator, lastPut)
+ v.azHandler.TouchWithDate(v.ContainerName, locator, lastPut)
}
func (v *TestableAzureBlobVolume) Teardown() {
// Initialize a default-sized buffer pool for the benefit of test
// suites that don't run main().
func init() {
- bufs = newBufferPool(maxBuffers, BlockSize)
+ bufs = newBufferPool(theConfig.MaxBuffers, BlockSize)
}
// Restore sane default after bufferpool's own tests
func (s *BufferPoolSuite) TearDownTest(c *C) {
- bufs = newBufferPool(maxBuffers, BlockSize)
+ bufs = newBufferPool(theConfig.MaxBuffers, BlockSize)
}
func (s *BufferPoolSuite) TestBufferPoolBufSize(c *C) {
--- /dev/null
+package main
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "strings"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type Config struct {
+ Listen string
+
+ PIDFile string
+
+ MaxBuffers int
+ MaxRequests int
+
+ BlobSignatureTTL arvados.Duration
+ BlobSigningKeyFile string
+ RequireSignatures bool
+ SystemAuthTokenFile string
+ EnableDelete bool
+ TrashLifetime arvados.Duration
+ TrashCheckInterval arvados.Duration
+
+ Volumes VolumeList
+
+ blobSigningKey []byte
+ systemAuthToken string
+}
+
+var theConfig = DefaultConfig()
+
+// DefaultConfig returns the default configuration.
+func DefaultConfig() *Config {
+ return &Config{
+ Listen: ":25107",
+ MaxBuffers: 128,
+ RequireSignatures: true,
+ BlobSignatureTTL: arvados.Duration(14 * 24 * time.Hour),
+ TrashLifetime: arvados.Duration(14 * 24 * time.Hour),
+ TrashCheckInterval: arvados.Duration(24 * time.Hour),
+ Volumes: []Volume{},
+ }
+}
+
+// Start should be called exactly once: after setting all public
+// fields, and before using the config.
+func (cfg *Config) Start() error {
+ if cfg.MaxBuffers < 0 {
+ return fmt.Errorf("MaxBuffers must be greater than zero")
+ }
+ bufs = newBufferPool(cfg.MaxBuffers, BlockSize)
+
+ if cfg.MaxRequests < 1 {
+ cfg.MaxRequests = cfg.MaxBuffers * 2
+ log.Printf("MaxRequests <1 or not specified; defaulting to MaxBuffers * 2 == %d", cfg.MaxRequests)
+ }
+
+ if cfg.BlobSigningKeyFile != "" {
+ buf, err := ioutil.ReadFile(cfg.BlobSigningKeyFile)
+ if err != nil {
+ return fmt.Errorf("reading blob signing key file: %s", err)
+ }
+ cfg.blobSigningKey = bytes.TrimSpace(buf)
+ if len(cfg.blobSigningKey) == 0 {
+ return fmt.Errorf("blob signing key file %q is empty", cfg.BlobSigningKeyFile)
+ }
+ } else if cfg.RequireSignatures {
+ return fmt.Errorf("cannot enable RequireSignatures (-enforce-permissions) without a blob signing key")
+ } else {
+ log.Println("Running without a blob signing key. Block locators " +
+ "returned by this server will not be signed, and will be rejected " +
+ "by a server that enforces permissions.")
+ log.Println("To fix this, use the BlobSigningKeyFile config entry.")
+ }
+
+ if fn := cfg.SystemAuthTokenFile; fn != "" {
+ buf, err := ioutil.ReadFile(fn)
+ if err != nil {
+ return fmt.Errorf("cannot read system auth token file %q: %s", fn, err)
+ }
+ cfg.systemAuthToken = strings.TrimSpace(string(buf))
+ }
+
+ if cfg.EnableDelete {
+ log.Print("Trash/delete features are enabled. WARNING: this has not " +
+ "been extensively tested. You should disable this unless you can afford to lose data.")
+ }
+
+ if len(cfg.Volumes) == 0 {
+ if (&unixVolumeAdder{cfg}).Discover() == 0 {
+ return fmt.Errorf("no volumes found")
+ }
+ }
+ for _, v := range cfg.Volumes {
+ if err := v.Start(); err != nil {
+ return fmt.Errorf("volume %s: %s", v, err)
+ }
+ log.Printf("Using volume %v (writable=%v)", v, v.Writable())
+ }
+ return nil
+}
+
+// VolumeTypes is built up by init() funcs in the source files that
+// define the volume types.
+var VolumeTypes = []func() VolumeWithExamples{}
+
+type VolumeList []Volume
+
+// UnmarshalJSON, given an array of objects, deserializes each object
+// as the volume type indicated by the object's Type field.
+func (vols *VolumeList) UnmarshalJSON(data []byte) error {
+ typeMap := map[string]func() VolumeWithExamples{}
+ for _, factory := range VolumeTypes {
+ t := factory().Type()
+ if _, ok := typeMap[t]; ok {
+ log.Fatal("volume type %+q is claimed by multiple VolumeTypes")
+ }
+ typeMap[t] = factory
+ }
+
+ var mapList []map[string]interface{}
+ err := json.Unmarshal(data, &mapList)
+ if err != nil {
+ return err
+ }
+ for _, mapIn := range mapList {
+ typeIn, ok := mapIn["Type"].(string)
+ if !ok {
+ return fmt.Errorf("invalid volume type %+v", mapIn["Type"])
+ }
+ factory, ok := typeMap[typeIn]
+ if !ok {
+ return fmt.Errorf("unsupported volume type %+q", typeIn)
+ }
+ data, err := json.Marshal(mapIn)
+ if err != nil {
+ return err
+ }
+ vol := factory()
+ err = json.Unmarshal(data, vol)
+ if err != nil {
+ return err
+ }
+ *vols = append(*vols, vol)
+ }
+ return nil
+}
+
+// MarshalJSON adds a "Type" field to each volume corresponding to its
+// Type().
+func (vl *VolumeList) MarshalJSON() ([]byte, error) {
+ data := []byte{'['}
+ for _, vs := range *vl {
+ j, err := json.Marshal(vs)
+ if err != nil {
+ return nil, err
+ }
+ if len(data) > 1 {
+ data = append(data, byte(','))
+ }
+ t, err := json.Marshal(vs.Type())
+ if err != nil {
+ panic(err)
+ }
+ data = append(data, j[0])
+ data = append(data, []byte(`"Type":`)...)
+ data = append(data, t...)
+ data = append(data, byte(','))
+ data = append(data, j[1:]...)
+ }
+ return append(data, byte(']')), nil
+}
--- /dev/null
+package main
+
+import (
+ "flag"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type deprecatedOptions struct {
+ flagSerializeIO bool
+ flagReadonly bool
+ neverDelete bool
+ signatureTTLSeconds int
+}
+
+var deprecated = deprecatedOptions{
+ neverDelete: !theConfig.EnableDelete,
+ signatureTTLSeconds: int(theConfig.BlobSignatureTTL.Duration() / time.Second),
+}
+
+func (depr *deprecatedOptions) beforeFlagParse(cfg *Config) {
+ flag.StringVar(&cfg.Listen, "listen", cfg.Listen, "see Listen configuration")
+ flag.IntVar(&cfg.MaxBuffers, "max-buffers", cfg.MaxBuffers, "see MaxBuffers configuration")
+ flag.IntVar(&cfg.MaxRequests, "max-requests", cfg.MaxRequests, "see MaxRequests configuration")
+ flag.BoolVar(&depr.neverDelete, "never-delete", depr.neverDelete, "see EnableDelete configuration")
+ flag.BoolVar(&cfg.RequireSignatures, "enforce-permissions", cfg.RequireSignatures, "see RequireSignatures configuration")
+ flag.StringVar(&cfg.BlobSigningKeyFile, "permission-key-file", cfg.BlobSigningKeyFile, "see BlobSigningKey`File` configuration")
+ flag.StringVar(&cfg.BlobSigningKeyFile, "blob-signing-key-file", cfg.BlobSigningKeyFile, "see BlobSigningKey`File` configuration")
+ flag.StringVar(&cfg.SystemAuthTokenFile, "data-manager-token-file", cfg.SystemAuthTokenFile, "see SystemAuthToken`File` configuration")
+ flag.IntVar(&depr.signatureTTLSeconds, "permission-ttl", depr.signatureTTLSeconds, "signature TTL in seconds; see BlobSignatureTTL configuration")
+ flag.IntVar(&depr.signatureTTLSeconds, "blob-signature-ttl", depr.signatureTTLSeconds, "signature TTL in seconds; see BlobSignatureTTL configuration")
+ flag.Var(&cfg.TrashLifetime, "trash-lifetime", "see TrashLifetime configuration")
+ flag.BoolVar(&depr.flagSerializeIO, "serialize", depr.flagSerializeIO, "serialize read and write operations on the following volumes.")
+ flag.BoolVar(&depr.flagReadonly, "readonly", depr.flagReadonly, "do not write, delete, or touch anything on the following volumes.")
+ flag.StringVar(&cfg.PIDFile, "pid", cfg.PIDFile, "see `PIDFile` configuration")
+ flag.Var(&cfg.TrashCheckInterval, "trash-check-interval", "see TrashCheckInterval configuration")
+}
+
+func (depr *deprecatedOptions) afterFlagParse(cfg *Config) {
+ cfg.BlobSignatureTTL = arvados.Duration(depr.signatureTTLSeconds) * arvados.Duration(time.Second)
+ cfg.EnableDelete = !depr.neverDelete
+}
"strings"
"testing"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
)
// A RequestTester represents the parameters for an HTTP request to
// Create locators for testing.
// Turn on permission settings so we can generate signed locators.
- enforcePermissions = true
- PermissionSecret = []byte(knownKey)
- blobSignatureTTL = 300 * time.Second
+ theConfig.RequireSignatures = true
+ theConfig.blobSigningKey = []byte(knownKey)
+ theConfig.BlobSignatureTTL.Set("5m")
var (
unsignedLocator = "/" + TestHash
- validTimestamp = time.Now().Add(blobSignatureTTL)
+ validTimestamp = time.Now().Add(theConfig.BlobSignatureTTL.Duration())
expiredTimestamp = time.Now().Add(-time.Hour)
signedLocator = "/" + SignLocator(TestHash, knownToken, validTimestamp)
expiredLocator = "/" + SignLocator(TestHash, knownToken, expiredTimestamp)
// -----------------
// Test unauthenticated request with permissions off.
- enforcePermissions = false
+ theConfig.RequireSignatures = false
// Unauthenticated request, unsigned locator
// => OK
// ----------------
// Permissions: on.
- enforcePermissions = true
+ theConfig.RequireSignatures = true
// Authenticated request, signed locator
// => OK
// ------------------
// With a server key.
- PermissionSecret = []byte(knownKey)
- blobSignatureTTL = 300 * time.Second
+ theConfig.blobSigningKey = []byte(knownKey)
+ theConfig.BlobSignatureTTL.Set("5m")
// When a permission key is available, the locator returned
// from an authenticated PUT request will be signed.
func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
defer teardown()
- dataManagerToken = "fake-data-manager-token"
+ theConfig.systemAuthToken = "fake-data-manager-token"
vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
vols[0].Readonly = true
KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
requestBody: TestBlock,
})
defer func(orig bool) {
- neverDelete = orig
- }(neverDelete)
- neverDelete = false
+ theConfig.EnableDelete = orig
+ }(theConfig.EnableDelete)
+ theConfig.EnableDelete = true
IssueRequest(
&RequestTester{
method: "DELETE",
uri: "/" + TestHash,
requestBody: TestBlock,
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
})
type expect struct {
volnum int
// - authenticated /index/prefix request | superuser
//
// The only /index requests that should succeed are those issued by the
-// superuser. They should pass regardless of the value of enforcePermissions.
+// superuser. They should pass regardless of the value of RequireSignatures.
//
func TestIndexHandler(t *testing.T) {
defer teardown()
vols[0].Put(TestHash+".meta", []byte("metadata"))
vols[1].Put(TestHash2+".meta", []byte("metadata"))
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
unauthenticatedReq := &RequestTester{
method: "GET",
superuserReq := &RequestTester{
method: "GET",
uri: "/index",
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
unauthPrefixReq := &RequestTester{
method: "GET",
superuserPrefixReq := &RequestTester{
method: "GET",
uri: "/index/" + TestHash[0:3],
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
superuserNoSuchPrefixReq := &RequestTester{
method: "GET",
uri: "/index/abcd",
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
superuserInvalidPrefixReq := &RequestTester{
method: "GET",
uri: "/index/xyz",
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
// -------------------------------------------------------------
// Only the superuser should be allowed to issue /index requests.
// ---------------------------
- // enforcePermissions enabled
+ // RequireSignatures enabled
// This setting should not affect tests passing.
- enforcePermissions = true
+ theConfig.RequireSignatures = true
// unauthenticated /index request
// => UnauthorizedError
response := IssueRequest(unauthenticatedReq)
ExpectStatusCode(t,
- "enforcePermissions on, unauthenticated request",
+ "RequireSignatures on, unauthenticated request",
UnauthorizedError.HTTPCode,
response)
response)
// ----------------------------
- // enforcePermissions disabled
+ // RequireSignatures disabled
// Valid Request should still pass.
- enforcePermissions = false
+ theConfig.RequireSignatures = false
// superuser /index request
// => OK
vols := KeepVM.AllWritable()
vols[0].Put(TestHash, TestBlock)
- // Explicitly set the blobSignatureTTL to 0 for these
+ // Explicitly set the BlobSignatureTTL to 0 for these
// tests, to ensure the MockVolume deletes the blocks
// even though they have just been created.
- blobSignatureTTL = time.Duration(0)
+ theConfig.BlobSignatureTTL = arvados.Duration(0)
var userToken = "NOT DATA MANAGER TOKEN"
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
- neverDelete = false
+ theConfig.EnableDelete = true
unauthReq := &RequestTester{
method: "DELETE",
superuserExistingBlockReq := &RequestTester{
method: "DELETE",
uri: "/" + TestHash,
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
superuserNonexistentBlockReq := &RequestTester{
method: "DELETE",
uri: "/" + TestHash2,
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
// Unauthenticated request returns PermissionError.
http.StatusNotFound,
response)
- // Authenticated admin request for existing block while neverDelete is set.
- neverDelete = true
+ // Authenticated admin request for existing block while EnableDelete is false.
+ theConfig.EnableDelete = false
response = IssueRequest(superuserExistingBlockReq)
ExpectStatusCode(t,
"authenticated request, existing block, method disabled",
MethodDisabledError.HTTPCode,
response)
- neverDelete = false
+ theConfig.EnableDelete = true
// Authenticated admin request for existing block.
response = IssueRequest(superuserExistingBlockReq)
t.Error("superuserExistingBlockReq: block not deleted")
}
- // A DELETE request on a block newer than blobSignatureTTL
+ // A DELETE request on a block newer than BlobSignatureTTL
// should return success but leave the block on the volume.
vols[0].Put(TestHash, TestBlock)
- blobSignatureTTL = time.Hour
+ theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
response = IssueRequest(superuserExistingBlockReq)
ExpectStatusCode(t,
defer teardown()
var userToken = "USER TOKEN"
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
pullq = NewWorkQueue()
},
{
"Valid pull request from the data manager",
- RequestTester{"/pull", dataManagerToken, "PUT", goodJSON},
+ RequestTester{"/pull", theConfig.systemAuthToken, "PUT", goodJSON},
http.StatusOK,
"Received 3 pull requests\n",
},
{
"Invalid pull request from the data manager",
- RequestTester{"/pull", dataManagerToken, "PUT", badJSON},
+ RequestTester{"/pull", theConfig.systemAuthToken, "PUT", badJSON},
http.StatusBadRequest,
"",
},
defer teardown()
var userToken = "USER TOKEN"
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
trashq = NewWorkQueue()
},
{
"Valid trash list from the data manager",
- RequestTester{"/trash", dataManagerToken, "PUT", goodJSON},
+ RequestTester{"/trash", theConfig.systemAuthToken, "PUT", goodJSON},
http.StatusOK,
"Received 3 trash requests\n",
},
{
"Invalid trash list from the data manager",
- RequestTester{"/trash", dataManagerToken, "PUT", badJSON},
+ RequestTester{"/trash", theConfig.systemAuthToken, "PUT", badJSON},
http.StatusBadRequest,
"",
},
select {
case <-ok:
case <-time.After(time.Second):
- t.Fatal("PUT deadlocks with maxBuffers==1")
+ t.Fatal("PUT deadlocks with MaxBuffers==1")
}
}
ok := make(chan bool)
go func() {
- for i := 0; i < maxBuffers+1; i++ {
+ for i := 0; i < theConfig.MaxBuffers+1; i++ {
// Unauthenticated request, no server key
// => OK (unsigned response)
unsignedLocator := "/" + TestHash
func TestGetHandlerClientDisconnect(t *testing.T) {
defer func(was bool) {
- enforcePermissions = was
- }(enforcePermissions)
- enforcePermissions = false
+ theConfig.RequireSignatures = was
+ }(theConfig.RequireSignatures)
+ theConfig.RequireSignatures = false
defer func(orig *bufferPool) {
bufs = orig
// Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
// leak.
-func TestGetHandlerNoBufferleak(t *testing.T) {
+func TestGetHandlerNoBufferLeak(t *testing.T) {
defer teardown()
// Prepare two test Keep volumes. Our block is stored on the second volume.
ok := make(chan bool)
go func() {
- for i := 0; i < maxBuffers+1; i++ {
+ for i := 0; i < theConfig.MaxBuffers+1; i++ {
// Unauthenticated request, unsigned locator
// => OK
unsignedLocator := "/" + TestHash
vols := KeepVM.AllWritable()
vols[0].Put(TestHash, TestBlock)
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
// unauthenticatedReq => UnauthorizedError
unauthenticatedReq := &RequestTester{
datamanagerWithBadHashReq := &RequestTester{
method: "PUT",
uri: "/untrash/thisisnotalocator",
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
response = IssueRequest(datamanagerWithBadHashReq)
ExpectStatusCode(t,
datamanagerWrongMethodReq := &RequestTester{
method: "GET",
uri: "/untrash/" + TestHash,
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
response = IssueRequest(datamanagerWrongMethodReq)
ExpectStatusCode(t,
datamanagerReq := &RequestTester{
method: "PUT",
uri: "/untrash/" + TestHash,
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
response = IssueRequest(datamanagerReq)
ExpectStatusCode(t,
KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
defer KeepVM.Close()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
// datamanagerReq => StatusOK
datamanagerReq := &RequestTester{
method: "PUT",
uri: "/untrash/" + TestHash,
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
response := IssueRequest(datamanagerReq)
ExpectStatusCode(t,
// GetBlockHandler is a HandleFunc to address Get block requests.
func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
- if enforcePermissions {
+ if theConfig.RequireSignatures {
locator := req.URL.Path[1:] // strip leading slash
if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
// return it to the client.
returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
apiToken := GetAPIToken(req)
- if PermissionSecret != nil && apiToken != "" {
- expiry := time.Now().Add(blobSignatureTTL)
+ if theConfig.blobSigningKey != nil && apiToken != "" {
+ expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
returnHash = SignLocator(returnHash, apiToken, expiry)
}
resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
// IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
func IndexHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsDataManagerToken(GetAPIToken(req)) {
+ if !IsSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
return
}
- if neverDelete {
+ if !theConfig.EnableDelete {
http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
return
}
// PullHandler processes "PUT /pull" requests for the data manager.
func PullHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsDataManagerToken(GetAPIToken(req)) {
+ if !IsSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
// TrashHandler processes /trash requests.
func TrashHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsDataManagerToken(GetAPIToken(req)) {
+ if !IsSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
// UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsDataManagerToken(GetAPIToken(req)) {
+ if !IsSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
}
// Blocks may be deleted only when Keep has been configured with a
// data manager.
- if IsDataManagerToken(apiToken) {
+ if IsSystemAuth(apiToken) {
return true
}
// TODO(twp): look up apiToken with the API server
return false
}
-// IsDataManagerToken returns true if apiToken represents the data
-// manager's token.
-func IsDataManagerToken(apiToken string) bool {
-	return dataManagerToken != "" && apiToken == dataManagerToken
+// IsSystemAuth returns true if the given token is allowed to perform
+// system level actions like deleting data.
+//
+// An empty configured systemAuthToken disables system auth entirely
+// (no token can match).
+// NOTE(review): the comparison is not constant-time; if timing side
+// channels are a concern here, consider crypto/subtle's
+// ConstantTimeCompare — confirm whether that matters for this service.
+func IsSystemAuth(token string) bool {
+ return token != "" && token == theConfig.systemAuthToken
}
package main
import (
- "bytes"
"flag"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "io/ioutil"
"log"
"net"
"net/http"
"os"
"os/signal"
- "strings"
"syscall"
"time"
-)
-// ======================
-// Configuration settings
-//
-// TODO(twp): make all of these configurable via command line flags
-// and/or configuration file settings.
-
-// Default TCP address on which to listen for requests.
-// Initialized by the --listen flag.
-const DefaultAddr = ":25107"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/config"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "github.com/coreos/go-systemd/daemon"
+ "github.com/ghodss/yaml"
+)
// A Keep "block" is 64MB.
const BlockSize = 64 * 1024 * 1024
// ProcMounts /proc/mounts
var ProcMounts = "/proc/mounts"
-// enforcePermissions controls whether permission signatures
-// should be enforced (affecting GET and DELETE requests).
-// Initialized by the -enforce-permissions flag.
-var enforcePermissions bool
-
-// blobSignatureTTL is the time duration for which new permission
-// signatures (returned by PUT requests) will be valid.
-// Initialized by the -permission-ttl flag.
-var blobSignatureTTL time.Duration
-
-// dataManagerToken represents the API token used by the
-// Data Manager, and is required on certain privileged operations.
-// Initialized by the -data-manager-token-file flag.
-var dataManagerToken string
-
-// neverDelete can be used to prevent the DELETE handler from
-// actually deleting anything.
-var neverDelete = true
-
-// trashLifetime is the time duration after a block is trashed
-// during which it can be recovered using an /untrash request
-// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
-var trashLifetime time.Duration
-
-// trashCheckInterval is the time duration at which the emptyTrash goroutine
-// will check and delete expired trashed blocks. Default is one day.
-// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
-var trashCheckInterval time.Duration
-
-var maxBuffers = 128
var bufs *bufferPool
// KeepError types.
var pullq *WorkQueue
var trashq *WorkQueue
-type volumeSet []Volume
-
-var (
- flagSerializeIO bool
- flagReadonly bool
- volumes volumeSet
-)
-
-func (vs *volumeSet) String() string {
- return fmt.Sprintf("%+v", (*vs)[:])
-}
-
-// TODO(twp): continue moving as much code as possible out of main
-// so it can be effectively tested. Esp. handling and postprocessing
-// of command line flags (identifying Keep volumes and initializing
-// permission arguments).
-
func main() {
- log.Println("keepstore starting, pid", os.Getpid())
- defer log.Println("keepstore exiting, pid", os.Getpid())
+ deprecated.beforeFlagParse(theConfig)
- var (
- dataManagerTokenFile string
- listen string
- blobSigningKeyFile string
- permissionTTLSec int
- pidfile string
- maxRequests int
- )
- flag.StringVar(
- &dataManagerTokenFile,
- "data-manager-token-file",
- "",
- "File with the API token used by the Data Manager. All DELETE "+
- "requests or GET /index requests must carry this token.")
- flag.BoolVar(
- &enforcePermissions,
- "enforce-permissions",
- false,
- "Enforce permission signatures on requests.")
- flag.StringVar(
- &listen,
- "listen",
- DefaultAddr,
- "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
- flag.IntVar(
- &maxRequests,
- "max-requests",
- 0,
- "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)")
- flag.BoolVar(
- &neverDelete,
- "never-delete",
- true,
- "If true, nothing will be deleted. "+
- "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
- "You should leave this option alone unless you can afford to lose data.")
- flag.StringVar(
- &blobSigningKeyFile,
- "permission-key-file",
- "",
- "Synonym for -blob-signing-key-file.")
- flag.StringVar(
- &blobSigningKeyFile,
- "blob-signing-key-file",
- "",
- "File containing the secret key for generating and verifying "+
- "blob permission signatures.")
- flag.IntVar(
- &permissionTTLSec,
- "permission-ttl",
- 0,
- "Synonym for -blob-signature-ttl.")
- flag.IntVar(
- &permissionTTLSec,
- "blob-signature-ttl",
- 2*7*24*3600,
- "Lifetime of blob permission signatures in seconds. Modifying the ttl will invalidate all existing signatures. "+
- "See services/api/config/application.default.yml.")
- flag.BoolVar(
- &flagSerializeIO,
- "serialize",
- false,
- "Serialize read and write operations on the following volumes.")
- flag.BoolVar(
- &flagReadonly,
- "readonly",
- false,
- "Do not write, delete, or touch anything on the following volumes.")
- flag.StringVar(
- &pidfile,
- "pid",
- "",
- "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
- flag.IntVar(
- &maxBuffers,
- "max-buffers",
- maxBuffers,
- fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
- flag.DurationVar(
- &trashLifetime,
- "trash-lifetime",
- 0,
- "Time duration after a block is trashed during which it can be recovered using an /untrash request")
- flag.DurationVar(
- &trashCheckInterval,
- "trash-check-interval",
- 24*time.Hour,
- "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
+ dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
+ defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
+ var configPath string
+ flag.StringVar(
+ &configPath,
+ "config",
+ defaultConfigPath,
+ "YAML or JSON configuration file `path`")
+ flag.Usage = usage
flag.Parse()
- if maxBuffers < 0 {
- log.Fatal("-max-buffers must be greater than zero.")
+ deprecated.afterFlagParse(theConfig)
+
+ err := config.LoadFile(theConfig, configPath)
+ if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
+ log.Fatal(err)
+ }
+
+ if *dumpConfig {
+ y, err := yaml.Marshal(theConfig)
+ if err != nil {
+ log.Fatal(err)
+ }
+ os.Stdout.Write(y)
+ os.Exit(0)
}
- bufs = newBufferPool(maxBuffers, BlockSize)
- if pidfile != "" {
+ err = theConfig.Start()
+
+ if pidfile := theConfig.PIDFile; pidfile != "" {
f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
if err != nil {
log.Fatalf("open pidfile (%s): %s", pidfile, err)
}
+ defer f.Close()
err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
if err != nil {
log.Fatalf("flock pidfile (%s): %s", pidfile, err)
}
+ defer os.Remove(pidfile)
err = f.Truncate(0)
if err != nil {
log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
if err != nil {
log.Fatalf("sync pidfile (%s): %s", pidfile, err)
}
- defer f.Close()
- defer os.Remove(pidfile)
- }
-
- if len(volumes) == 0 {
- if (&unixVolumeAdder{&volumes}).Discover() == 0 {
- log.Fatal("No volumes found.")
- }
- }
-
- for _, v := range volumes {
- log.Printf("Using volume %v (writable=%v)", v, v.Writable())
}
- // Initialize data manager token and permission key.
- // If these tokens are specified but cannot be read,
- // raise a fatal error.
- if dataManagerTokenFile != "" {
- if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
- dataManagerToken = strings.TrimSpace(string(buf))
- } else {
- log.Fatalf("reading data manager token: %s\n", err)
- }
- }
-
- if neverDelete != true {
- log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
- "been extensively tested. You should leave this option alone unless you can afford to lose data.")
- }
-
- if blobSigningKeyFile != "" {
- if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
- PermissionSecret = bytes.TrimSpace(buf)
- } else {
- log.Fatalf("reading permission key: %s\n", err)
- }
- }
-
- blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
-
- if PermissionSecret == nil {
- if enforcePermissions {
- log.Fatal("-enforce-permissions requires a permission key")
- } else {
- log.Println("Running without a PermissionSecret. Block locators " +
- "returned by this server will not be signed, and will be rejected " +
- "by a server that enforces permissions.")
- log.Println("To fix this, use the -blob-signing-key-file flag " +
- "to specify the file containing the permission key.")
- }
- }
-
- if maxRequests <= 0 {
- maxRequests = maxBuffers * 2
- log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests)
- }
+ log.Println("keepstore starting, pid", os.Getpid())
+ defer log.Println("keepstore exiting, pid", os.Getpid())
// Start a round-robin VolumeManager with the volumes we have found.
- KeepVM = MakeRRVolumeManager(volumes)
+ KeepVM = MakeRRVolumeManager(theConfig.Volumes)
- // Middleware stack: logger, maxRequests limiter, method handlers
+ // Middleware stack: logger, MaxRequests limiter, method handlers
http.Handle("/", &LoggingRESTRouter{
- httpserver.NewRequestLimiter(maxRequests,
+ httpserver.NewRequestLimiter(theConfig.MaxRequests,
MakeRESTRouter()),
})
// Set up a TCP listener.
- listener, err := net.Listen("tcp", listen)
+ listener, err := net.Listen("tcp", theConfig.Listen)
if err != nil {
log.Fatal(err)
}
// Start emptyTrash goroutine
doneEmptyingTrash := make(chan bool)
- go emptyTrash(doneEmptyingTrash, trashCheckInterval)
+ go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
// Shut down the server gracefully (by closing the listener)
// if SIGTERM is received.
signal.Notify(term, syscall.SIGTERM)
signal.Notify(term, syscall.SIGINT)
- log.Println("listening at", listen)
- srv := &http.Server{Addr: listen}
+ if _, err := daemon.SdNotify("READY=1"); err != nil {
+ log.Printf("Error notifying init daemon: %v", err)
+ }
+ log.Println("listening at", listener.Addr())
+ srv := &http.Server{}
srv.Serve(listener)
}
-// At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
-func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
- ticker := time.NewTicker(trashCheckInterval)
+// Periodically (once per interval) invoke EmptyTrash on all volumes.
+func emptyTrash(done <-chan bool, interval time.Duration) {
+ ticker := time.NewTicker(interval)
for {
select {
case <-ticker.C:
- for _, v := range volumes {
+ for _, v := range theConfig.Volumes {
if v.Writable() {
v.EmptyTrash()
}
}
- case <-doneEmptyingTrash:
+ case <-done:
ticker.Stop()
return
}
--- /dev/null
+# systemd service definition for the Arvados keepstore daemon.
+# Type=notify pairs with the daemon.SdNotify("READY=1") call made by
+# keepstore's main() once the listener is up; startup fails fast if the
+# configuration file is missing (AssertPathExists).
+[Unit]
+Description=Arvados Keep Storage Daemon
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/keepstore/keepstore.yml
+
+[Service]
+Type=notify
+ExecStart=/usr/bin/keepstore
+Restart=always
+
+[Install]
+WantedBy=multi-user.target
f.Close()
ProcMounts = f.Name()
- resultVols := volumeSet{}
- added := (&unixVolumeAdder{&resultVols}).Discover()
+ cfg := &Config{}
+ added := (&unixVolumeAdder{cfg}).Discover()
- if added != len(resultVols) {
+ if added != len(cfg.Volumes) {
t.Errorf("Discover returned %d, but added %d volumes",
- added, len(resultVols))
+ added, len(cfg.Volumes))
}
if added != len(tempVols) {
t.Errorf("Discover returned %d but we set up %d volumes",
added, len(tempVols))
}
for i, tmpdir := range tempVols {
- if tmpdir != resultVols[i].(*UnixVolume).root {
+ if tmpdir != cfg.Volumes[i].(*UnixVolume).Root {
t.Errorf("Discover returned %s, expected %s\n",
- resultVols[i].(*UnixVolume).root, tmpdir)
+ cfg.Volumes[i].(*UnixVolume).Root, tmpdir)
}
- if expectReadonly := i%2 == 1; expectReadonly != resultVols[i].(*UnixVolume).readonly {
+ if expectReadonly := i%2 == 1; expectReadonly != cfg.Volumes[i].(*UnixVolume).ReadOnly {
t.Errorf("Discover added %s with readonly=%v, should be %v",
tmpdir, !expectReadonly, expectReadonly)
}
f.Close()
ProcMounts = f.Name()
- resultVols := volumeSet{}
- added := (&unixVolumeAdder{&resultVols}).Discover()
- if added != 0 || len(resultVols) != 0 {
- t.Fatalf("got %d, %v; expected 0, []", added, resultVols)
+ cfg := &Config{}
+ added := (&unixVolumeAdder{cfg}).Discover()
+ if added != 0 || len(cfg.Volumes) != 0 {
+ t.Fatalf("got %d, %v; expected 0, []", added, cfg.Volumes)
}
}
// teardown cleans up after each test.
func teardown() {
- dataManagerToken = ""
- enforcePermissions = false
- PermissionSecret = nil
+ theConfig.systemAuthToken = ""
+ theConfig.RequireSignatures = false
+ theConfig.blobSigningKey = nil
KeepVM = nil
}
"time"
)
-// The PermissionSecret is the secret key used to generate SHA1
-// digests for permission hints. apiserver and Keep must use the same
-// key.
-var PermissionSecret []byte
-
// SignLocator takes a blobLocator, an apiToken and an expiry time, and
// returns a signed locator string.
func SignLocator(blobLocator, apiToken string, expiry time.Time) string {
- return keepclient.SignLocator(blobLocator, apiToken, expiry, blobSignatureTTL, PermissionSecret)
+ return keepclient.SignLocator(blobLocator, apiToken, expiry, theConfig.BlobSignatureTTL.Duration(), theConfig.blobSigningKey)
}
// VerifySignature returns nil if the signature on the signedLocator
// something the client could have figured out independently) or
// PermissionError.
func VerifySignature(signedLocator, apiToken string) error {
- err := keepclient.VerifySignature(signedLocator, apiToken, blobSignatureTTL, PermissionSecret)
+ err := keepclient.VerifySignature(signedLocator, apiToken, theConfig.BlobSignatureTTL.Duration(), theConfig.blobSigningKey)
if err == keepclient.ErrSignatureExpired {
return ExpiredError
} else if err != nil {
"strconv"
"testing"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
)
const (
"gokee3eamvjy8qq1fvy238838enjmy5wzy2md7yvsitp5vztft6j4q866efym7e6" +
"vu5wm9fpnwjyxfldw3vbo01mgjs75rgo7qioh8z8ij7jpyp8508okhgbbex3ceei" +
"786u5rw2a9gx743dj3fgq2irk"
- knownSignatureTTL = 1209600 * time.Second
+ knownSignatureTTL = arvados.Duration(24 * 14 * time.Hour)
knownSignature = "89118b78732c33104a4d6231e8b5a5fa1e4301e3"
knownTimestamp = "7fffffff"
knownSigHint = "+A" + knownSignature + "@" + knownTimestamp
func TestSignLocator(t *testing.T) {
defer func(b []byte) {
- PermissionSecret = b
- }(PermissionSecret)
+ theConfig.blobSigningKey = b
+ }(theConfig.blobSigningKey)
tsInt, err := strconv.ParseInt(knownTimestamp, 16, 0)
if err != nil {
}
t0 := time.Unix(tsInt, 0)
- blobSignatureTTL = knownSignatureTTL
+ theConfig.BlobSignatureTTL = knownSignatureTTL
- PermissionSecret = []byte(knownKey)
+ theConfig.blobSigningKey = []byte(knownKey)
if x := SignLocator(knownLocator, knownToken, t0); x != knownSignedLocator {
t.Fatalf("Got %+q, expected %+q", x, knownSignedLocator)
}
- PermissionSecret = []byte("arbitrarykey")
+ theConfig.blobSigningKey = []byte("arbitrarykey")
if x := SignLocator(knownLocator, knownToken, t0); x == knownSignedLocator {
- t.Fatalf("Got same signature %+q, even though PermissionSecret changed", x)
+ t.Fatalf("Got same signature %+q, even though blobSigningKey changed", x)
}
}
func TestVerifyLocator(t *testing.T) {
defer func(b []byte) {
- PermissionSecret = b
- }(PermissionSecret)
+ theConfig.blobSigningKey = b
+ }(theConfig.blobSigningKey)
- blobSignatureTTL = knownSignatureTTL
+ theConfig.BlobSignatureTTL = knownSignatureTTL
- PermissionSecret = []byte(knownKey)
+ theConfig.blobSigningKey = []byte(knownKey)
if err := VerifySignature(knownSignedLocator, knownToken); err != nil {
t.Fatal(err)
}
- PermissionSecret = []byte("arbitrarykey")
+ theConfig.blobSigningKey = []byte("arbitrarykey")
if err := VerifySignature(knownSignedLocator, knownToken); err == nil {
- t.Fatal("Verified signature even with wrong PermissionSecret")
+ t.Fatal("Verified signature even with wrong blobSigningKey")
}
}
func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_two_locators",
- req: RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "hello",
func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_one_locator",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "hola",
func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_get_one_locator",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "unused",
func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_get_two_locators",
- req: RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "unused",
func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_put_one_locator",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "hello hello",
func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_put_two_locators",
- req: RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "hello again",
pullq.ReplaceQueue(makeTestWorkList(firstInput))
testPullLists["Added_before_actual_test_item"] = string(1)
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_two_items_latest_replacing_old",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "hola de nuevo",
}
// In this case, the item will not be placed on pullq
-func (s *PullWorkerTestSuite) TestPullWorker_invalid_dataManagerToken(c *C) {
+func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_two_locators",
- req: RequestTester{"/pull", "invalid_dataManagerToken", "PUT", firstPullList},
+ req: RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
responseCode: http.StatusUnauthorized,
responseBody: "Unauthorized\n",
readContent: "hello",
"os"
"regexp"
"strings"
+ "sync"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/AdRoll/goamz/aws"
"github.com/AdRoll/goamz/s3"
)
)
type s3VolumeAdder struct {
- *volumeSet
+ *Config
+}
+
+// String implements flag.Value
+func (s *s3VolumeAdder) String() string {
+ return "-"
}
func (s *s3VolumeAdder) Set(bucketName string) error {
if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
}
- region, ok := aws.Regions[s3RegionName]
- if s3Endpoint == "" {
- if !ok {
- return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", s3RegionName)
- }
- } else {
- if ok {
- return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
- "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", s3RegionName, s3Endpoint)
- }
- region = aws.Region{
- Name: s3RegionName,
- S3Endpoint: s3Endpoint,
- }
- }
- var err error
- var auth aws.Auth
- auth.AccessKey, err = readKeyFromFile(s3AccessKeyFile)
- if err != nil {
- return err
- }
- auth.SecretKey, err = readKeyFromFile(s3SecretKeyFile)
- if err != nil {
- return err
- }
- if flagSerializeIO {
+ if deprecated.flagSerializeIO {
log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
}
- v := NewS3Volume(auth, region, bucketName, s3RaceWindow, flagReadonly, s3Replication)
- if err := v.Check(); err != nil {
- return err
- }
- *s.volumeSet = append(*s.volumeSet, v)
+ s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
+ Bucket: bucketName,
+ AccessKeyFile: s3AccessKeyFile,
+ SecretKeyFile: s3SecretKeyFile,
+ Endpoint: s3Endpoint,
+ Region: s3RegionName,
+ RaceWindow: arvados.Duration(s3RaceWindow),
+ S3Replication: s3Replication,
+ UnsafeDelete: s3UnsafeDelete,
+ ReadOnly: deprecated.flagReadonly,
+ IndexPageSize: 1000,
+ })
return nil
}
}
func init() {
- flag.Var(&s3VolumeAdder{&volumes},
+ VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
+
+ flag.Var(&s3VolumeAdder{theConfig},
"s3-bucket-volume",
"Use the given bucket as a storage volume. Can be given multiple times.")
flag.StringVar(
&s3AccessKeyFile,
"s3-access-key-file",
"",
- "File containing the access key used for subsequent -s3-bucket-volume arguments.")
+ "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
flag.StringVar(
&s3SecretKeyFile,
"s3-secret-key-file",
"",
- "File containing the secret key used for subsequent -s3-bucket-volume arguments.")
+ "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
flag.DurationVar(
&s3RaceWindow,
"s3-race-window",
// S3Volume implements Volume using an S3 bucket.
type S3Volume struct {
- *s3.Bucket
- raceWindow time.Duration
- readonly bool
- replication int
- indexPageSize int
-}
-
-// NewS3Volume returns a new S3Volume using the given auth, region,
-// and bucket name. The replication argument specifies the replication
-// level to report when writing data.
-func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, raceWindow time.Duration, readonly bool, replication int) *S3Volume {
- return &S3Volume{
- Bucket: &s3.Bucket{
- S3: s3.New(auth, region),
- Name: bucket,
+ AccessKeyFile string
+ SecretKeyFile string
+ Endpoint string
+ Region string
+ Bucket string
+ LocationConstraint bool
+ IndexPageSize int
+ S3Replication int
+ RaceWindow arvados.Duration
+ ReadOnly bool
+ UnsafeDelete bool
+
+ bucket *s3.Bucket
+
+ startOnce sync.Once
+}
+
+// Examples implements VolumeWithExamples.
+func (*S3Volume) Examples() []Volume {
+ return []Volume{
+ &S3Volume{
+ AccessKeyFile: "/etc/aws_s3_access_key.txt",
+ SecretKeyFile: "/etc/aws_s3_secret_key.txt",
+ Endpoint: "",
+ Region: "us-east-1",
+ Bucket: "example-bucket-name",
+ IndexPageSize: 1000,
+ S3Replication: 2,
+ RaceWindow: arvados.Duration(24 * time.Hour),
+ },
+ &S3Volume{
+ AccessKeyFile: "/etc/gce_s3_access_key.txt",
+ SecretKeyFile: "/etc/gce_s3_secret_key.txt",
+ Endpoint: "https://storage.googleapis.com",
+ Region: "",
+ Bucket: "example-bucket-name",
+ IndexPageSize: 1000,
+ S3Replication: 2,
+ RaceWindow: arvados.Duration(24 * time.Hour),
},
- raceWindow: raceWindow,
- readonly: readonly,
- replication: replication,
- indexPageSize: 1000,
}
}
-// Check returns an error if the volume is inaccessible (e.g., config
-// error).
-func (v *S3Volume) Check() error {
+// Type implements Volume.
+func (*S3Volume) Type() string {
+ return "S3"
+}
+
+// Start populates private fields and verifies the configuration is
+// valid.
+func (v *S3Volume) Start() error {
+ region, ok := aws.Regions[v.Region]
+ if v.Endpoint == "" {
+ if !ok {
+ return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
+ }
+ } else if ok {
+ return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
+ "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
+ } else {
+ region = aws.Region{
+ Name: v.Region,
+ S3Endpoint: v.Endpoint,
+ S3LocationConstraint: v.LocationConstraint,
+ }
+ }
+
+ var err error
+ var auth aws.Auth
+ auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
+ if err != nil {
+ return err
+ }
+ auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
+ if err != nil {
+ return err
+ }
+ v.bucket = &s3.Bucket{
+ S3: s3.New(auth, region),
+ Name: v.Bucket,
+ }
return nil
}
// disappeared in a Trash race, getReader calls fixRace to recover the
// data, and tries again.
func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
- rdr, err = v.Bucket.GetReader(loc)
+ rdr, err = v.bucket.GetReader(loc)
err = v.translateError(err)
if err == nil || !os.IsNotExist(err) {
return
}
- _, err = v.Bucket.Head("recent/"+loc, nil)
+ _, err = v.bucket.Head("recent/"+loc, nil)
err = v.translateError(err)
if err != nil {
// If we can't read recent/X, there's no point in
err = os.ErrNotExist
return
}
- rdr, err = v.Bucket.GetReader(loc)
+ rdr, err = v.bucket.GetReader(loc)
if err != nil {
log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
err = v.translateError(err)
// Put writes a block.
func (v *S3Volume) Put(loc string, block []byte) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
var opts s3.Options
}
opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
}
- err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
+ err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
if err != nil {
return v.translateError(err)
}
- err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
return v.translateError(err)
}
// Touch sets the timestamp for the given locator to the current time.
func (v *S3Volume) Touch(loc string) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
- _, err := v.Bucket.Head(loc, nil)
+ _, err := v.bucket.Head(loc, nil)
err = v.translateError(err)
if os.IsNotExist(err) && v.fixRace(loc) {
// The data object got trashed in a race, but fixRace
} else if err != nil {
return err
}
- err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
return v.translateError(err)
}
// Mtime returns the stored timestamp for the given locator.
func (v *S3Volume) Mtime(loc string) (time.Time, error) {
- _, err := v.Bucket.Head(loc, nil)
+ _, err := v.bucket.Head(loc, nil)
if err != nil {
return zeroTime, v.translateError(err)
}
- resp, err := v.Bucket.Head("recent/"+loc, nil)
+ resp, err := v.bucket.Head("recent/"+loc, nil)
err = v.translateError(err)
if os.IsNotExist(err) {
// The data object X exists, but recent/X is missing.
- err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
log.Printf("error: creating %q: %s", "recent/"+loc, err)
return zeroTime, v.translateError(err)
}
log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
- resp, err = v.Bucket.Head("recent/"+loc, nil)
+ resp, err = v.bucket.Head("recent/"+loc, nil)
if err != nil {
log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
return zeroTime, v.translateError(err)
func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
// Use a merge sort to find matching sets of X and recent/X.
dataL := s3Lister{
- Bucket: v.Bucket,
+ Bucket: v.bucket,
Prefix: prefix,
- PageSize: v.indexPageSize,
+ PageSize: v.IndexPageSize,
}
recentL := s3Lister{
- Bucket: v.Bucket,
+ Bucket: v.bucket,
Prefix: "recent/" + prefix,
- PageSize: v.indexPageSize,
+ PageSize: v.IndexPageSize,
}
for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
if data.Key >= "g" {
// Trash a Keep block.
func (v *S3Volume) Trash(loc string) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
if t, err := v.Mtime(loc); err != nil {
return err
- } else if time.Since(t) < blobSignatureTTL {
+ } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
return nil
}
- if trashLifetime == 0 {
+ if theConfig.TrashLifetime == 0 {
if !s3UnsafeDelete {
return ErrS3TrashDisabled
}
- return v.Bucket.Del(loc)
+ return v.bucket.Del(loc)
}
err := v.checkRaceWindow(loc)
if err != nil {
if err != nil {
return err
}
- return v.translateError(v.Bucket.Del(loc))
+ return v.translateError(v.bucket.Del(loc))
}
// checkRaceWindow returns a non-nil error if trash/loc is, or might
// be, in the race window (i.e., it's not safe to trash loc).
func (v *S3Volume) checkRaceWindow(loc string) error {
- resp, err := v.Bucket.Head("trash/"+loc, nil)
+ resp, err := v.bucket.Head("trash/"+loc, nil)
err = v.translateError(err)
if os.IsNotExist(err) {
// OK, trash/X doesn't exist so we're not in the race
// Can't parse timestamp
return err
}
- safeWindow := t.Add(trashLifetime).Sub(time.Now().Add(v.raceWindow))
+ safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
if safeWindow <= 0 {
// We can't count on "touch trash/X" to prolong
// trash/X's lifetime. The new timestamp might not
// (PutCopy returns 200 OK if the request was received, even if the
// copy failed).
func (v *S3Volume) safeCopy(dst, src string) error {
- resp, err := v.Bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
+ resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
ContentType: "application/octet-stream",
MetadataDirective: "REPLACE",
- }, v.Bucket.Name+"/"+src)
+ }, v.bucket.Name+"/"+src)
err = v.translateError(err)
if err != nil {
return err
if err != nil {
return err
}
- err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
return v.translateError(err)
}
// String implements fmt.Stringer.
func (v *S3Volume) String() string {
- return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
+ return fmt.Sprintf("s3-bucket:%+q", v.bucket.Name)
}
// Writable returns false if all future Put, Mtime, and Delete calls
// are expected to fail.
func (v *S3Volume) Writable() bool {
- return !v.readonly
+ return !v.ReadOnly
}
// Replication returns the storage redundancy of the underlying
// device. Configured via command line flag.
func (v *S3Volume) Replication() int {
- return v.replication
+ return v.S3Replication
}
var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
// there was a race between Put and Trash, fixRace recovers from the
// race by Untrashing the block.
func (v *S3Volume) fixRace(loc string) bool {
- trash, err := v.Bucket.Head("trash/"+loc, nil)
+ trash, err := v.bucket.Head("trash/"+loc, nil)
if err != nil {
if !os.IsNotExist(v.translateError(err)) {
log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
return false
}
- recent, err := v.Bucket.Head("recent/"+loc, nil)
+ recent, err := v.bucket.Head("recent/"+loc, nil)
if err != nil {
log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
return false
}
ageWhenTrashed := trashTime.Sub(recentTime)
- if ageWhenTrashed >= blobSignatureTTL {
+ if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
// No evidence of a race: block hasn't been written
// since it became eligible for Trash. No fix needed.
return false
}
- log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, blobSignatureTTL)
+ log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
err = v.safeCopy(loc, "trash/"+loc)
if err != nil {
return err
}
-// EmptyTrash looks for trashed blocks that exceeded trashLifetime
+// EmptyTrash looks for trashed blocks that exceeded TrashLifetime
// and deletes them from the volume.
func (v *S3Volume) EmptyTrash() {
var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
// Use a merge sort to find matching sets of trash/X and recent/X.
trashL := s3Lister{
- Bucket: v.Bucket,
+ Bucket: v.bucket,
Prefix: "trash/",
- PageSize: v.indexPageSize,
+ PageSize: v.IndexPageSize,
}
// Define "ready to delete" as "...when EmptyTrash started".
startT := time.Now()
log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
continue
}
- recent, err := v.Bucket.Head("recent/"+loc, nil)
+ recent, err := v.bucket.Head("recent/"+loc, nil)
if err != nil && os.IsNotExist(v.translateError(err)) {
log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
err = v.Untrash(loc)
log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
continue
}
- if trashT.Sub(recentT) < blobSignatureTTL {
- if age := startT.Sub(recentT); age >= blobSignatureTTL-v.raceWindow {
+ if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
+ if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
// recent/loc is too old to protect
// loc from being Trashed again during
// the raceWindow that starts if we
// delete trash/X now.
//
- // Note this means (trashCheckInterval
- // < blobSignatureTTL - raceWindow) is
+ // Note this means (TrashCheckInterval
+ // < BlobSignatureTTL - raceWindow) is
// necessary to avoid starvation.
log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
v.fixRace(loc)
v.Touch(loc)
continue
- } else if _, err := v.Bucket.Head(loc, nil); os.IsNotExist(err) {
+ } else if _, err := v.bucket.Head(loc, nil); os.IsNotExist(err) {
log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
v.fixRace(loc)
continue
continue
}
}
- if startT.Sub(trashT) < trashLifetime {
+ if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
continue
}
- err = v.Bucket.Del(trash.Key)
+ err = v.bucket.Del(trash.Key)
if err != nil {
log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
continue
bytesDeleted += trash.Size
blocksDeleted++
- _, err = v.Bucket.Head(loc, nil)
+ _, err = v.bucket.Head(loc, nil)
if os.IsNotExist(err) {
- err = v.Bucket.Del("recent/" + loc)
+ err = v.bucket.Del("recent/" + loc)
if err != nil {
log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
}
"bytes"
"crypto/md5"
"fmt"
+ "io/ioutil"
"log"
"os"
"time"
- "github.com/AdRoll/goamz/aws"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/AdRoll/goamz/s3"
"github.com/AdRoll/goamz/s3/s3test"
check "gopkg.in/check.v1"
)
-type TestableS3Volume struct {
- *S3Volume
- server *s3test.Server
- c *check.C
- serverClock *fakeClock
-}
-
const (
TestBucketName = "testbucket"
)
s3UnsafeDelete = true
}
-func NewTestableS3Volume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
- clock := &fakeClock{}
- srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
- c.Assert(err, check.IsNil)
- auth := aws.Auth{}
- region := aws.Region{
- Name: "test-region-1",
- S3Endpoint: srv.URL(),
- S3LocationConstraint: true,
- }
- bucket := &s3.Bucket{
- S3: s3.New(auth, region),
- Name: TestBucketName,
- }
- err = bucket.PutBucket(s3.ACL("private"))
- c.Assert(err, check.IsNil)
-
- return &TestableS3Volume{
- S3Volume: NewS3Volume(auth, region, TestBucketName, raceWindow, readonly, replication),
- server: srv,
- serverClock: clock,
- }
-}
-
var _ = check.Suite(&StubbedS3Suite{})
type StubbedS3Suite struct {
DoGenericVolumeTests(c, func(t TB) TestableVolume {
// Use a negative raceWindow so s3test's 1-second
// timestamp precision doesn't confuse fixRace.
- return NewTestableS3Volume(c, -2*time.Second, false, 2)
+ return s.newTestableVolume(c, -2*time.Second, false, 2)
})
}
func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
DoGenericVolumeTests(c, func(t TB) TestableVolume {
- return NewTestableS3Volume(c, -2*time.Second, true, 2)
+ return s.newTestableVolume(c, -2*time.Second, true, 2)
})
}
func (s *StubbedS3Suite) TestIndex(c *check.C) {
- v := NewTestableS3Volume(c, 0, false, 2)
- v.indexPageSize = 3
+ v := s.newTestableVolume(c, 0, false, 2)
+ v.IndexPageSize = 3
for i := 0; i < 256; i++ {
v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
}
}
func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
- defer func(tl, bs time.Duration) {
- trashLifetime = tl
- blobSignatureTTL = bs
- }(trashLifetime, blobSignatureTTL)
- trashLifetime = time.Hour
- blobSignatureTTL = time.Hour
+ defer func(tl, bs arvados.Duration) {
+ theConfig.TrashLifetime = tl
+ theConfig.BlobSignatureTTL = bs
+ }(theConfig.TrashLifetime, theConfig.BlobSignatureTTL)
+ theConfig.TrashLifetime.Set("1h")
+ theConfig.BlobSignatureTTL.Set("1h")
- v := NewTestableS3Volume(c, 5*time.Minute, false, 2)
+ v := s.newTestableVolume(c, 5*time.Minute, false, 2)
var none time.Time
putS3Obj := func(t time.Time, key string, data []byte) {
return
}
v.serverClock.now = &t
- v.Bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
+ v.bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
}
t0 := time.Now()
false, false, false, true, false, false,
},
{
- "Erroneously trashed during a race, detected before trashLifetime",
+ "Erroneously trashed during a race, detected before TrashLifetime",
none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
true, false, true, true, true, false,
},
{
- "Erroneously trashed during a race, rescue during EmptyTrash despite reaching trashLifetime",
+ "Erroneously trashed during a race, rescue during EmptyTrash despite reaching TrashLifetime",
none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
true, false, true, true, true, false,
},
// freshAfterEmpty
loc, blk = setupScenario()
v.EmptyTrash()
- _, err = v.Bucket.Head("trash/"+loc, nil)
+ _, err = v.bucket.Head("trash/"+loc, nil)
c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
if scenario.freshAfterEmpty {
t, err := v.Mtime(loc)
}
}
+type TestableS3Volume struct {
+ *S3Volume
+ server *s3test.Server
+ c *check.C
+ serverClock *fakeClock
+}
+
+func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
+ clock := &fakeClock{}
+ srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
+ c.Assert(err, check.IsNil)
+
+ tmp, err := ioutil.TempFile("", "keepstore")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(tmp.Name())
+ _, err = tmp.Write([]byte("xxx\n"))
+ c.Assert(err, check.IsNil)
+ c.Assert(tmp.Close(), check.IsNil)
+
+ v := &TestableS3Volume{
+ S3Volume: &S3Volume{
+ Bucket: TestBucketName,
+ AccessKeyFile: tmp.Name(),
+ SecretKeyFile: tmp.Name(),
+ Endpoint: srv.URL(),
+ Region: "test-region-1",
+ LocationConstraint: true,
+ RaceWindow: arvados.Duration(raceWindow),
+ S3Replication: replication,
+ UnsafeDelete: s3UnsafeDelete,
+ ReadOnly: readonly,
+ IndexPageSize: 1000,
+ },
+ server: srv,
+ serverClock: clock,
+ }
+ c.Assert(v.Start(), check.IsNil)
+ err = v.bucket.PutBucket(s3.ACL("private"))
+ c.Assert(err, check.IsNil)
+ return v
+}
+
// PutRaw skips the ContentMD5 test
func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
- err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
+ err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
log.Printf("PutRaw: %+v", err)
}
// while we do this.
func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
v.serverClock.now = &lastPut
- err := v.Bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err := v.bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
panic(err)
}
"errors"
"log"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
)
// RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
// TrashItem deletes the indicated block from every writable volume.
func TrashItem(trashRequest TrashRequest) {
reqMtime := time.Unix(0, trashRequest.BlockMtime)
- if time.Since(reqMtime) < blobSignatureTTL {
+ if time.Since(reqMtime) < theConfig.BlobSignatureTTL.Duration() {
log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
- time.Since(reqMtime),
+ arvados.Duration(time.Since(reqMtime)),
trashRequest.Locator,
trashRequest.BlockMtime,
reqMtime,
- blobSignatureTTL)
+ theConfig.BlobSignatureTTL)
return
}
continue
}
- if neverDelete {
- err = errors.New("did not delete block because neverDelete is true")
+ if !theConfig.EnableDelete {
+ err = errors.New("did not delete block because EnableDelete is false")
} else {
err = volume.Trash(trashRequest.Locator)
}
Expect no errors.
*/
func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: "5d41402abc4b2a76b9719d911017c592",
Block1: []byte("hello"),
Expect the second locator in volume 2 to be unaffected.
*/
func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
Expect the first locator in volume 1 to be unaffected.
*/
func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
Expect locator to be deleted from both volumes.
*/
func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
Delete the second and expect the first to be still around.
*/
func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
Expect the other unaffected.
*/
func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
will not be deleted because its Mtime is within the trash life time.
*/
func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
performTrashWorkerTest(testData, t)
}
-/* Delete a block with matching mtime for locator in both volumes, but neverDelete is true,
+/* Delete a block with matching mtime for locator in both volumes, but EnableDelete is false,
so block won't be deleted.
*/
-func TestTrashWorkerIntegration_NeverDelete(t *testing.T) {
- neverDelete = true
+func TestTrashWorkerIntegration_DisabledDelete(t *testing.T) {
+ theConfig.EnableDelete = false
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
}
}
- oldBlockTime := time.Now().Add(-blobSignatureTTL - time.Minute)
+ oldBlockTime := time.Now().Add(-theConfig.BlobSignatureTTL.Duration() - time.Minute)
// Create TrashRequest for the test
trashRequest := TrashRequest{
--- /dev/null
+package main
+
+import (
+ "flag"
+ "fmt"
+ "os"
+ "sort"
+ "strings"
+
+ "github.com/ghodss/yaml"
+)
+
+func usage() {
+ c := DefaultConfig()
+ knownTypes := []string{}
+ for _, vt := range VolumeTypes {
+ c.Volumes = append(c.Volumes, vt().Examples()...)
+ knownTypes = append(knownTypes, vt().Type())
+ }
+ exampleConfigFile, err := yaml.Marshal(c)
+ if err != nil {
+ panic(err)
+ }
+ sort.Strings(knownTypes)
+ knownTypeList := strings.Join(knownTypes, ", ")
+ fmt.Fprintf(os.Stderr, `
+
+keepstore provides a content-addressed data store backed by a local filesystem or networked storage.
+
+Usage: keepstore -config path/to/keepstore.yml
+ keepstore [OPTIONS] -dump-config
+
+NOTE: All options (other than -config) are deprecated in favor of YAML
+ configuration. Use -dump-config to translate existing
+ configurations to YAML format.
+
+Options:
+`)
+ flag.PrintDefaults()
+ fmt.Fprintf(os.Stderr, `
+Example config file:
+
+%s
+
+Listen:
+
+ Local port to listen on. Can be "address:port" or ":port", where
+ "address" is a host IP address or name and "port" is a port number
+ or name.
+
+PIDFile:
+
+ Path to write PID file during startup. This file is kept open and
+ locked with LOCK_EX until keepstore exits, so "fuser -k pidfile" is
+ one way to shut down. Exit immediately if there is an error
+ opening, locking, or writing the PID file.
+
+MaxBuffers:
+
+ Maximum RAM to use for data buffers, given in multiples of block
+ size (64 MiB). When this limit is reached, HTTP requests requiring
+ buffers (like GET and PUT) will wait for buffer space to be
+ released.
+
+MaxRequests:
+
+ Maximum concurrent requests. When this limit is reached, new
+ requests will receive 503 responses. Note: this limit does not
+ include idle connections from clients using HTTP keepalive, so it
+ does not strictly limit the number of concurrent connections. If
+ omitted or zero, the default is 2 * MaxBuffers.
+
+BlobSigningKeyFile:
+
+ Local file containing the secret blob signing key (used to
+ generate and verify blob signatures). This key should be
+ identical to the API server's blob_signing_key configuration
+ entry.
+
+RequireSignatures:
+
+ Honor read requests only if a valid signature is provided. This
+ should be true, except for development use and when migrating from
+ a very old version.
+
+BlobSignatureTTL:
+
+ Duration for which new permission signatures (returned in PUT
+ responses) will be valid. This should be equal to the API
+ server's blob_signature_ttl configuration entry.
+
+SystemAuthTokenFile:
+
+ Local file containing the Arvados API token used by keep-balance
+ or data manager. Delete, trash, and index requests are honored
+ only for this token.
+
+EnableDelete:
+
+ Enable trash and delete features. If false, trash lists will be
+ accepted but blocks will not be trashed or deleted.
+
+TrashLifetime:
+
+ Time duration after a block is trashed during which it can be
+ recovered using an /untrash request.
+
+TrashCheckInterval:
+
+ How often to check for (and delete) trashed blocks whose
+ TrashLifetime has expired.
+
+Volumes:
+
+ List of storage volumes. If omitted or empty, the default is to
+ use all directories named "keep" that exist in the top level
+ directory of a mount point at startup time.
+
+ Volume types: %s
+
+ (See volume configuration examples above.)
+
+`, exampleConfigFile, knownTypeList)
+}
// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
// etc.
type Volume interface {
+ // Volume type as specified in config file. Examples: "S3",
+ // "Directory".
+ Type() string
+
+ // Do whatever private setup tasks and configuration checks
+ // are needed. Return non-nil if the volume is unusable (e.g.,
+ // invalid config).
+ Start() error
+
// Get a block: copy the block data into buf, and return the
// number of bytes copied.
//
// loc is as described in Get.
//
// If the timestamp for the given locator is newer than
- // blobSignatureTTL, Trash must not trash the data.
+ // BlobSignatureTTL, Trash must not trash the data.
//
// If a Trash operation overlaps with any Touch or Put
// operations on the same locator, the implementation must
// reliably or fail outright.
//
// Corollary: A successful Touch or Put guarantees a block
- // will not be trashed for at least blobSignatureTTL
+ // will not be trashed for at least BlobSignatureTTL
// seconds.
Trash(loc string) error
// responses to PUT requests.
Replication() int
- // EmptyTrash looks for trashed blocks that exceeded trashLifetime
+ // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
// and deletes them from the volume.
EmptyTrash()
}
+// A VolumeWithExamples provides example configs to display in the
+// -help message.
+type VolumeWithExamples interface {
+ Volume
+ Examples() []Volume
+}
+
// A VolumeManager tells callers which volumes can read, which volumes
// can write, and on which volume the next write should be attempted.
type VolumeManager interface {
"strings"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
)
func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
- blobSignatureTTL = 300 * time.Second
+ theConfig.BlobSignatureTTL.Set("5m")
if v.Writable() == false {
return
}
// Calling Delete() for a block with a timestamp older than
-// blobSignatureTTL seconds in the past should delete the data.
+// BlobSignatureTTL seconds in the past should delete the data.
// Test is intended for only writable volumes
func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
- blobSignatureTTL = 300 * time.Second
+ theConfig.BlobSignatureTTL.Set("5m")
if v.Writable() == false {
return
}
v.Put(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
}
-// With trashLifetime != 0, perform:
+// With TrashLifetime != 0, perform:
// Trash an old block - which either raises ErrNotImplemented or succeeds
// Untrash - which either raises ErrNotImplemented or succeeds
// Get - which must succeed
v := factory(t)
defer v.Teardown()
defer func() {
- trashLifetime = 0
+ theConfig.TrashLifetime = 0
}()
- trashLifetime = 3600 * time.Second
+ theConfig.TrashLifetime.Set("1h")
// put block and backdate it
v.PutRaw(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
buf := make([]byte, BlockSize)
n, err := v.Get(TestHash, buf)
func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
- defer func(orig time.Duration) {
- trashLifetime = orig
- }(trashLifetime)
+ defer func(orig arvados.Duration) {
+ theConfig.TrashLifetime = orig
+ }(theConfig.TrashLifetime)
checkGet := func() error {
buf := make([]byte, BlockSize)
// First set: EmptyTrash before reaching the trash deadline.
- trashLifetime = time.Hour
+ theConfig.TrashLifetime.Set("1h")
v.PutRaw(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
err := checkGet()
if err != nil {
err = v.Trash(TestHash)
if err == MethodDisabledError || err == ErrNotImplemented {
// Skip the trash tests for read-only volumes, and
- // volume types that don't support trashLifetime>0.
+ // volume types that don't support TrashLifetime>0.
return
}
}
// Because we Touch'ed, need to backdate again for next set of tests
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
// If the only block in the trash has already been untrashed,
// most volumes will fail a subsequent Untrash with a 404, but
}
// Untrash might have updated the timestamp, so backdate again
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
// Second set: EmptyTrash after the trash deadline has passed.
- trashLifetime = time.Nanosecond
+ theConfig.TrashLifetime.Set("1ns")
err = v.Trash(TestHash)
if err != nil {
// Trash it again, and this time call EmptyTrash so it really
// goes away.
// (In Azure volumes, un/trash changes Mtime, so first backdate again)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
err = v.Trash(TestHash)
err = checkGet()
if err == nil || !os.IsNotExist(err) {
// un-trashed copy doesn't get deleted along with it.
v.PutRaw(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
- trashLifetime = time.Nanosecond
+ theConfig.TrashLifetime.Set("1ns")
err = v.Trash(TestHash)
if err != nil {
t.Fatal(err)
}
v.PutRaw(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
// EmptyTrash should not delete the untrashed copy.
v.EmptyTrash()
// untrash the block whose deadline is "C".
v.PutRaw(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
- trashLifetime = time.Nanosecond
+ theConfig.TrashLifetime.Set("1ns")
err = v.Trash(TestHash)
if err != nil {
t.Fatal(err)
}
v.PutRaw(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
- trashLifetime = time.Hour
+ theConfig.TrashLifetime.Set("1h")
err = v.Trash(TestHash)
if err != nil {
t.Fatal(err)
return MethodDisabledError
}
if _, ok := v.Store[loc]; ok {
- if time.Since(v.Timestamps[loc]) < blobSignatureTTL {
+ if time.Since(v.Timestamps[loc]) < time.Duration(theConfig.BlobSignatureTTL) {
return nil
}
delete(v.Store, loc)
return os.ErrNotExist
}
-// TBD
+// Type implements Volume. The mock reports the fixed type label "Mock".
+func (v *MockVolume) Type() string {
+ return "Mock"
+}
+
+// Start implements Volume. The mock needs no private setup or config
+// checks, so it always succeeds.
+func (v *MockVolume) Start() error {
+ return nil
+}
+
func (v *MockVolume) Untrash(loc string) error {
return nil
}
import (
"bufio"
- "errors"
"flag"
"fmt"
"io"
)
type unixVolumeAdder struct {
- *volumeSet
+ *Config
}
-func (vs *unixVolumeAdder) Set(value string) error {
- if dirs := strings.Split(value, ","); len(dirs) > 1 {
+// String implements flag.Value. The adder holds no printable state,
+// so a fixed placeholder is returned.
+func (s *unixVolumeAdder) String() string {
+ return "-"
+}
+
+// Set implements flag.Value: each -volume/-volumes argument appends a
+// UnixVolume to the config. A comma-separated list of paths is still
+// accepted, but deprecated in favor of repeating the flag.
+func (vs *unixVolumeAdder) Set(path string) error {
+ if dirs := strings.Split(path, ","); len(dirs) > 1 {
log.Print("DEPRECATED: using comma-separated volume list.")
for _, dir := range dirs {
if err := vs.Set(dir); err != nil {
}
return nil
}
- if len(value) == 0 || value[0] != '/' {
- return errors.New("Invalid volume: must begin with '/'.")
- }
- if _, err := os.Stat(value); err != nil {
- return err
- }
- var locker sync.Locker
- if flagSerializeIO {
- locker = &sync.Mutex{}
- }
+ // Path validation and serialize-lock setup now happen later, in
+ // UnixVolume.Start(); here we only record the volume together with
+ // the values of the deprecated global flags.
+ vs.Config.Volumes = append(vs.Config.Volumes, &UnixVolume{
+ Root: path,
+ ReadOnly: deprecated.flagReadonly,
+ Serialize: deprecated.flagSerializeIO,
})
return nil
}
func init() {
- flag.Var(
- &unixVolumeAdder{&volumes},
- "volumes",
- "Deprecated synonym for -volume.")
- flag.Var(
- &unixVolumeAdder{&volumes},
- "volume",
- "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
+ VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &UnixVolume{} })
+
+ flag.Var(&unixVolumeAdder{theConfig}, "volumes", "see Volumes configuration")
+ flag.Var(&unixVolumeAdder{theConfig}, "volume", "see Volumes configuration")
}
// Discover adds a UnixVolume for every directory named "keep" that is
}
// Set the -readonly flag (but only for this volume)
// if the filesystem is mounted readonly.
- flagReadonlyWas := flagReadonly
+ flagReadonlyWas := deprecated.flagReadonly
for _, fsopt := range strings.Split(args[3], ",") {
if fsopt == "ro" {
- flagReadonly = true
+ deprecated.flagReadonly = true
break
}
if fsopt == "rw" {
} else {
added++
}
- flagReadonly = flagReadonlyWas
+ deprecated.flagReadonly = flagReadonlyWas
}
return added
}
// A UnixVolume stores and retrieves blocks in a local directory.
type UnixVolume struct {
- // path to the volume's root directory
- root string
+ Root string // path to the volume's root directory
+ ReadOnly bool
+ Serialize bool
+ DirectoryReplication int
+
// something to lock during IO, typically a sync.Mutex (or nil
// to skip locking)
- locker sync.Locker
- readonly bool
+ locker sync.Locker
+}
+
+// Examples implements VolumeWithExamples: sample configs shown in the
+// -help message — a serialized single-replica local disk, and an
+// unserialized network disk that stores two replicas.
+func (*UnixVolume) Examples() []Volume {
+ return []Volume{
+ &UnixVolume{
+ Root: "/mnt/local-disk",
+ Serialize: true,
+ DirectoryReplication: 1,
+ },
+ &UnixVolume{
+ Root: "/mnt/network-disk",
+ Serialize: false,
+ DirectoryReplication: 2,
+ },
+ }
+}
+
+// Type implements Volume: a UnixVolume appears as type "Directory" in
+// config files.
+func (v *UnixVolume) Type() string {
+ return "Directory"
+}
+
+// Start implements Volume: it creates the serialize lock if
+// configured, validates that Root is an absolute path that exists,
+// and applies the DirectoryReplication default. Returns non-nil if
+// the volume is unusable.
+func (v *UnixVolume) Start() error {
+ if v.Serialize {
+ v.locker = &sync.Mutex{}
+ }
+ if !strings.HasPrefix(v.Root, "/") {
+ return fmt.Errorf("volume root does not start with '/': %q", v.Root)
+ }
+ if v.DirectoryReplication == 0 {
+ // 0 means "not configured"; assume a single replica.
+ v.DirectoryReplication = 1
+ }
+ _, err := os.Stat(v.Root)
+ return err
}
// Touch sets the timestamp for the given locator to the current time
func (v *UnixVolume) Touch(loc string) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
p := v.blockPath(loc)
// returns a FullError. If the write fails due to some other error,
// that error is returned.
func (v *UnixVolume) Put(loc string, block []byte) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
if v.IsFull() {
var fs syscall.Statfs_t
var devnum uint64
- if fi, err := os.Stat(v.root); err == nil {
+ if fi, err := os.Stat(v.Root); err == nil {
devnum = fi.Sys().(*syscall.Stat_t).Dev
} else {
log.Printf("%s: os.Stat: %s\n", v, err)
return nil
}
- err := syscall.Statfs(v.root, &fs)
+ err := syscall.Statfs(v.Root, &fs)
if err != nil {
log.Printf("%s: statfs: %s\n", v, err)
return nil
// uses fs.Blocks - fs.Bfree.
free := fs.Bavail * uint64(fs.Bsize)
used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
- return &VolumeStatus{v.root, devnum, free, used}
+ return &VolumeStatus{v.Root, devnum, free, used}
}
var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
//
func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
var lastErr error
- rootdir, err := os.Open(v.root)
+ rootdir, err := os.Open(v.Root)
if err != nil {
return err
}
if !blockDirRe.MatchString(names[0]) {
continue
}
- blockdirpath := filepath.Join(v.root, names[0])
+ blockdirpath := filepath.Join(v.Root, names[0])
blockdir, err := os.Open(blockdirpath)
if err != nil {
log.Print("Error reading ", blockdirpath, ": ", err)
}
// Trash trashes the block data from the unix storage
-// If trashLifetime == 0, the block is deleted
+// If TrashLifetime == 0, the block is deleted
// Else, the block is renamed as path/{loc}.trash.{deadline},
-// where deadline = now + trashLifetime
+// where deadline = now + TrashLifetime
func (v *UnixVolume) Trash(loc string) error {
// Touch() must be called before calling Write() on a block. Touch()
// also uses lockfile(). This avoids a race condition between Write()
// Trash() will read the correct up-to-date timestamp and choose not to
// trash the file.
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
if v.locker != nil {
// anyway (because the permission signatures have expired).
if fi, err := os.Stat(p); err != nil {
return err
- } else if time.Since(fi.ModTime()) < blobSignatureTTL {
+ } else if time.Since(fi.ModTime()) < time.Duration(theConfig.BlobSignatureTTL) {
return nil
}
- if trashLifetime == 0 {
+ if theConfig.TrashLifetime == 0 {
return os.Remove(p)
}
- return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
+ return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()))
}
// Untrash moves block from trash back into store
// Look for path/{loc}.trash.{deadline} in storage,
// and rename the first such file as path/{loc}
func (v *UnixVolume) Untrash(loc string) (err error) {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
// blockDir returns the fully qualified directory name for the directory
// where loc is (or would be) stored on this volume.
func (v *UnixVolume) blockDir(loc string) string {
- return filepath.Join(v.root, loc[0:3])
+ return filepath.Join(v.Root, loc[0:3])
}
// blockPath returns the fully qualified pathname for the path to loc
// MinFreeKilobytes.
//
func (v *UnixVolume) IsFull() (isFull bool) {
- fullSymlink := v.root + "/full"
+ fullSymlink := v.Root + "/full"
// Check if the volume has been marked as full in the last hour.
if link, err := os.Readlink(fullSymlink); err == nil {
//
func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
var fs syscall.Statfs_t
- err = syscall.Statfs(v.root, &fs)
+ err = syscall.Statfs(v.Root, &fs)
if err == nil {
// Statfs output is not guaranteed to measure free
// space in terms of 1K blocks.
}
func (v *UnixVolume) String() string {
- return fmt.Sprintf("[UnixVolume %s]", v.root)
+ return fmt.Sprintf("[UnixVolume %s]", v.Root)
}
// Writable returns false if all future Put, Mtime, and Delete calls
// are expected to fail.
func (v *UnixVolume) Writable() bool {
- return !v.readonly
+ return !v.ReadOnly
}
// Replication returns the number of replicas promised by the
-// underlying device (currently assumed to be 1).
+// underlying device (as specified in configuration).
func (v *UnixVolume) Replication() int {
- return 1
+ return v.DirectoryReplication
}
// lockfile and unlockfile use flock(2) to manage kernel file locks.
var bytesDeleted, bytesInTrash int64
var blocksDeleted, blocksInTrash int
- err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
+ err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
if err != nil {
log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
return nil
}
return &TestableUnixVolume{
UnixVolume: UnixVolume{
- root: d,
+ Root: d,
+ ReadOnly: readonly,
locker: locker,
- readonly: readonly,
},
t: t,
}
// the volume is readonly.
func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
defer func(orig bool) {
- v.readonly = orig
- }(v.readonly)
- v.readonly = false
+ v.ReadOnly = orig
+ }(v.ReadOnly)
+ v.ReadOnly = false
err := v.Put(locator, data)
if err != nil {
v.t.Fatal(err)
}
func (v *TestableUnixVolume) Teardown() {
- if err := os.RemoveAll(v.root); err != nil {
+ if err := os.RemoveAll(v.Root); err != nil {
v.t.Fatal(err)
}
}
})
}
+// TestReplicationDefault1 verifies that Start() defaults
+// DirectoryReplication to 1 when the config leaves it unset.
+func TestReplicationDefault1(t *testing.T) {
+ v := &UnixVolume{
+ Root: "/",
+ ReadOnly: true,
+ }
+ if err := v.Start(); err != nil {
+ t.Error(err)
+ }
+ if got := v.Replication(); got != 1 {
+ t.Errorf("Replication() returned %d, expected 1 if no config given", got)
+ }
+}
+
func TestGetNotFound(t *testing.T) {
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
if err != nil {
t.Error(err)
}
- p := fmt.Sprintf("%s/%s/%s", v.root, TestHash[:3], TestHash)
+ p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
if buf, err := ioutil.ReadFile(p); err != nil {
t.Error(err)
} else if bytes.Compare(buf, TestBlock) != 0 {
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- os.Chmod(v.root, 000)
+ os.Chmod(v.Root, 000)
err := v.Put(TestHash, TestBlock)
if err == nil {
t.Error("Write should have failed")
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- fullPath := v.root + "/full"
+ fullPath := v.Root + "/full"
now := fmt.Sprintf("%d", time.Now().Unix())
os.Symlink(now, fullPath)
if !v.IsFull() {
// Get node status and make a basic sanity check.
volinfo := v.Status()
- if volinfo.MountPoint != v.root {
- t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.root)
+ if volinfo.MountPoint != v.Root {
+ t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
}
if volinfo.DeviceNum == 0 {
t.Errorf("uninitialized device_num in %v", volinfo)
t.Errorf("Got err %q, expected %q", err, DiskHashError)
}
- p := fmt.Sprintf("%s/%s/%s", v.root, TestHash[:3], TestHash)
+ p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
os.Chmod(p, 000)
err = v.Compare(TestHash, TestBlock)
if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
f.write(@key)
f.close()
end
- FileUtils.chown_R(l[:username], l[:username], userdotssh)
+ FileUtils.chown_R(l[:username], nil, userdotssh)
File.chmod(0700, userdotssh)
File.chmod(0750, @homedir)
File.chmod(0600, userauthkeys)
export ARVADOS_API_HOST=$localip:${services[api]}
export ARVADOS_API_HOST_INSECURE=1
-export GITOLITE_HTTP_HOME=/var/lib/arvados/git
-export GL_BYPASS_ACCESS_CHECKS=1
export PATH="$PATH:/var/lib/arvados/git/bin"
cd ~git
exec /usr/local/bin/arv-git-httpd \
- -address=:${services[arv-git-httpd]} \
- -git-command=/usr/share/gitolite3/gitolite-shell \
- -repo-root=/var/lib/arvados/git/repositories
+ -address=:${services[arv-git-httpd]} \
+ -git-command=/usr/share/gitolite3/gitolite-shell \
+ -gitolite-home=/var/lib/arvados/git \
+ -repo-root=/var/lib/arvados/git/repositories