# Methods that don't require login should
# skip_around_filter :require_thread_api_token
around_filter :require_thread_api_token, except: ERROR_ACTIONS
+ before_filter :ensure_arvados_api_exists, only: [:index, :show]
before_filter :set_cache_buster
before_filter :accept_uuid_as_id_param, except: ERROR_ACTIONS
before_filter :check_user_agreements, except: ERROR_ACTIONS
end
end
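+ # Respond 404 when the API server's discovery document does not list the
+ # requested action for this controller's model class (i.e. the API is disabled).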
+ def ensure_arvados_api_exists
+ if model_class.is_a?(Class) && model_class < ArvadosBase && !model_class.api_exists?(params['action'].to_sym)
+ @errors = ["#{params['action']} method is not supported for #{params['controller']}"]
+ return render_error(status: 404)
+ end
+ end
+
def index
find_objects_for_index if !@objects
render_index
}
@@notification_tests.push lambda { |controller, current_user|
- PipelineInstance.limit(1).where(created_by: current_user.uuid).each do
+ if PipelineInstance.api_exists?(:index)
+ PipelineInstance.limit(1).where(created_by: current_user.uuid).each do
+ return nil
+ end
+ else
return nil
end
return lambda { |view|
def recent_processes lim
lim = 12 if lim.nil?
- cols = %w(uuid owner_uuid created_at modified_at pipeline_template_uuid name state started_at finished_at)
- pipelines = PipelineInstance.select(cols).limit(lim).order(["created_at desc"])
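+ # Merge recent pipeline instances (when that API is still enabled) with recent
+ # container requests, newest first, keeping at most lim of them.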
+ procs = {}
+ if PipelineInstance.api_exists?(:index)
+ cols = %w(uuid owner_uuid created_at modified_at pipeline_template_uuid name state started_at finished_at)
+ pipelines = PipelineInstance.select(cols).limit(lim).order(["created_at desc"])
+ pipelines.results.each { |pi| procs[pi] = pi.created_at }
+ end
crs = ContainerRequest.limit(lim).order(["created_at desc"]).filter([["requesting_container_uuid", "=", nil]])
- procs = {}
- pipelines.results.each { |pi| procs[pi] = pi.created_at }
crs.results.each { |c| procs[c] = c.created_at }
Hash[procs.sort_by {|key, value| value}].keys.reverse.first(lim)
render 'hash_matches'
return
else
- jobs_with = lambda do |conds|
- Job.limit(RELATION_LIMIT).where(conds)
- .results.sort_by { |j| j.finished_at || j.created_at }
+ if Job.api_exists?(:index)
+ jobs_with = lambda do |conds|
+ Job.limit(RELATION_LIMIT).where(conds)
+ .results.sort_by { |j| j.finished_at || j.created_at }
+ end
+ @output_of = jobs_with.call(output: @object.portable_data_hash)
+ @log_of = jobs_with.call(log: @object.portable_data_hash)
end
- @output_of = jobs_with.call(output: @object.portable_data_hash)
- @log_of = jobs_with.call(log: @object.portable_data_hash)
+
@project_links = Link.limit(RELATION_LIMIT).order("modified_at DESC")
.where(head_uuid: @object.uuid, link_class: 'name').results
project_hash = Group.where(uuid: @project_links.map(&:tail_uuid)).to_hash
# Something like these could also be used to configure the contents of the panes.
def show_pane_list
pane_list = []
+
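+ # Advertise the legacy pipeline instance/template types on the tabs only when
+ # their index APIs are still enabled on the API server.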
+ procs = ["arvados#containerRequest"]
+ if PipelineInstance.api_exists?(:index)
+ procs << "arvados#pipelineInstance"
+ end
+
+ workflows = ["arvados#workflow"]
+ workflows_pane_name = 'Workflows'
+ if PipelineTemplate.api_exists?(:index)
+ workflows << "arvados#pipelineTemplate"
+ workflows_pane_name = 'Pipeline_templates'
+ end
+
if @object.uuid != current_user.andand.uuid
pane_list << 'Description'
end
pane_list <<
{
:name => 'Pipelines_and_processes',
- :filters => [%w(uuid is_a) + [%w(arvados#containerRequest arvados#pipelineInstance)]]
+ :filters => [%w(uuid is_a) + [procs]]
}
pane_list <<
{
- :name => 'Pipeline_templates',
- :filters => [%w(uuid is_a) + [%w(arvados#pipelineTemplate arvados#workflow)]]
+ :name => workflows_pane_name,
+ :filters => [%w(uuid is_a) + [workflows]]
}
pane_list <<
{
@name_link_for = {}
kind_filters.each do |attr,op,val|
(val.is_a?(Array) ? val : [val]).each do |type|
+ klass = type.split('#')[-1]
+ klass[0] = klass[0].capitalize
+ next if(!Object.const_get(klass).api_exists?(:index))
+
filters = @filters - kind_filters + [['uuid', 'is_a', type]]
if type == 'arvados#containerRequest'
filters = filters + [['container_requests.requesting_container_uuid', '=', nil]]
@filters = @filters || []
# get next page of pipeline_templates
- filters = @filters + [["uuid", "is_a", ["arvados#pipelineTemplate"]]]
- pipelines = PipelineTemplate.limit(@limit).order(["created_at desc"]).filter(filters)
+ if PipelineTemplate.api_exists?(:index)
+ filters = @filters + [["uuid", "is_a", ["arvados#pipelineTemplate"]]]
+ pipelines = PipelineTemplate.limit(@limit).order(["created_at desc"]).filter(filters)
+ end
# get next page of workflows
filters = @filters + [["uuid", "is_a", ["arvados#workflow"]]]
@filters = @filters || []
# get next page of pipeline_instances
- filters = @filters + [["uuid", "is_a", ["arvados#pipelineInstance"]]]
- pipelines = PipelineInstance.limit(@limit).order(["created_at desc"]).filter(filters)
+ if PipelineInstance.api_exists?(:index)
+ filters = @filters + [["uuid", "is_a", ["arvados#pipelineInstance"]]]
+ pipelines = PipelineInstance.limit(@limit).order(["created_at desc"]).filter(filters)
+ end
# get next page of jobs
- filters = @filters + [["uuid", "is_a", ["arvados#job"]]]
- jobs = Job.limit(@limit).order(["created_at desc"]).filter(filters)
+ if Job.api_exists?(:index)
+ filters = @filters + [["uuid", "is_a", ["arvados#job"]]]
+ jobs = Job.limit(@limit).order(["created_at desc"]).filter(filters)
+ end
# get next page of container_requests
filters = @filters + [["uuid", "is_a", ["arvados#containerRequest"]]]
end
def self.creatable?
- current_user.andand.is_active
+ current_user.andand.is_active && api_exists?(:create)
end
def self.goes_in_projects?
editable?
end
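+ # True when the API server's discovery document advertises the given method
+ # (e.g. :index, :create) for this resource type.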
+ def self.api_exists?(method)
+ arvados_api_client.discovery[:resources][self.to_s.underscore.pluralize.to_sym].andand[:methods].andand[method]
+ end
+
# Array of strings that are the names of attributes that can be edited
# with X-Editable.
def editable_attributes
--- /dev/null
+<%= render_pane 'tab_contents', to_string: true, locals: {
+ limit: 50,
+ filters: [['uuid', 'is_a', ["arvados#workflow"]]],
+ sortable_columns: { 'name' => 'workflows.name', 'description' => 'workflows.description' }
+ }.merge(local_assigns) %>
--- /dev/null
+require 'test_helper'
+require 'helpers/share_object_helper'
+
+class DisabledApiTest < ActionController::TestCase
+ test "dashboard recent processes when pipeline_instance index API is disabled" do
+ @controller = ProjectsController.new
+
+ dd = ArvadosApiClient.new_or_current.discovery.deep_dup
+ dd[:resources][:pipeline_instances][:methods].delete(:index)
+ ArvadosApiClient.any_instance.stubs(:discovery).returns(dd)
+
+ get :index, {}, session_for(:active)
+ assert_includes @response.body, "zzzzz-xvhdp-cr4runningcntnr" # expect crs
+ assert_not_includes @response.body, "zzzzz-d1hrv-" # expect no pipelines
+ end
+
+ [
+ [:jobs, JobsController.new],
+ [:job_tasks, JobTasksController.new],
+ [:pipeline_instances, PipelineInstancesController.new],
+ [:pipeline_templates, PipelineTemplatesController.new],
+ ].each do |ctrl_name, ctrl|
+ test "#{ctrl_name} index page when API is disabled" do
+ @controller = ctrl
+
+ dd = ArvadosApiClient.new_or_current.discovery.deep_dup
+ dd[:resources][ctrl_name][:methods].delete(:index)
+ ArvadosApiClient.any_instance.stubs(:discovery).returns(dd)
+
+ get :index, {}, session_for(:active)
+ assert_response 404
+ end
+ end
+
+ [
+ :active,
+ nil,
+ ].each do |user|
+ test "project tabs as user #{user} when the pipeline_templates index API is disabled" do
+ @controller = ProjectsController.new
+
+ Rails.configuration.anonymous_user_token = api_fixture('api_client_authorizations')['anonymous']['api_token']
+
+ dd = ArvadosApiClient.new_or_current.discovery.deep_dup
+ dd[:resources][:pipeline_templates][:methods].delete(:index)
+ ArvadosApiClient.any_instance.stubs(:discovery).returns(dd)
+
+ proj_uuid = api_fixture('groups')['anonymously_accessible_project']['uuid']
+
+ if user
+ get(:show, {id: proj_uuid}, session_for(user))
+ else
+ get(:show, {id: proj_uuid})
+ end
+
+ resp = @response.body
+ assert_includes resp, "href=\"#Data_collections\""
+ assert_includes resp, "href=\"#Pipelines_and_processes\""
+ assert_includes resp, "href=\"#Workflows\""
+ assert_not_includes resp, "href=\"#Pipeline_templates\""
+ end
+ end
+end
--- /dev/null
+require 'test_helper'
+
+class DisabledApiTest < ActiveSupport::TestCase
+ test 'Job.creatable? reflects whether jobs.create API is enabled' do
+ use_token(:active) do
+ assert(Job.creatable?)
+ end
+ dd = ArvadosApiClient.new_or_current.discovery.deep_dup
+ dd[:resources][:jobs][:methods].delete(:create)
+ ArvadosApiClient.any_instance.stubs(:discovery).returns(dd)
+ use_token(:active) do
+ refute(Job.creatable?)
+ end
+ end
+end
"Verify that all data from one set of Keep servers to another was copied"
package_go_binary tools/keep-rsync keep-rsync \
"Copy all data from one set of Keep servers to another"
+package_go_binary tools/keep-exercise keep-exercise \
+ "Performance testing tool for Arvados Keep"
# The Python SDK
# Please resist the temptation to add --no-python-fix-name to the fpm call here
fpm_build cwltest "" "" python 1.0.20160907111242
# And for cwltool we have the same problem as for schema_salad. Ward, 2016-03-17
-fpm_build cwltool "" "" python 1.0.20161007181528
+fpm_build cwltool "" "" python 1.0.20161107145355
# FPM eats the trailing .0 in the python-rdflib-jsonld package when built with 'rdflib-jsonld>=0.3.0'. Force the version. Ward, 2016-03-25
fpm_build rdflib-jsonld "" "" python 0.3.0
sdk/go/crunchrunner
sdk/cwl
tools/crunchstat-summary
+tools/keep-exercise
tools/keep-rsync
tools/keep-block-check
echo -n 'go: '
go version \
|| fatal "No go binary. See http://golang.org/doc/install"
- [[ $(go version) =~ go1.([0-9]+) ]] && [[ ${BASH_REMATCH[1]} -ge 6 ]] \
- || fatal "Go >= 1.6 required. See http://golang.org/doc/install"
+ [[ $(go version) =~ go1.([0-9]+) ]] && [[ ${BASH_REMATCH[1]} -ge 7 ]] \
+ || fatal "Go >= 1.7 required. See http://golang.org/doc/install"
echo -n 'gcc: '
gcc --version | egrep ^gcc \
|| fatal "No gcc. Try: apt-get install build-essential"
services/crunch-dispatch-local
services/crunch-dispatch-slurm
services/crunch-run
- tools/keep-rsync
tools/keep-block-check
+ tools/keep-exercise
+ tools/keep-rsync
)
for g in "${gostuff[@]}"
do
value = params[parametername.to_s]
elsif parameter.has_key?(:default)
value = parameter[:default]
+ elsif [false, 'false', 0, '0'].index(parameter[:required])
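+ # The parameter is explicitly marked not-required, so a missing value is acceptable.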
+ value = nil
else
errors << [componentname, parametername, "required parameter is missing"]
next
srccollections = {}
for k,v in generatemapper.items():
+ if k.startswith("_:"):
+ if v.type == "Directory":
+ continue
+ if v.type == "CreateFile":
+ with final.open(v.target, "wb") as f:
+ f.write(v.resolved.encode("utf-8"))
+ continue
+
+ if not k.startswith("keep:"):
+ raise Exception("Output source is not in keep or a literal")
sp = k.split("/")
srccollection = sp[0][5:]
if srccollection not in srccollections:
- srccollections[srccollection] = arvados.collection.CollectionReader(
- srccollection,
- api_client=self.api,
- keep_client=self.keep_client,
- num_retries=self.num_retries)
+ try:
+ srccollections[srccollection] = arvados.collection.CollectionReader(
+ srccollection,
+ api_client=self.api,
+ keep_client=self.keep_client,
+ num_retries=self.num_retries)
+ except arvados.errors.ArgumentError as e:
+ logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
+ raise
reader = srccollections[srccollection]
try:
srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
def rewrite(fileobj):
fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
- for k in ("basename", "size", "listing"):
+ for k in ("basename", "listing", "contents"):
if k in fileobj:
del fileobj[k]
"head_uuid": final_uuid, "link_class": "tag", "name": tag
}).execute(num_retries=self.num_retries)
- self.final_output_collection = final
+ def finalcollection(fileobj):
+ fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
+
+ adjustDirObjs(outputObj, finalcollection)
+ adjustFileObjs(outputObj, finalcollection)
+
+ return (outputObj, final)
def set_crunch_output(self):
if self.work_api == "containers":
self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
if self.output_tags is None:
self.output_tags = ""
- self.make_output_collection(self.output_name, self.output_tags, self.final_output)
+ self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
self.set_crunch_output()
if self.final_status != "success":
"kind": "tmp"
}
}
+ scheduling_parameters = {}
dirs = set()
for f in self.pathmapper.files():
partition_req, _ = get_feature(self, "http://arvados.org/cwl#PartitionRequirement")
if partition_req:
- runtime_constraints["partition"] = aslist(partition_req["partition"])
+ scheduling_parameters["partitions"] = aslist(partition_req["partition"])
container_request["mounts"] = mounts
container_request["runtime_constraints"] = runtime_constraints
container_request["use_existing"] = kwargs.get("enable_reuse", True)
+ container_request["scheduling_parameters"] = scheduling_parameters
try:
response = self.arvrunner.api.container_requests().create(
with Perf(metrics, "arv_docker_get_image %s" % self.name):
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
if docker_req and kwargs.get("use_container") is not False:
+ if docker_req.get("dockerOutputDirectory"):
+ raise UnsupportedRequirement("Option 'dockerOutputDirectory' of DockerRequirement not supported.")
runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
else:
runtime_constraints["docker_image"] = arvados_jobs_image(self.arvrunner)
joborder_keepmount = copy.deepcopy(joborder)
def keepmount(obj):
+ if "location" not in obj:
+ raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
if obj["location"].startswith("keep:"):
obj["location"] = "/keep/" + obj["location"][5:]
if "listing" in obj:
else:
return super(ArvPathMapper, self).reversemap(target)
-class InitialWorkDirPathMapper(PathMapper):
+class StagingPathMapper(PathMapper):
+ _follow_dirs = True
def visit(self, obj, stagedir, basedir, copy=False):
# type: (Dict[unicode, Any], unicode, unicode, bool) -> None
loc = obj["location"]
+ tgt = os.path.join(stagedir, obj["basename"])
if obj["class"] == "Directory":
- self._pathmap[loc] = MapperEnt(obj["location"], stagedir, "Directory")
- self.visitlisting(obj.get("listing", []), stagedir, basedir)
+ self._pathmap[loc] = MapperEnt(loc, tgt, "Directory")
+ if loc.startswith("_:") or self._follow_dirs:
+ self.visitlisting(obj.get("listing", []), tgt, basedir)
elif obj["class"] == "File":
if loc in self._pathmap:
return
- tgt = os.path.join(stagedir, obj["basename"])
- if "contents" in obj and obj["location"].startswith("_:"):
+ if "contents" in obj and loc.startswith("_:"):
self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile")
else:
if copy:
- self._pathmap[loc] = MapperEnt(obj["path"], tgt, "WritableFile")
+ self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile")
else:
- self._pathmap[loc] = MapperEnt(obj["path"], tgt, "File")
+ self._pathmap[loc] = MapperEnt(loc, tgt, "File")
self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
+
+class InitialWorkDirPathMapper(StagingPathMapper):
def setup(self, referenced_files, basedir):
# type: (List[Any], unicode) -> None
self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type)
-class FinalOutputPathMapper(PathMapper):
- def visit(self, obj, stagedir, basedir, copy=False):
- # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
- loc = obj["location"]
- if obj["class"] == "Directory":
- self._pathmap[loc] = MapperEnt(loc, stagedir, "Directory")
- elif obj["class"] == "File":
- if loc in self._pathmap:
- return
- tgt = os.path.join(stagedir, obj["basename"])
- self._pathmap[loc] = MapperEnt(loc, tgt, "File")
- self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
-
+class FinalOutputPathMapper(StagingPathMapper):
+ _follow_dirs = False
def setup(self, referenced_files, basedir):
# type: (List[Any], unicode) -> None
self.visitlisting(referenced_files, self.stagedir, basedir)
import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
+from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
+from cwltool.utils import aslist
+from cwltool.builder import substitute
import arvados.collection
import ruamel.yaml as yaml
if isinstance(tool, CommandLineTool):
(docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
if docker_req:
+ if docker_req.get("dockerOutputDirectory"):
+ # TODO: can be supported by containers API, but not jobs API.
+ raise UnsupportedRequirement("Option 'dockerOutputDirectory' of DockerRequirement not supported.")
arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
def upload_instance(arvrunner, name, tool, job_order):
upload_docker(arvrunner, tool)
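+ # Fill in secondaryFiles on File inputs in the job order, based on the
+ # patterns declared on the corresponding tool inputs.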
+ for t in tool.tool["inputs"]:
+ def setSecondary(fileobj):
+ if isinstance(fileobj, dict) and fileobj.get("class") == "File":
+ if "secondaryFiles" not in fileobj:
+ fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
+
+ if isinstance(fileobj, list):
+ for e in fileobj:
+ setSecondary(e)
+
+ if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
+ setSecondary(job_order[shortname(t["id"])])
+
workflowmapper = upload_dependencies(arvrunner,
name,
tool.doc_loader,
# Make sure to update arvados/build/run-build-packages.sh as well
# when updating the cwltool version pin.
install_requires=[
- 'cwltool==1.0.20161007181528',
+ 'cwltool==1.0.20161107145355',
'arvados-python-client>=0.1.20160826210445'
],
data_files=[
'output_path': '/var/spool/cwl',
'container_image': '99999999999999999999999999999993+99',
'command': ['ls', '/var/spool/cwl'],
- 'cwd': '/var/spool/cwl'
+ 'cwd': '/var/spool/cwl',
+ 'scheduling_parameters': {}
})
# The test passes some fields in builder.resources
make_fs_access=make_fs_access, tmpdir="/tmp"):
j.run()
- runner.api.container_requests().create.assert_called_with(
- body={
+ call_args, call_kwargs = runner.api.container_requests().create.call_args
+
+ call_body_expected = {
'environment': {
'HOME': '/var/spool/cwl',
'TMPDIR': '/tmp'
'vcpus': 3,
'ram': 3145728000,
'keep_cache_ram': 512,
- 'API': True,
- 'partition': ['blurb']
+ 'API': True
},
'use_existing': True,
'priority': 1,
'output_path': '/var/spool/cwl',
'container_image': '99999999999999999999999999999993+99',
'command': ['ls'],
- 'cwd': '/var/spool/cwl'
- })
+ 'cwd': '/var/spool/cwl',
+ 'scheduling_parameters': {
+ 'partitions': ['blurb']
+ }
+ }
+
+ call_body = call_kwargs.get('body', None)
+ self.assertNotEqual(None, call_body)
+ for key in call_body:
+ self.assertEqual(call_body_expected.get(key), call_body.get(key))
@mock.patch("arvados.collection.Collection")
def test_done(self, col):
final.open.return_value = openmock
openmock.__enter__.return_value = cwlout
- runner.make_output_collection("Test output", "tag0,tag1,tag2", {
+ _, runner.final_output_collection = runner.make_output_collection("Test output", "tag0,tag1,tag2", {
"foo": {
"class": "File",
"location": "keep:99999999999999999999999999999991+99/foo.txt",
"bar": {
"class": "File",
"location": "keep:99999999999999999999999999999992+99/bar.txt",
- "basename": "baz.txt"
+ "basename": "baz.txt",
+ "size": 4
}
})
self.assertEqual("""{
"bar": {
"class": "File",
- "location": "baz.txt"
+ "location": "baz.txt",
+ "size": 4
},
"foo": {
"class": "File",
- "location": "foo.txt"
+ "location": "foo.txt",
+ "size": 3
}
}""", cwlout.getvalue())
// Container is an arvados#container resource.
type Container struct {
- UUID string `json:"uuid"`
- Command []string `json:"command"`
- ContainerImage string `json:"container_image"`
- Cwd string `json:"cwd"`
- Environment map[string]string `json:"environment"`
- LockedByUUID string `json:"locked_by_uuid"`
- Mounts map[string]Mount `json:"mounts"`
- Output string `json:"output"`
- OutputPath string `json:"output_path"`
- Priority int `json:"priority"`
- RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
- State ContainerState `json:"state"`
+ UUID string `json:"uuid"`
+ Command []string `json:"command"`
+ ContainerImage string `json:"container_image"`
+ Cwd string `json:"cwd"`
+ Environment map[string]string `json:"environment"`
+ LockedByUUID string `json:"locked_by_uuid"`
+ Mounts map[string]Mount `json:"mounts"`
+ Output string `json:"output"`
+ OutputPath string `json:"output_path"`
+ Priority int `json:"priority"`
+ RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
+ State ContainerState `json:"state"`
+ SchedulingParameters SchedulingParameters `json:"scheduling_parameters"`
}
// Mount is special behavior to attach to a filesystem path or device.
// CPU) and network connectivity.
type RuntimeConstraints struct {
API *bool
- RAM int `json:"ram"`
- VCPUs int `json:"vcpus"`
- KeepCacheRAM int `json:"keep_cache_ram"`
- Partition []string `json:"partition"`
+ RAM int `json:"ram"`
+ VCPUs int `json:"vcpus"`
+ KeepCacheRAM int `json:"keep_cache_ram"`
+}
+
+// SchedulingParameters specify a container's scheduling parameters,
+// such as Partitions.
+type SchedulingParameters struct {
+ Partitions []string `json:"partitions"`
}
// ContainerList is an arvados#containerList resource.
protected
def load_limit_offset_order_params *args
+ super
if action_name == 'index'
# Omit manifest_text from index results unless expressly selected.
@select ||= model_class.selectable_attributes - ["manifest_text"]
end
- super
end
end
serialize :mounts, Hash
serialize :runtime_constraints, Hash
serialize :command, Array
+ serialize :scheduling_parameters, Hash
before_validation :fill_field_defaults, :if => :new_record?
before_validation :set_timestamps
t.add :started_at
t.add :state
t.add :auth_uuid
+ t.add :scheduling_parameters
end
# Supported states for a container
self.mounts ||= {}
self.cwd ||= "."
self.priority ||= 1
+ self.scheduling_parameters ||= {}
end
def permission_to_create
if self.new_record?
permitted.push(:owner_uuid, :command, :container_image, :cwd,
:environment, :mounts, :output_path, :priority,
- :runtime_constraints)
+ :runtime_constraints, :scheduling_parameters)
end
case self.state
if self.runtime_constraints_changed?
self.runtime_constraints = self.class.deep_sort_hash(self.runtime_constraints)
end
+ if self.scheduling_parameters_changed?
+ self.scheduling_parameters = self.class.deep_sort_hash(self.scheduling_parameters)
+ end
end
def handle_completed
output_path: self.output_path,
container_image: self.container_image,
mounts: self.mounts,
- runtime_constraints: self.runtime_constraints
+ runtime_constraints: self.runtime_constraints,
+ scheduling_parameters: self.scheduling_parameters
}
c = Container.create! c_attrs
retryable_requests.each do |cr|
serialize :mounts, Hash
serialize :runtime_constraints, Hash
serialize :command, Array
+ serialize :scheduling_parameters, Hash
before_validation :fill_field_defaults, :if => :new_record?
before_validation :validate_runtime_constraints
+ before_validation :validate_scheduling_parameters
before_validation :set_container
validates :command, :container_image, :output_path, :cwd, :presence => true
validate :validate_state_change
t.add :runtime_constraints
t.add :state
t.add :use_existing
+ t.add :scheduling_parameters
end
# Supported states for a container request
self.mounts ||= {}
self.cwd ||= "."
self.container_count_max ||= Rails.configuration.container_count_max
+ self.scheduling_parameters ||= {}
end
# Create a new container (or find an existing one) to satisfy this
if not reusable.nil?
reusable
else
+ c_attrs[:scheduling_parameters] = self.scheduling_parameters
Container.create!(c_attrs)
end
end
end
end
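+ # For a Committed request, scheduling_parameters['partitions'], when present,
+ # must be an array of strings.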
+ def validate_scheduling_parameters
+ if self.state == Committed
+ if scheduling_parameters.include? 'partitions' and
+ (!scheduling_parameters['partitions'].is_a?(Array) ||
+ scheduling_parameters['partitions'].reject{|x| !x.is_a?(String)}.size !=
+ scheduling_parameters['partitions'].size)
+ errors.add :scheduling_parameters, "partitions must be an array of strings"
+ end
+ end
+ end
+
def validate_change
permitted = [:owner_uuid]
:container_image, :cwd, :description, :environment,
:filters, :mounts, :name, :output_path, :priority,
:properties, :requesting_container_uuid, :runtime_constraints,
- :state, :container_uuid, :use_existing
+ :state, :container_uuid, :use_existing, :scheduling_parameters
when Committed
if container_uuid.nil?
permitted.push :command, :container_image, :cwd, :description, :environment,
:filters, :mounts, :name, :output_path, :properties,
:requesting_container_uuid, :runtime_constraints,
- :state, :container_uuid
+ :state, :container_uuid, :scheduling_parameters
end
when Final
belongs_to(:job, foreign_key: :job_uuid, primary_key: :uuid)
attr_accessor :job_readable
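+ # Address written to DNS entries for hostnames that currently have no node IP assigned.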
+ UNUSED_NODE_IP = '127.40.4.0'
+
api_accessible :user, :extend => :common do |t|
t.add :hostname
t.add :domain
end
def dns_server_update
- if self.hostname_changed? or self.ip_address_changed?
- if not self.ip_address.nil?
- stale_conflicting_nodes = Node.where('id != ? and ip_address = ? and last_ping_at < ?',self.id,self.ip_address,10.minutes.ago)
- if not stale_conflicting_nodes.empty?
- # One or more stale compute node records have the same IP address as the new node.
- # Clear the ip_address field on the stale nodes.
- stale_conflicting_nodes.each do |stale_node|
- stale_node.ip_address = nil
- stale_node.save!
- end
+ if hostname_changed? && hostname_was
+ self.class.dns_server_update(hostname_was, UNUSED_NODE_IP)
+ end
+ if hostname_changed? or ip_address_changed?
+ if ip_address
+ Node.where('id != ? and ip_address = ? and last_ping_at < ?',
+ id, ip_address, 10.minutes.ago).each do |stale_node|
+ # One or more stale compute node records have the same IP
+ # address as the new node. Clear the ip_address field on
+ # the stale nodes.
+ stale_node.ip_address = nil
+ stale_node.save!
end
end
- if self.hostname and self.ip_address
- self.class.dns_server_update(self.hostname, self.ip_address)
+ if hostname
+ self.class.dns_server_update(hostname, ip_address || UNUSED_NODE_IP)
end
end
end
if !File.exists? hostfile
n = Node.where(:slot_number => slot_number).first
if n.nil? or n.ip_address.nil?
- dns_server_update(hostname, '127.40.4.0')
+ dns_server_update(hostname, UNUSED_NODE_IP)
else
dns_server_update(hostname, n.ip_address)
end
--- /dev/null
+class AddSchedulingParametersToContainer < ActiveRecord::Migration
+ def change
+ add_column :containers, :scheduling_parameters, :text
+ add_column :container_requests, :scheduling_parameters, :text
+ end
+end
filters text,
updated_at timestamp without time zone NOT NULL,
container_count integer DEFAULT 0,
- use_existing boolean DEFAULT true
+ use_existing boolean DEFAULT true,
+ scheduling_parameters text
);
updated_at timestamp without time zone NOT NULL,
exit_code integer,
auth_uuid character varying(255),
- locked_by_uuid character varying(255)
+ locked_by_uuid character varying(255),
+ scheduling_parameters text
);
INSERT INTO schema_migrations (version) VALUES ('20160926194129');
-INSERT INTO schema_migrations (version) VALUES ('20161019171346');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20161019171346');
+
+INSERT INTO schema_migrations (version) VALUES ('20161111143147');
\ No newline at end of file
hostname: compute3
slot_number: ~
domain: ""
- ip_address: 172.17.2.173
+ ip_address: 172.17.2.174
last_ping_at: <%= 1.hour.ago.to_s(:db) %>
first_ping_at: <%= 23.hour.ago.to_s(:db) %>
job_uuid: ~
owner_uuid: zzzzz-tpzed-000000000000000
hostname: ~
slot_number: ~
- ip_address: 172.17.2.173
+ ip_address: 172.17.2.175
last_ping_at: ~
first_ping_at: ~
job_uuid: ~
owner_uuid: zzzzz-tpzed-000000000000000
hostname: custom1
slot_number: 23
- ip_address: 172.17.2.173
+ ip_address: 172.17.2.176
last_ping_at: ~
first_ping_at: ~
job_uuid: ~
end
end
+ test 'index without select returns everything except manifest' do
+ authorize_with :active
+ get :index
+ assert_response :success
+ assert json_response['items'].any?
+ json_response['items'].each do |coll|
+ assert_includes(coll.keys, 'uuid')
+ assert_includes(coll.keys, 'name')
+ assert_includes(coll.keys, 'created_at')
+ refute_includes(coll.keys, 'manifest_text')
+ end
+ end
+
+ ['', nil, false, 'null'].each do |select|
+ test "index with select=#{select.inspect} returns everything except manifest" do
+ authorize_with :active
+ get :index, select: select
+ assert_response :success
+ assert json_response['items'].any?
+ json_response['items'].each do |coll|
+ assert_includes(coll.keys, 'uuid')
+ assert_includes(coll.keys, 'name')
+ assert_includes(coll.keys, 'created_at')
+ refute_includes(coll.keys, 'manifest_text')
+ end
+ end
+ end
+
+ [["uuid"],
+ ["uuid", "manifest_text"],
+ '["uuid"]',
+ '["uuid", "manifest_text"]'].each do |select|
+ test "index with select=#{select.inspect} returns no name" do
+ authorize_with :active
+ get :index, select: select
+ assert_response :success
+ assert json_response['items'].any?
+ json_response['items'].each do |coll|
+ refute_includes(coll.keys, 'name')
+ end
+ end
+ end
+
[0,1,2].each do |limit|
test "get index with limit=#{limit}" do
authorize_with :active
end
end
end
+
+ [
+ [{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
+ [{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Uncommitted],
+ [{"partitions" => "fastcpu"}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
+ [{"partitions" => "fastcpu"}, ContainerRequest::Uncommitted],
+ [{"partitions" => ["fastcpu","vfastcpu"]}, ContainerRequest::Committed],
+ ].each do |sp, state, expected|
+ test "create container request with scheduling_parameters #{sp} in state #{state} and verify #{expected}" do
+ common_attrs = {cwd: "test",
+ priority: 1,
+ command: ["echo", "hello"],
+ output_path: "test",
+ scheduling_parameters: sp,
+ mounts: {"test" => {"kind" => "json"}}}
+ set_user_from_auth :active
+
+ if expected == ActiveRecord::RecordInvalid
+ assert_raises(ActiveRecord::RecordInvalid) do
+ create_minimal_req!(common_attrs.merge({state: state}))
+ end
+ else
+ cr = create_minimal_req!(common_attrs.merge({state: state}))
+ assert_equal sp, cr.scheduling_parameters
+
+ if state == ContainerRequest::Committed
+ c = Container.find_by_uuid(cr.container_uuid)
+ assert_equal sp, c.scheduling_parameters
+ end
+ end
+ end
+ end
end
refute_nil node2.slot_number
assert_equal "custom1", node2.hostname
end
+
+ test "update dns when nodemanager clears hostname and ip_address" do
+ act_as_system_user do
+ node = ping_node(:new_with_custom_hostname, {})
+ Node.expects(:dns_server_update).with(node.hostname, Node::UNUSED_NODE_IP)
+ node.update_attributes(hostname: nil, ip_address: nil)
+ end
+ end
+
+ test "update dns when hostname changes" do
+ act_as_system_user do
+ node = ping_node(:new_with_custom_hostname, {})
+
+ Node.expects(:dns_server_update).with(node.hostname, Node::UNUSED_NODE_IP)
+ Node.expects(:dns_server_update).with('foo0', node.ip_address)
+ node.update_attributes!(hostname: 'foo0')
+
+ Node.expects(:dns_server_update).with('foo0', Node::UNUSED_NODE_IP)
+ node.update_attributes!(hostname: nil, ip_address: nil)
+
+ Node.expects(:dns_server_update).with('foo0', '10.11.12.13')
+ node.update_attributes!(hostname: 'foo0', ip_address: '10.11.12.13')
+
+ Node.expects(:dns_server_update).with('foo0', '10.11.12.14')
+ node.update_attributes!(hostname: 'foo0', ip_address: '10.11.12.14')
+ end
+ end
end
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
- if container.RuntimeConstraints.Partition != nil {
- sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.RuntimeConstraints.Partition, ",")))
+ if container.SchedulingParameters.Partitions != nil {
+ sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
}
return exec.Command("sbatch", sbatchArgs...)
func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
theConfig.SbatchArguments = nil
- container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1, Partition: []string{"blurb", "b2"}}}
+ container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1}, SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}}}
sbatchCmd := sbatchFunc(container)
var expected []string
import (
"bytes"
+ "context"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
+ "net/http"
"os"
"regexp"
"strconv"
"sync"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/curoverse/azure-sdk-for-go/storage"
)
+const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
+
var (
azureMaxGetBytes int
azureStorageAccountName string
ContainerName string
AzureReplication int
ReadOnly bool
+ RequestTimeout arvados.Duration
azClient storage.Client
bsClient storage.BlobStorageClient
StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
ContainerName: "example-container-name",
AzureReplication: 3,
+ RequestTimeout: azureDefaultRequestTimeout,
},
}
}
if err != nil {
return fmt.Errorf("creating Azure storage client: %s", err)
}
+
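+ // A zero timeout would mean "wait forever"; fall back to a long but finite default.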
+ if v.RequestTimeout == 0 {
+ v.RequestTimeout = azureDefaultRequestTimeout
+ }
+ v.azClient.HTTPClient = &http.Client{
+ Timeout: time.Duration(v.RequestTimeout),
+ }
v.bsClient = v.azClient.GetBlobService()
ok, err := v.bsClient.ContainerExists(v.ContainerName)
// If the block is younger than azureWriteRaceInterval and is
// unexpectedly empty, assume a PutBlob operation is in progress, and
// wait for it to finish writing.
-func (v *AzureBlobVolume) Get(loc string, buf []byte) (int, error) {
+func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
trashed, _, err := v.checkTrashed(loc)
if err != nil {
return 0, err
}
// Compare the given data with existing stored data.
-func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
+func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
trashed, _, err := v.checkTrashed(loc)
if err != nil {
return err
return v.translateError(err)
}
defer rdr.Close()
- return compareReaderWithBuf(rdr, expect, loc[:32])
+ return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
}
// Put stores a Keep block as a block blob in the container.
-func (v *AzureBlobVolume) Put(loc string, block []byte) error {
+func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
if v.ReadOnly {
return MethodDisabledError
}
import (
"bytes"
+ "context"
"crypto/md5"
"encoding/base64"
"encoding/xml"
data[i] = byte((i + 7) & 0xff)
}
hash := fmt.Sprintf("%x", md5.Sum(data))
- err := v.Put(hash, data)
+ err := v.Put(context.Background(), hash, data)
if err != nil {
t.Error(err)
}
gotData := make([]byte, len(data))
- gotLen, err := v.Get(hash, gotData)
+ gotLen, err := v.Get(context.Background(), hash, gotData)
if err != nil {
t.Error(err)
}
allDone := make(chan struct{})
v.azHandler.race = make(chan chan struct{})
go func() {
- err := v.Put(TestHash, TestBlock)
+ err := v.Put(context.Background(), TestHash, TestBlock)
if err != nil {
t.Error(err)
}
v.azHandler.race <- continuePut
go func() {
buf := make([]byte, len(TestBlock))
- _, err := v.Get(TestHash, buf)
+ _, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Error(err)
}
go func() {
defer close(allDone)
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash, buf)
+ n, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Error(err)
return
import (
"bytes"
+ "context"
"crypto/md5"
"fmt"
"io"
return <-outcome
}
-func compareReaderWithBuf(rdr io.Reader, expect []byte, hash string) error {
+func compareReaderWithBuf(ctx context.Context, rdr io.Reader, expect []byte, hash string) error {
bufLen := 1 << 20
if bufLen > len(expect) && len(expect) > 0 {
// No need for bufLen to be longer than
// expected to equal the next N bytes read from
// rdr.
for {
- n, err := rdr.Read(buf)
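+ // Perform the read in a goroutine so we can stop waiting for it when ctx is cancelled.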
+ ready := make(chan bool)
+ var n int
+ var err error
+ go func() {
+ n, err = rdr.Read(buf)
+ close(ready)
+ }()
+ select {
+ case <-ready:
+ case <-ctx.Done():
+ return ctx.Err()
+ }
if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], buf[:n], rdr)
}
)
type Config struct {
+ Debug bool
Listen string
PIDFile string
blobSigningKey []byte
systemAuthToken string
+ debugLogf func(string, ...interface{})
}
var theConfig = DefaultConfig()
// Start should be called exactly once: after setting all public
// fields, and before using the config.
func (cfg *Config) Start() error {
+ if cfg.Debug {
+ cfg.debugLogf = log.Printf
+ cfg.debugLogf("debugging enabled")
+ } else {
+ cfg.debugLogf = func(string, ...interface{}) {}
+ }
+
if cfg.MaxBuffers < 0 {
return fmt.Errorf("MaxBuffers must be greater than zero")
}
--- /dev/null
+package main
+
+import (
+ "log"
+)
+
+func init() {
+ theConfig.debugLogf = log.Printf
+}
import (
"bytes"
+ "context"
"encoding/json"
"fmt"
"net/http"
defer KeepVM.Close()
vols := KeepVM.AllWritable()
- if err := vols[0].Put(TestHash, TestBlock); err != nil {
+ if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
t.Error(err)
}
defer KeepVM.Close()
vols := KeepVM.AllWritable()
- vols[0].Put(TestHash, TestBlock)
- vols[1].Put(TestHash2, TestBlock2)
- vols[0].Put(TestHash+".meta", []byte("metadata"))
- vols[1].Put(TestHash2+".meta", []byte("metadata"))
+ vols[0].Put(context.Background(), TestHash, TestBlock)
+ vols[1].Put(context.Background(), TestHash2, TestBlock2)
+ vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
+ vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))
theConfig.systemAuthToken = "DATA MANAGER TOKEN"
defer KeepVM.Close()
vols := KeepVM.AllWritable()
- vols[0].Put(TestHash, TestBlock)
+ vols[0].Put(context.Background(), TestHash, TestBlock)
// Explicitly set the BlobSignatureTTL to 0 for these
// tests, to ensure the MockVolume deletes the blocks
}
// Confirm the block has been deleted
buf := make([]byte, BlockSize)
- _, err := vols[0].Get(TestHash, buf)
+ _, err := vols[0].Get(context.Background(), TestHash, buf)
var blockDeleted = os.IsNotExist(err)
if !blockDeleted {
t.Error("superuserExistingBlockReq: block not deleted")
// A DELETE request on a block newer than BlobSignatureTTL
// should return success but leave the block on the volume.
- vols[0].Put(TestHash, TestBlock)
+ vols[0].Put(context.Background(), TestHash, TestBlock)
theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
response = IssueRequest(superuserExistingBlockReq)
expectedDc, responseDc)
}
// Confirm the block has NOT been deleted.
- _, err = vols[0].Get(TestHash, buf)
+ _, err = vols[0].Get(context.Background(), TestHash, buf)
if err != nil {
t.Errorf("testing delete on new block: %s\n", err)
}
KeepVM = MakeTestVolumeManager(2)
defer KeepVM.Close()
- if err := KeepVM.AllWritable()[0].Put(TestHash, TestBlock); err != nil {
+ if err := KeepVM.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil {
t.Error(err)
}
defer KeepVM.Close()
vols := KeepVM.AllWritable()
- if err := vols[0].Put(TestHash, TestBlock); err != nil {
+ if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
t.Error(err)
}
KeepVM = MakeTestVolumeManager(2)
defer KeepVM.Close()
vols := KeepVM.AllWritable()
- vols[0].Put(TestHash, TestBlock)
+ vols[0].Put(context.Background(), TestHash, TestBlock)
theConfig.systemAuthToken = "DATA MANAGER TOKEN"
import (
"container/list"
+ "context"
"crypto/md5"
"encoding/json"
"fmt"
// GetBlockHandler is a HandleFunc to address Get block requests.
func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
+ ctx, cancel := contextForResponse(context.TODO(), resp)
+ defer cancel()
+
if theConfig.RequireSignatures {
locator := req.URL.Path[1:] // strip leading slash
if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
// isn't here, we can return 404 now instead of waiting for a
// buffer.
- buf, err := getBufferForResponseWriter(resp, bufs, BlockSize)
+ buf, err := getBufferWithContext(ctx, bufs, BlockSize)
if err != nil {
http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
}
defer bufs.Put(buf)
- size, err := GetBlock(mux.Vars(req)["hash"], buf, resp)
+ size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
if err != nil {
code := http.StatusInternalServerError
if err, ok := err.(*KeepError); ok {
resp.Write(buf[:size])
}
+// contextForResponse returns a new context that gets cancelled when resp's
+// CloseNotifier reports that the client has disconnected.
+func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
+ ctx, cancel := context.WithCancel(parent)
+ if cn, ok := resp.(http.CloseNotifier); ok {
+ go func(c <-chan bool) {
+ select {
+ case <-c:
+ theConfig.debugLogf("cancel context")
+ cancel()
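+ // ctx was cancelled elsewhere (e.g. the handler returned); stop watching so this goroutine exits.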
+ case <-ctx.Done():
+ }
+ }(cn.CloseNotify())
+ }
+ return ctx, cancel
+}
+
// Get a buffer from the pool -- but give up and return a non-nil
-// error if resp implements http.CloseNotifier and tells us that the
-// client has disconnected before we get a buffer.
-func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufSize int) ([]byte, error) {
- var closeNotifier <-chan bool
- if resp, ok := resp.(http.CloseNotifier); ok {
- closeNotifier = resp.CloseNotify()
- }
- var buf []byte
+// error if ctx ends before we get a buffer.
+func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
bufReady := make(chan []byte)
go func() {
bufReady <- bufs.Get(bufSize)
- close(bufReady)
}()
select {
- case buf = <-bufReady:
+ case buf := <-bufReady:
return buf, nil
- case <-closeNotifier:
+ case <-ctx.Done():
go func() {
// Even if closeNotifier happened first, we
// need to keep waiting for our buf so we can
// PutBlockHandler is a HandleFunc to address Put block requests.
func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
+ ctx, cancel := contextForResponse(context.TODO(), resp)
+ defer cancel()
+
hash := mux.Vars(req)["hash"]
// Detect as many error conditions as possible before reading
return
}
- buf, err := getBufferForResponseWriter(resp, bufs, int(req.ContentLength))
+ buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
if err != nil {
http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
return
}
- replication, err := PutBlock(buf, hash)
+ replication, err := PutBlock(ctx, buf, hash)
bufs.Put(buf)
if err != nil {
- ke := err.(*KeepError)
- http.Error(resp, ke.Error(), ke.HTTPCode)
+ code := http.StatusInternalServerError
+ if err, ok := err.(*KeepError); ok {
+ code = err.HTTPCode
+ }
+ http.Error(resp, err.Error(), code)
return
}
// If the block found does not have the correct MD5 hash, returns
// DiskHashError.
//
-func GetBlock(hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
// Attempt to read the requested hash from a keep volume.
errorToCaller := NotFoundError
for _, vol := range KeepVM.AllReadable() {
- size, err := vol.Get(hash, buf)
+ size, err := vol.Get(ctx, hash, buf)
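+ // Stop searching other volumes as soon as the client disconnects.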
+ select {
+ case <-ctx.Done():
+ return 0, ErrClientDisconnect
+ default:
+ }
if err != nil {
// IsNotExist is an expected error and may be
// ignored. All other errors are logged. In
// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
//
-// PutBlock(block, hash)
+// PutBlock(ctx, block, hash)
// Stores the BLOCK (identified by the content id HASH) in Keep.
//
// The MD5 checksum of the block must be identical to the content id HASH.
// all writes failed). The text of the error message should
// provide as much detail as possible.
//
-func PutBlock(block []byte, hash string) (int, error) {
+func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
// Check that BLOCK's checksum matches HASH.
blockhash := fmt.Sprintf("%x", md5.Sum(block))
if blockhash != hash {
// If we already have this data, it's intact on disk, and we
// can update its timestamp, return success. If we have
// different data with the same hash, return failure.
- if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+ if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
return n, err
+ } else if ctx.Err() != nil {
+ return 0, ErrClientDisconnect
}
// Choose a Keep volume to write to.
// If this volume fails, try all of the volumes in order.
if vol := KeepVM.NextWritable(); vol != nil {
- if err := vol.Put(hash, block); err == nil {
+ if err := vol.Put(ctx, hash, block); err == nil {
return vol.Replication(), nil // success!
}
+ if ctx.Err() != nil {
+ return 0, ErrClientDisconnect
+ }
}
writables := KeepVM.AllWritable()
allFull := true
for _, vol := range writables {
- err := vol.Put(hash, block)
+ err := vol.Put(ctx, hash, block)
+ if ctx.Err() != nil {
+ return 0, ErrClientDisconnect
+ }
if err == nil {
return vol.Replication(), nil // success!
}
// the relevant block's modification time in order to protect it from
// premature garbage collection. Otherwise, it returns a non-nil
// error.
-func CompareAndTouch(hash string, buf []byte) (int, error) {
+func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
var bestErr error = NotFoundError
for _, vol := range KeepVM.AllWritable() {
- if err := vol.Compare(hash, buf); err == CollisionError {
+ err := vol.Compare(ctx, hash, buf)
+ if ctx.Err() != nil {
+ return 0, ctx.Err()
+ } else if err == CollisionError {
// Stop if we have a block with same hash but
// different content. (It will be impossible
// to tell which one is wanted if we have
import (
"bytes"
+ "context"
)
// A TestableVolumeManagerFactory creates a volume manager with at least two TestableVolume instances.
// Get should pass
buf := make([]byte, len(testBlock))
- n, err := GetBlock(testHash, buf, nil)
+ n, err := GetBlock(context.Background(), testHash, buf, nil)
if err != nil {
t.Fatalf("Error while getting block %s", err)
}
// Get should fail
buf := make([]byte, BlockSize)
- size, err := GetBlock(testHash, buf, nil)
+ size, err := GetBlock(context.Background(), testHash, buf, nil)
if err == nil {
t.Fatalf("Got %+q, expected error while getting corrupt block %v", buf[:size], testHash)
}
setupHandlersWithGenericVolumeTest(t, factory)
// PutBlock
- if _, err := PutBlock(testBlock, testHash); err != nil {
+ if _, err := PutBlock(context.Background(), testBlock, testHash); err != nil {
t.Fatalf("Error during PutBlock: %s", err)
}
// Check that PutBlock succeeds again even after CompareAndTouch
- if _, err := PutBlock(testBlock, testHash); err != nil {
+ if _, err := PutBlock(context.Background(), testBlock, testHash); err != nil {
t.Fatalf("Error during PutBlock: %s", err)
}
// Check that PutBlock stored the data as expected
buf := make([]byte, BlockSize)
- size, err := GetBlock(testHash, buf, nil)
+ size, err := GetBlock(context.Background(), testHash, buf, nil)
if err != nil {
t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
} else if bytes.Compare(buf[:size], testBlock) != 0 {
testableVolumes[1].PutRaw(testHash, badData)
// Check that PutBlock with good data succeeds
- if _, err := PutBlock(testBlock, testHash); err != nil {
+ if _, err := PutBlock(context.Background(), testBlock, testHash); err != nil {
t.Fatalf("Error during PutBlock for %q: %s", testHash, err)
}
// Put succeeded and overwrote the badData in one volume,
// and Get should return the testBlock now, ignoring the bad data.
buf := make([]byte, BlockSize)
- size, err := GetBlock(testHash, buf, nil)
+ size, err := GetBlock(context.Background(), testHash, buf, nil)
if err != nil {
t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
} else if bytes.Compare(buf[:size], testBlock) != 0 {
import (
"bytes"
+ "context"
"fmt"
"io/ioutil"
"os"
defer KeepVM.Close()
vols := KeepVM.AllReadable()
- if err := vols[1].Put(TestHash, TestBlock); err != nil {
+ if err := vols[1].Put(context.Background(), TestHash, TestBlock); err != nil {
t.Error(err)
}
// Check that GetBlock returns success.
buf := make([]byte, BlockSize)
- size, err := GetBlock(TestHash, buf, nil)
+ size, err := GetBlock(context.Background(), TestHash, buf, nil)
if err != nil {
t.Errorf("GetBlock error: %s", err)
}
// Check that GetBlock returns failure.
buf := make([]byte, BlockSize)
- size, err := GetBlock(TestHash, buf, nil)
+ size, err := GetBlock(context.Background(), TestHash, buf, nil)
if err != NotFoundError {
t.Errorf("Expected NotFoundError, got %v, err %v", buf[:size], err)
}
defer KeepVM.Close()
vols := KeepVM.AllReadable()
- vols[0].Put(TestHash, BadBlock)
+ vols[0].Put(context.Background(), TestHash, BadBlock)
// Check that GetBlock returns failure.
buf := make([]byte, BlockSize)
- size, err := GetBlock(TestHash, buf, nil)
+ size, err := GetBlock(context.Background(), TestHash, buf, nil)
if err != DiskHashError {
t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, buf[:size])
}
defer KeepVM.Close()
// Check that PutBlock stores the data as expected.
- if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
t.Fatalf("PutBlock: n %d err %v", n, err)
}
vols := KeepVM.AllReadable()
buf := make([]byte, BlockSize)
- n, err := vols[1].Get(TestHash, buf)
+ n, err := vols[1].Get(context.Background(), TestHash, buf)
if err != nil {
t.Fatalf("Volume #0 Get returned error: %v", err)
}
vols[0].(*MockVolume).Bad = true
// Check that PutBlock stores the data as expected.
- if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
t.Fatalf("PutBlock: n %d err %v", n, err)
}
buf := make([]byte, BlockSize)
- size, err := GetBlock(TestHash, buf, nil)
+ size, err := GetBlock(context.Background(), TestHash, buf, nil)
if err != nil {
t.Fatalf("GetBlock: %v", err)
}
// Check that PutBlock returns the expected error when the hash does
// not match the block.
- if _, err := PutBlock(BadBlock, TestHash); err != RequestHashError {
+ if _, err := PutBlock(context.Background(), BadBlock, TestHash); err != RequestHashError {
t.Errorf("Expected RequestHashError, got %v", err)
}
// Confirm that GetBlock fails to return anything.
- if result, err := GetBlock(TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
+ if result, err := GetBlock(context.Background(), TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
string(result), err)
}
// Store a corrupted block under TestHash.
vols := KeepVM.AllWritable()
- vols[0].Put(TestHash, BadBlock)
- if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ vols[0].Put(context.Background(), TestHash, BadBlock)
+ if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
t.Errorf("PutBlock: n %d err %v", n, err)
}
// The block on disk should now match TestBlock.
buf := make([]byte, BlockSize)
- if size, err := GetBlock(TestHash, buf, nil); err != nil {
+ if size, err := GetBlock(context.Background(), TestHash, buf, nil); err != nil {
t.Errorf("GetBlock: %v", err)
} else if bytes.Compare(buf[:size], TestBlock) != 0 {
t.Errorf("Got %+q, expected %+q", buf[:size], TestBlock)
// Store one block, then attempt to store the other. Confirm that
// PutBlock reported a CollisionError.
- if _, err := PutBlock(b1, locator); err != nil {
+ if _, err := PutBlock(context.Background(), b1, locator); err != nil {
t.Error(err)
}
- if _, err := PutBlock(b2, locator); err == nil {
+ if _, err := PutBlock(context.Background(), b2, locator); err == nil {
t.Error("PutBlock did not report a collision")
} else if err != CollisionError {
t.Errorf("PutBlock returned %v", err)
// Store a block and then make the underlying volume bad,
// so a subsequent attempt to update the file timestamp
// will fail.
- vols[0].Put(TestHash, BadBlock)
+ vols[0].Put(context.Background(), TestHash, BadBlock)
oldMtime, err := vols[0].Mtime(TestHash)
if err != nil {
t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
// vols[0].Touch will fail on the next call, so the volume
// manager will store a copy on vols[1] instead.
vols[0].(*MockVolume).Touchable = false
- if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
t.Fatalf("PutBlock: n %d err %v", n, err)
}
vols[0].(*MockVolume).Touchable = true
oldMtime, newMtime)
}
buf := make([]byte, BlockSize)
- n, err := vols[1].Get(TestHash, buf)
+ n, err := vols[1].Get(context.Background(), TestHash, buf)
if err != nil {
t.Fatalf("vols[1]: %v", err)
}
defer KeepVM.Close()
vols := KeepVM.AllReadable()
- vols[0].Put(TestHash, TestBlock)
- vols[1].Put(TestHash2, TestBlock2)
- vols[0].Put(TestHash3, TestBlock3)
- vols[0].Put(TestHash+".meta", []byte("metadata"))
- vols[1].Put(TestHash2+".meta", []byte("metadata"))
+ vols[0].Put(context.Background(), TestHash, TestBlock)
+ vols[1].Put(context.Background(), TestHash2, TestBlock2)
+ vols[0].Put(context.Background(), TestHash3, TestBlock3)
+ vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
+ vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))
buf := new(bytes.Buffer)
vols[0].IndexTo("", buf)
package main
import (
+ "context"
"crypto/rand"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
// Put block
var PutContent = func(content []byte, locator string) (err error) {
- _, err = PutBlock(content, locator)
+ _, err = PutBlock(context.Background(), content, locator)
return
}
package main
import (
+ "bytes"
+ "context"
"encoding/base64"
"encoding/hex"
"flag"
"fmt"
"io"
+ "io/ioutil"
"log"
"net/http"
"os"
"github.com/AdRoll/goamz/s3"
)
+const (
+ s3DefaultReadTimeout = arvados.Duration(10 * time.Minute)
+ s3DefaultConnectTimeout = arvados.Duration(time.Minute)
+)
+
var (
// ErrS3TrashDisabled is returned by Trash if that operation
// is impossible with the current config.
LocationConstraint bool
IndexPageSize int
S3Replication int
+ ConnectTimeout arvados.Duration
+ ReadTimeout arvados.Duration
RaceWindow arvados.Duration
ReadOnly bool
UnsafeDelete bool
func (*S3Volume) Examples() []Volume {
return []Volume{
&S3Volume{
- AccessKeyFile: "/etc/aws_s3_access_key.txt",
- SecretKeyFile: "/etc/aws_s3_secret_key.txt",
- Endpoint: "",
- Region: "us-east-1",
- Bucket: "example-bucket-name",
- IndexPageSize: 1000,
- S3Replication: 2,
- RaceWindow: arvados.Duration(24 * time.Hour),
+ AccessKeyFile: "/etc/aws_s3_access_key.txt",
+ SecretKeyFile: "/etc/aws_s3_secret_key.txt",
+ Endpoint: "",
+ Region: "us-east-1",
+ Bucket: "example-bucket-name",
+ IndexPageSize: 1000,
+ S3Replication: 2,
+ RaceWindow: arvados.Duration(24 * time.Hour),
+ ConnectTimeout: arvados.Duration(time.Minute),
+ ReadTimeout: arvados.Duration(5 * time.Minute),
},
&S3Volume{
- AccessKeyFile: "/etc/gce_s3_access_key.txt",
- SecretKeyFile: "/etc/gce_s3_secret_key.txt",
- Endpoint: "https://storage.googleapis.com",
- Region: "",
- Bucket: "example-bucket-name",
- IndexPageSize: 1000,
- S3Replication: 2,
- RaceWindow: arvados.Duration(24 * time.Hour),
+ AccessKeyFile: "/etc/gce_s3_access_key.txt",
+ SecretKeyFile: "/etc/gce_s3_secret_key.txt",
+ Endpoint: "https://storage.googleapis.com",
+ Region: "",
+ Bucket: "example-bucket-name",
+ IndexPageSize: 1000,
+ S3Replication: 2,
+ RaceWindow: arvados.Duration(24 * time.Hour),
+ ConnectTimeout: arvados.Duration(time.Minute),
+ ReadTimeout: arvados.Duration(5 * time.Minute),
},
}
}
if err != nil {
return err
}
+
+ // Zero timeouts mean "wait forever", which is a bad
+ // default. Default to long timeouts instead.
+ if v.ConnectTimeout == 0 {
+ v.ConnectTimeout = s3DefaultConnectTimeout
+ }
+ if v.ReadTimeout == 0 {
+ v.ReadTimeout = s3DefaultReadTimeout
+ }
+
+ client := s3.New(auth, region)
+ client.ConnectTimeout = time.Duration(v.ConnectTimeout)
+ client.ReadTimeout = time.Duration(v.ReadTimeout)
v.bucket = &s3.Bucket{
- S3: s3.New(auth, region),
+ S3: client,
Name: v.Bucket,
}
return nil
}
+func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
+ ready := make(chan bool)
+ go func() {
+ rdr, err = v.getReader(loc)
+ close(ready)
+ }()
+ select {
+ case <-ready:
+ return
+ case <-ctx.Done():
+ theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
+ go func() {
+ <-ready
+ if err == nil {
+ rdr.Close()
+ }
+ }()
+ return nil, ctx.Err()
+ }
+}
+
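The getReaderWithContext pattern above is a general one: run the blocking call in a goroutine, select on ctx.Done(), and leave a second goroutine behind to clean up whatever the abandoned call eventually returns. A minimal sketch of the same idea (openSlow and openWithContext are illustrative names, not keepstore APIs; assumes the standard context and io packages):

func openWithContext(ctx context.Context, openSlow func() (io.ReadCloser, error)) (io.ReadCloser, error) {
	type result struct {
		rdr io.ReadCloser
		err error
	}
	ch := make(chan result, 1)
	go func() {
		rdr, err := openSlow()
		ch <- result{rdr, err}
	}()
	select {
	case r := <-ch:
		return r.rdr, r.err
	case <-ctx.Done():
		// Abandon the call, but make sure its result gets closed
		// whenever it does arrive.
		go func() {
			if r := <-ch; r.err == nil {
				r.rdr.Close()
			}
		}()
		return nil, ctx.Err()
	}
}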
// getReader wraps (Bucket)GetReader.
//
// In situations where (Bucket)GetReader would fail because the block
// Get a block: copy the block data into buf, and return the number of
// bytes copied.
-func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
- rdr, err := v.getReader(loc)
+func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
+ rdr, err := v.getReaderWithContext(ctx, loc)
if err != nil {
return 0, err
}
- defer rdr.Close()
- n, err := io.ReadFull(rdr, buf)
- switch err {
- case nil, io.EOF, io.ErrUnexpectedEOF:
- return n, nil
- default:
- return 0, v.translateError(err)
+
+ var n int
+ ready := make(chan bool)
+ go func() {
+ defer close(ready)
+
+ defer rdr.Close()
+ n, err = io.ReadFull(rdr, buf)
+
+ switch err {
+ case nil, io.EOF, io.ErrUnexpectedEOF:
+ err = nil
+ default:
+ err = v.translateError(err)
+ }
+ }()
+ select {
+ case <-ctx.Done():
+ theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
+ rdr.Close()
+ // Must wait for ReadFull to return, to ensure it
+ // doesn't write to buf after we return.
+ theConfig.debugLogf("s3: waiting for ReadFull() to fail")
+ <-ready
+ return 0, ctx.Err()
+ case <-ready:
+ return n, err
}
}
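For illustration only (not part of this change), a caller can now bound a block read with a deadline; because Get waits for ReadFull to finish before returning, buf is never written to after the call returns. readWithDeadline is a hypothetical helper showing the new signature:

func readWithDeadline(v Volume, loc string) ([]byte, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
	buf := make([]byte, BlockSize)
	n, err := v.Get(ctx, loc, buf)
	if err != nil {
		// context.DeadlineExceeded if the read was abandoned.
		return nil, err
	}
	return buf[:n], nil
}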
// Compare the given data with the stored data.
-func (v *S3Volume) Compare(loc string, expect []byte) error {
- rdr, err := v.getReader(loc)
+func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
+ rdr, err := v.getReaderWithContext(ctx, loc)
if err != nil {
return err
}
defer rdr.Close()
- return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
+ return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
}
// Put writes a block.
-func (v *S3Volume) Put(loc string, block []byte) error {
+func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
if v.ReadOnly {
return MethodDisabledError
}
var opts s3.Options
- if len(block) > 0 {
+ size := len(block)
+ if size > 0 {
md5, err := hex.DecodeString(loc)
if err != nil {
return err
}
opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
}
- err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
- if err != nil {
+
+ // Send the block data through a pipe, so that (if we need to)
+ // we can close the pipe early and abandon our PutReader()
+ // goroutine, without worrying about PutReader() accessing our
+ // block buffer after we release it.
+ bufr, bufw := io.Pipe()
+ go func() {
+ io.Copy(bufw, bytes.NewReader(block))
+ bufw.Close()
+ }()
+
+ var err error
+ ready := make(chan bool)
+ go func() {
+ defer func() {
+ if ctx.Err() != nil {
+ theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
+ }
+ }()
+ defer close(ready)
+ err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
+ if err != nil {
+ return
+ }
+ err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ }()
+ select {
+ case <-ctx.Done():
+ theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
+ // Our pipe might be stuck in Write(), waiting for
+ // io.Copy() to read. If so, un-stick it. This means
+ // PutReader will get corrupt data, but that's OK: the
+ // size and MD5 won't match, so the write will fail.
+ go io.Copy(ioutil.Discard, bufr)
+ // CloseWithError() will return once pending I/O is done.
+ bufw.CloseWithError(ctx.Err())
+ theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
+ return ctx.Err()
+ case <-ready:
return v.translateError(err)
}
- err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
- return v.translateError(err)
}
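The io.Pipe trick in Put generalizes beyond S3: once the data goes through a pipe, the caller can cut off an uploader that still holds the pipe reader, and the uploader can never touch the caller's block buffer again. A stripped-down sketch (upload is a stand-in for any blocking writer such as PutReader; assumes the bytes, context, io, and io/ioutil packages):

func uploadWithContext(ctx context.Context, upload func(io.Reader) error, block []byte) error {
	pr, pw := io.Pipe()
	go func() {
		io.Copy(pw, bytes.NewReader(block))
		pw.Close()
	}()
	done := make(chan error, 1)
	go func() { done <- upload(pr) }()
	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		// Un-stick any pending pipe Write, then shut the pipe down so
		// the abandoned upload fails instead of blocking forever.
		go io.Copy(ioutil.Discard, pr)
		pw.CloseWithError(ctx.Err())
		return ctx.Err()
	}
}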
// Touch sets the timestamp for the given locator to the current time.
import (
"bytes"
+ "context"
"crypto/md5"
"fmt"
"io/ioutil"
// Check canGet
loc, blk := setupScenario()
buf := make([]byte, len(blk))
- _, err := v.Get(loc, buf)
+ _, err := v.Get(context.Background(), loc, buf)
c.Check(err == nil, check.Equals, scenario.canGet)
if err != nil {
c.Check(os.IsNotExist(err), check.Equals, true)
loc, blk = setupScenario()
err = v.Trash(loc)
c.Check(err == nil, check.Equals, scenario.canTrash)
- _, err = v.Get(loc, buf)
+ _, err = v.Get(context.Background(), loc, buf)
c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
if err != nil {
c.Check(os.IsNotExist(err), check.Equals, true)
// should be able to Get after Untrash --
// regardless of timestamps, errors, race
// conditions, etc.
- _, err = v.Get(loc, buf)
+ _, err = v.Get(context.Background(), loc, buf)
c.Check(err, check.IsNil)
}
// Check for current Mtime after Put (applies to all
// scenarios)
loc, blk = setupScenario()
- err = v.Put(loc, blk)
+ err = v.Put(context.Background(), loc, blk)
c.Check(err, check.IsNil)
t, err := v.Mtime(loc)
c.Check(err, check.IsNil)
import (
"container/list"
+ "context"
"testing"
"time"
)
// Put test content
vols := KeepVM.AllWritable()
if testData.CreateData {
- vols[0].Put(testData.Locator1, testData.Block1)
- vols[0].Put(testData.Locator1+".meta", []byte("metadata"))
+ vols[0].Put(context.Background(), testData.Locator1, testData.Block1)
+ vols[0].Put(context.Background(), testData.Locator1+".meta", []byte("metadata"))
if testData.CreateInVolume1 {
- vols[0].Put(testData.Locator2, testData.Block2)
- vols[0].Put(testData.Locator2+".meta", []byte("metadata"))
+ vols[0].Put(context.Background(), testData.Locator2, testData.Block2)
+ vols[0].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
} else {
- vols[1].Put(testData.Locator2, testData.Block2)
- vols[1].Put(testData.Locator2+".meta", []byte("metadata"))
+ vols[1].Put(context.Background(), testData.Locator2, testData.Block2)
+ vols[1].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
}
}
// Verify Locator1 to be un/deleted as expected
buf := make([]byte, BlockSize)
- size, err := GetBlock(testData.Locator1, buf, nil)
+ size, err := GetBlock(context.Background(), testData.Locator1, buf, nil)
if testData.ExpectLocator1 {
if size == 0 || err != nil {
t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
// Verify Locator2 to be un/deleted as expected
if testData.Locator1 != testData.Locator2 {
- size, err = GetBlock(testData.Locator2, buf, nil)
+ size, err = GetBlock(context.Background(), testData.Locator2, buf, nil)
if testData.ExpectLocator2 {
if size == 0 || err != nil {
t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
locatorFoundIn := 0
for _, volume := range KeepVM.AllReadable() {
buf := make([]byte, BlockSize)
- if _, err := volume.Get(testData.Locator1, buf); err == nil {
+ if _, err := volume.Get(context.Background(), testData.Locator1, buf); err == nil {
locatorFoundIn = locatorFoundIn + 1
}
}
package main
import (
+ "context"
"io"
"sync/atomic"
"time"
// any of the data.
//
// len(buf) will not exceed BlockSize.
- Get(loc string, buf []byte) (int, error)
+ Get(ctx context.Context, loc string, buf []byte) (int, error)
// Compare the given data with the stored data (i.e., what Get
// would return). If equal, return nil. If not, return
// CollisionError or DiskHashError (depending on whether the
// data on disk matches the expected hash), or whatever error
// was encountered opening/reading the stored data.
- Compare(loc string, data []byte) error
+ Compare(ctx context.Context, loc string, data []byte) error
// Put writes a block to an underlying storage device.
//
//
// Put should not verify that loc==hash(block): this is the
// caller's responsibility.
- Put(loc string, block []byte) error
+ Put(ctx context.Context, loc string, block []byte) error
// Touch sets the timestamp for the given locator to the
// current time.
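With a context in the Volume interface, a server handler can hand its request context straight to the volume, so a client that disconnects (or a server-side deadline) cancels the underlying disk or network I/O. This is a sketch only; serveBlock is hypothetical and assumes Go 1.7+'s (*http.Request).Context():

func serveBlock(w http.ResponseWriter, req *http.Request, v Volume, loc string) {
	buf := make([]byte, BlockSize)
	n, err := v.Get(req.Context(), loc, buf)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Write(buf[:n])
}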
import (
"bytes"
+ "context"
"crypto/md5"
"fmt"
"os"
v.PutRaw(TestHash, TestBlock)
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash, buf)
+ n, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Fatal(err)
}
defer v.Teardown()
buf := make([]byte, BlockSize)
- if _, err := v.Get(TestHash2, buf); err == nil {
+ if _, err := v.Get(context.Background(), TestHash2, buf); err == nil {
t.Errorf("Expected error while getting non-existing block %v", TestHash2)
}
}
v := factory(t)
defer v.Teardown()
- err := v.Compare(TestHash, TestBlock)
+ err := v.Compare(context.Background(), TestHash, TestBlock)
if err != os.ErrNotExist {
t.Errorf("Got err %T %q, expected os.ErrNotExist", err, err)
}
v.PutRaw(testHash, testData)
// Compare the block locator with same content
- err := v.Compare(testHash, testData)
+ err := v.Compare(context.Background(), testHash, testData)
if err != nil {
t.Errorf("Got err %q, expected nil", err)
}
v.PutRaw(testHash, testDataA)
// Compare the block locator with different content; collision
- err := v.Compare(TestHash, testDataB)
+ err := v.Compare(context.Background(), TestHash, testDataB)
if err == nil {
t.Errorf("Got err nil, expected error due to collision")
}
v.PutRaw(TestHash, testDataB)
- err := v.Compare(testHash, testDataA)
+ err := v.Compare(context.Background(), testHash, testDataA)
if err == nil || err == CollisionError {
t.Errorf("Got err %+v, expected non-collision error", err)
}
return
}
- err := v.Put(testHash, testData)
+ err := v.Put(context.Background(), testHash, testData)
if err != nil {
t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
}
- err = v.Put(testHash, testData)
+ err = v.Put(context.Background(), testHash, testData)
if err != nil {
t.Errorf("Got err putting block second time %q: %q, expected nil", TestBlock, err)
}
v.PutRaw(testHash, testDataA)
- putErr := v.Put(testHash, testDataB)
+ putErr := v.Put(context.Background(), testHash, testDataB)
buf := make([]byte, BlockSize)
- n, getErr := v.Get(testHash, buf)
+ n, getErr := v.Get(context.Background(), testHash, buf)
if putErr == nil {
// Put must not return a nil error unless it has
// overwritten the existing data.
return
}
- err := v.Put(TestHash, TestBlock)
+ err := v.Put(context.Background(), TestHash, TestBlock)
if err != nil {
t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
}
- err = v.Put(TestHash2, TestBlock2)
+ err = v.Put(context.Background(), TestHash2, TestBlock2)
if err != nil {
t.Errorf("Got err putting block %q: %q, expected nil", TestBlock2, err)
}
- err = v.Put(TestHash3, TestBlock3)
+ err = v.Put(context.Background(), TestHash3, TestBlock3)
if err != nil {
t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
}
data := make([]byte, BlockSize)
- n, err := v.Get(TestHash, data)
+ n, err := v.Get(context.Background(), TestHash, data)
if err != nil {
t.Error(err)
} else {
}
}
- n, err = v.Get(TestHash2, data)
+ n, err = v.Get(context.Background(), TestHash2, data)
if err != nil {
t.Error(err)
} else {
}
}
- n, err = v.Get(TestHash3, data)
+ n, err = v.Get(context.Background(), TestHash3, data)
if err != nil {
t.Error(err)
} else {
return
}
- if err := v.Put(TestHash, TestBlock); err != nil {
+ if err := v.Put(context.Background(), TestHash, TestBlock); err != nil {
t.Error(err)
}
}
// Write the same block again.
- if err := v.Put(TestHash, TestBlock); err != nil {
+ if err := v.Put(context.Background(), TestHash, TestBlock); err != nil {
t.Error(err)
}
return
}
- v.Put(TestHash, TestBlock)
+ v.Put(context.Background(), TestHash, TestBlock)
if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
data := make([]byte, BlockSize)
- n, err := v.Get(TestHash, data)
+ n, err := v.Get(context.Background(), TestHash, data)
if err != nil {
t.Error(err)
} else if bytes.Compare(data[:n], TestBlock) != 0 {
return
}
- v.Put(TestHash, TestBlock)
+ v.Put(context.Background(), TestHash, TestBlock)
v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
data := make([]byte, BlockSize)
- if _, err := v.Get(TestHash, data); err == nil || !os.IsNotExist(err) {
+ if _, err := v.Get(context.Background(), TestHash, data); err == nil || !os.IsNotExist(err) {
t.Errorf("os.IsNotExist(%v) should have been true", err)
}
t.Fatalf("os.IsNotExist(%v) should have been true", err)
}
- err = v.Compare(TestHash, TestBlock)
+ err = v.Compare(context.Background(), TestHash, TestBlock)
if err == nil || !os.IsNotExist(err) {
t.Fatalf("os.IsNotExist(%v) should have been true", err)
}
buf := make([]byte, BlockSize)
// Get from read-only volume should succeed
- _, err := v.Get(TestHash, buf)
+ _, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Errorf("got err %v, expected nil", err)
}
// Put a new block to read-only volume should result in error
- err = v.Put(TestHash2, TestBlock2)
+ err = v.Put(context.Background(), TestHash2, TestBlock2)
if err == nil {
t.Errorf("Expected error when putting block in a read-only volume")
}
- _, err = v.Get(TestHash2, buf)
+ _, err = v.Get(context.Background(), TestHash2, buf)
if err == nil {
t.Errorf("Expected error when getting block whose put in read-only volume failed")
}
}
// Overwriting an existing block in read-only volume should result in error
- err = v.Put(TestHash, TestBlock)
+ err = v.Put(context.Background(), TestHash, TestBlock)
if err == nil {
t.Errorf("Expected error when putting block in a read-only volume")
}
sem := make(chan int)
go func() {
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash, buf)
+ n, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Errorf("err1: %v", err)
}
go func() {
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash2, buf)
+ n, err := v.Get(context.Background(), TestHash2, buf)
if err != nil {
t.Errorf("err2: %v", err)
}
go func() {
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash3, buf)
+ n, err := v.Get(context.Background(), TestHash3, buf)
if err != nil {
t.Errorf("err3: %v", err)
}
sem := make(chan int)
go func(sem chan int) {
- err := v.Put(TestHash, TestBlock)
+ err := v.Put(context.Background(), TestHash, TestBlock)
if err != nil {
t.Errorf("err1: %v", err)
}
}(sem)
go func(sem chan int) {
- err := v.Put(TestHash2, TestBlock2)
+ err := v.Put(context.Background(), TestHash2, TestBlock2)
if err != nil {
t.Errorf("err2: %v", err)
}
}(sem)
go func(sem chan int) {
- err := v.Put(TestHash3, TestBlock3)
+ err := v.Put(context.Background(), TestHash3, TestBlock3)
if err != nil {
t.Errorf("err3: %v", err)
}
// Double check that we actually wrote the blocks we expected to write.
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash, buf)
+ n, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Errorf("Get #1: %v", err)
}
t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf[:n]))
}
- n, err = v.Get(TestHash2, buf)
+ n, err = v.Get(context.Background(), TestHash2, buf)
if err != nil {
t.Errorf("Get #2: %v", err)
}
t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf[:n]))
}
- n, err = v.Get(TestHash3, buf)
+ n, err = v.Get(context.Background(), TestHash3, buf)
if err != nil {
t.Errorf("Get #3: %v", err)
}
wdata[0] = 'a'
wdata[BlockSize-1] = 'z'
hash := fmt.Sprintf("%x", md5.Sum(wdata))
- err := v.Put(hash, wdata)
+ err := v.Put(context.Background(), hash, wdata)
if err != nil {
t.Fatal(err)
}
buf := make([]byte, BlockSize)
- n, err := v.Get(hash, buf)
+ n, err := v.Get(context.Background(), hash, buf)
if err != nil {
t.Error(err)
}
v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash, buf)
+ n, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Fatal(err)
}
t.Fatal(err)
}
} else {
- _, err = v.Get(TestHash, buf)
+ _, err = v.Get(context.Background(), TestHash, buf)
if err == nil || !os.IsNotExist(err) {
t.Errorf("os.IsNotExist(%v) should have been true", err)
}
}
// Get the block - after trash and untrash sequence
- n, err = v.Get(TestHash, buf)
+ n, err = v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Fatal(err)
}
checkGet := func() error {
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash, buf)
+ n, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
return err
}
return err
}
- err = v.Compare(TestHash, TestBlock)
+ err = v.Compare(context.Background(), TestHash, TestBlock)
if err != nil {
return err
}
import (
"bytes"
+ "context"
"crypto/md5"
"errors"
"fmt"
}
}
-func (v *MockVolume) Compare(loc string, buf []byte) error {
+func (v *MockVolume) Compare(ctx context.Context, loc string, buf []byte) error {
v.gotCall("Compare")
<-v.Gate
if v.Bad {
}
}
-func (v *MockVolume) Get(loc string, buf []byte) (int, error) {
+func (v *MockVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
v.gotCall("Get")
<-v.Gate
if v.Bad {
return 0, os.ErrNotExist
}
-func (v *MockVolume) Put(loc string, block []byte) error {
+func (v *MockVolume) Put(ctx context.Context, loc string, block []byte) error {
v.gotCall("Put")
<-v.Gate
if v.Bad {
import (
"bufio"
+ "context"
"flag"
"fmt"
"io"
// Lock the locker (if one is in use), open the file for reading, and
// call the given function if and when the file is ready to read.
-func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
if v.locker != nil {
v.locker.Lock()
defer v.locker.Unlock()
}
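+	// If the request was canceled while we were waiting for the
+	// serialize lock, skip the disk I/O entirely.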
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
f, err := os.Open(path)
if err != nil {
return err
// Get retrieves a block, copies it to the given slice, and returns
// the number of bytes copied.
-func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
+func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
path := v.blockPath(loc)
stat, err := v.stat(path)
if err != nil {
}
var read int
size := int(stat.Size())
- err = v.getFunc(path, func(rdr io.Reader) error {
+ err = v.getFunc(ctx, path, func(rdr io.Reader) error {
read, err = io.ReadFull(rdr, buf[:size])
return err
})
// Compare returns nil if Get(loc) would return the same content as
// expect. It is functionally equivalent to Get() followed by
// bytes.Compare(), but uses less memory.
-func (v *UnixVolume) Compare(loc string, expect []byte) error {
+func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error {
path := v.blockPath(loc)
if _, err := v.stat(path); err != nil {
return v.translateError(err)
}
- return v.getFunc(path, func(rdr io.Reader) error {
- return compareReaderWithBuf(rdr, expect, loc[:32])
+ return v.getFunc(ctx, path, func(rdr io.Reader) error {
+ return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
})
}
// "loc". It returns nil on success. If the volume is full, it
// returns a FullError. If the write fails due to some other error,
// that error is returned.
-func (v *UnixVolume) Put(loc string, block []byte) error {
+func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
if v.ReadOnly {
return MethodDisabledError
}
v.locker.Lock()
defer v.locker.Unlock()
}
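+	// If the client hung up (or a deadline passed) while we were
+	// waiting for the serialize lock, skip writing the block.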
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
if _, err := tmpfile.Write(block); err != nil {
log.Printf("%s: writing to %s: %s\n", v, bpath, err)
tmpfile.Close()
import (
"bytes"
+ "context"
"errors"
"fmt"
"io"
v.ReadOnly = orig
}(v.ReadOnly)
v.ReadOnly = false
- err := v.Put(locator, data)
+ err := v.Put(context.Background(), locator, data)
if err != nil {
v.t.Fatal(err)
}
func TestGetNotFound(t *testing.T) {
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- v.Put(TestHash, TestBlock)
+ v.Put(context.Background(), TestHash, TestBlock)
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash2, buf)
+ n, err := v.Get(context.Background(), TestHash2, buf)
switch {
case os.IsNotExist(err):
break
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- err := v.Put(TestHash, TestBlock)
+ err := v.Put(context.Background(), TestHash, TestBlock)
if err != nil {
t.Error(err)
}
defer v.Teardown()
os.Chmod(v.Root, 000)
- err := v.Put(TestHash, TestBlock)
+ err := v.Put(context.Background(), TestHash, TestBlock)
if err == nil {
t.Error("Write should have failed")
}
v.PutRaw(TestHash, TestBlock)
buf := make([]byte, BlockSize)
- _, err := v.Get(TestHash, buf)
+ _, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Errorf("got err %v, expected nil", err)
}
- err = v.Put(TestHash, TestBlock)
+ err = v.Put(context.Background(), TestHash, TestBlock)
if err != MethodDisabledError {
t.Errorf("got err %v, expected MethodDisabledError", err)
}
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- v.Put(TestHash, TestBlock)
+ v.Put(context.Background(), TestHash, TestBlock)
mockErr := errors.New("Mock error")
- err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+ err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
return mockErr
})
if err != mockErr {
defer v.Teardown()
funcCalled := false
- err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+ err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
funcCalled = true
return nil
})
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- v.Put(TestHash, TestBlock)
+ v.Put(context.Background(), TestHash, TestBlock)
mtx := NewMockMutex()
v.locker = mtx
funcCalled := make(chan struct{})
- go v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+ go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
funcCalled <- struct{}{}
return nil
})
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- v.Put(TestHash, TestBlock)
- err := v.Compare(TestHash, TestBlock)
+ v.Put(context.Background(), TestHash, TestBlock)
+ err := v.Compare(context.Background(), TestHash, TestBlock)
if err != nil {
t.Errorf("Got err %q, expected nil", err)
}
- err = v.Compare(TestHash, []byte("baddata"))
+ err = v.Compare(context.Background(), TestHash, []byte("baddata"))
if err != CollisionError {
t.Errorf("Got err %q, expected %q", err, CollisionError)
}
- v.Put(TestHash, []byte("baddata"))
- err = v.Compare(TestHash, TestBlock)
+ v.Put(context.Background(), TestHash, []byte("baddata"))
+ err = v.Compare(context.Background(), TestHash, TestBlock)
if err != DiskHashError {
t.Errorf("Got err %q, expected %q", err, DiskHashError)
}
p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
os.Chmod(p, 000)
- err = v.Compare(TestHash, TestBlock)
+ err = v.Compare(context.Background(), TestHash, TestBlock)
if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
t.Errorf("Got err %q, expected %q", err, "permission denied")
}
if err != nil {
log.Fatal(err)
}
- kc, err := keepclient.MakeKeepClient(&arv)
+ kc, err := keepclient.MakeKeepClient(arv)
if err != nil {
log.Fatal(err)
}
overrideServices(kc)
- nextBuf := make(chan []byte, *WriteThreads)
nextLocator := make(chan string, *ReadThreads+*WriteThreads)
go countBeans(nextLocator)
for i := 0; i < *WriteThreads; i++ {
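+		// One single-slot buffer channel per write thread, so a stalled
+		// writer only blocks its own buffer generator.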
+ nextBuf := make(chan []byte, 1)
go makeBufs(nextBuf, i)
go doWrites(kc, nextBuf, nextLocator)
}
}
}
-func makeBufs(nextBuf chan []byte, threadID int) {
+func makeBufs(nextBuf chan<- []byte, threadID int) {
buf := make([]byte, *BlockSize)
if *VaryThread {
binary.PutVarint(buf, int64(threadID))
}
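+	// Refresh only the first randSize bytes of each block with new random
+	// data; the rest of the buffer is reused, keeping per-request CPU cost bounded.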
+ randSize := 524288
+ if randSize > *BlockSize {
+ randSize = *BlockSize
+ }
for {
if *VaryRequest {
- buf = make([]byte, *BlockSize)
- if _, err := io.ReadFull(rand.Reader, buf); err != nil {
+ rnd := make([]byte, randSize)
+ if _, err := io.ReadFull(rand.Reader, rnd); err != nil {
log.Fatal(err)
}
+ buf = append(rnd, buf[randSize:]...)
}
nextBuf <- buf
}
}
-func doWrites(kc *keepclient.KeepClient, nextBuf chan []byte, nextLocator chan string) {
+func doWrites(kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string) {
for buf := range nextBuf {
locator, _, err := kc.PutB(buf)
if err != nil {
}
}
-func doReads(kc *keepclient.KeepClient, nextLocator chan string) {
+func doReads(kc *keepclient.KeepClient, nextLocator <-chan string) {
for locator := range nextLocator {
rdr, size, url, err := kc.Get(locator)
if err != nil {