onclick: db.login.bind(db, session.baseURL),
}, session.listedHost ? 'Enable ':'Log in ', m('span.glyphicon.glyphicon-log-in')))
],
- m('td', session.isFromRails ? null : m('button.btn.btn-xs.btn-default', {
- uuidPrefix: uuidPrefix,
- onclick: m.withAttr('uuidPrefix', db.trash),
- }, 'Remove ', m('span.glyphicon.glyphicon-trash'))),
+ m('td', (session.isFromRails || session.listedHost) ? null :
+ m('button.btn.btn-xs.btn-default', {
+ uuidPrefix: uuidPrefix,
+ onclick: m.withAttr('uuidPrefix', db.trash),
+ }, 'Remove ', m('span.glyphicon.glyphicon-trash'))
+ ),
])
}),
]),
return;
}
- wasatbottom = ($(this).scrollTop() + $(this).height() >= this.scrollHeight);
+ wasatbottom = (this.scrollTop + this.clientHeight >= this.scrollHeight);
if (eventData.prepend) {
$(this).prepend(txt);
} else {
--- /dev/null
+<%# Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: AGPL-3.0 %>
+
+<div class="nowrap">
+ <%= object.content_summary %><br />
+ <%= render partial: 'container_requests/state_label', locals: {object: object} %>
+</div>
--- /dev/null
+<%# Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: AGPL-3.0 %>
+
+<% wu = object.work_unit object.name %>
+<span class="label label-<%=wu.state_bootstrap_class%>">
+ <%=wu.state_label%>
+</span>
style="display:none"
data-object-uuid="<%= @object.uuid %>"></div>
-<div id="event_log_div"
+<pre id="event_log_div"
class="arv-log-event-listener arv-log-event-handler-append-logs arv-job-log-window"
data-object-uuid="<%= @object.uuid %>"
><%= @object.stderr_log_lines(Rails.configuration.running_job_log_records_to_fetch).join("\n") %>
-</div>
+</pre>
<%# Applying a long throttle suppresses the auto-refresh of this
partial that would normally be triggered by arv-log-event. %>
<% unless still_logging.empty? %>
<h4>Logs in progress</h4>
- <div id="event_log_div"
+ <pre id="event_log_div"
class="arv-log-event-listener arv-log-event-handler-append-logs arv-log-event-subscribe-to-pipeline-job-uuids arv-job-log-window"
data-object-uuids="<%= @object.stderr_log_object_uuids.join(' ') %>"
- ><%= @object.stderr_log_lines.join("\n") %></div>
+ ><%= @object.stderr_log_lines.join("\n") %></pre>
<%# Applying a long throttle suppresses the auto-refresh of this
partial that would normally be triggered by arv-log-event. %>
<%# Still running, or recently finished and logs are still available from logs table %>
<%# Show recent logs in terminal window %>
<h4>Recent logs</h4>
-<div id="event_log_div"
+<pre id="event_log_div"
class="arv-log-event-listener arv-log-event-handler-append-logs arv-job-log-window"
data-object-uuids="<%= wu.log_object_uuids.join(' ') %>"
><%= live_log_lines %>
-</div>
+</pre>
<%# Applying a long throttle suppresses the auto-refresh of this
partial that would normally be triggered by arv-log-event. %>
assert_select "a", {:href=>"/collections/#{container['log']}", :text=>"Download the log"}
assert_select "a", {:href=>"#{container['log']}/baz"}
- assert_not_includes @response.body, '<div id="event_log_div"'
+ assert_not_includes @response.body, '<pre id="event_log_div"'
end
test "visit running container request log tab" do
get :show, {id: cr['uuid'], tab_pane: 'Log'}, session_for(:active)
assert_response :success
- assert_includes @response.body, '<div id="event_log_div"'
+ assert_includes @response.body, '<pre id="event_log_div"'
assert_select 'Download the log', false
end
lib/cli
lib/cmd
lib/crunchstat
+lib/dispatchcloud
services/api
services/arv-git-httpd
services/crunchstat
install_R_sdk() {
cd "$WORKSPACE/sdk/R" \
- && R --quiet --vanilla <<EOF
-options(repos=structure(c(CRAN="http://cran.wustl.edu/")))
-if (!requireNamespace("devtools")) {
- install.packages("devtools")
-}
-if (!requireNamespace("roxygen2")) {
- install.packages("roxygen2")
-}
-if (!requireNamespace("pkgdown")) {
- devtools::install_github("hadley/pkgdown")
-}
-devtools::install_dev_deps()
-EOF
+ && R --quiet --vanilla --file=install_deps.R
}
do_install sdk/R R_sdk
lib/cli
lib/cmd
lib/crunchstat
+ lib/dispatchcloud
sdk/go/arvados
sdk/go/arvadosclient
sdk/go/blockdigest
- sdk/go/index.html.textile.liquid
- sdk/go/example.html.textile.liquid
- R:
- - sdk/R/R.html.textile.liquid
+ - sdk/R/index.html.textile.liquid
- Perl:
- sdk/perl/index.html.textile.liquid
- sdk/perl/example.html.textile.liquid
* "Python SDK":{{site.baseurl}}/sdk/python/sdk-python.html
* "Command line SDK":{{site.baseurl}}/sdk/cli/install.html ("arv")
* "Go SDK":{{site.baseurl}}/sdk/go/index.html
-* "R SDK":{{site.baseurl}}/sdk/go/index.html
+* "R SDK":{{site.baseurl}}/sdk/R/index.html
* "Perl SDK":{{site.baseurl}}/sdk/perl/index.html
* "Ruby SDK":{{site.baseurl}}/sdk/ruby/index.html
* "Java SDK":{{site.baseurl}}/sdk/java/index.html
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "testing"
+
+ check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "bytes"
+ "errors"
+ "log"
+ "os/exec"
+ "strings"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var (
+ ErrConstraintsNotSatisfiable = errors.New("constraints not satisfiable by any configured instance type")
+ ErrInstanceTypesNotConfigured = errors.New("site configuration does not list any instance types")
+ discountConfiguredRAMPercent = 5
+)
+
+// ChooseInstanceType returns the cheapest available
+// arvados.InstanceType big enough to run ctr.
+func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvados.InstanceType, err error) {
+ if len(cc.InstanceTypes) == 0 {
+ err = ErrInstanceTypesNotConfigured
+ return
+ }
+
+ needScratch := int64(0)
+ for _, m := range ctr.Mounts {
+ if m.Kind == "tmp" {
+ needScratch += m.Capacity
+ }
+ }
+
+ needVCPUs := ctr.RuntimeConstraints.VCPUs
+
+ needRAM := ctr.RuntimeConstraints.RAM + ctr.RuntimeConstraints.KeepCacheRAM
+ needRAM = (needRAM * 100) / int64(100-discountConfiguredRAMPercent)
+
+ err = ErrConstraintsNotSatisfiable
+ for _, it := range cc.InstanceTypes {
+ switch {
+ case err == nil && it.Price > best.Price:
+ case it.Scratch < needScratch:
+ case it.RAM < needRAM:
+ case it.VCPUs < needVCPUs:
+ case it.Price == best.Price && (it.RAM < best.RAM || it.VCPUs < best.VCPUs):
+ // Equal price, but worse specs
+ default:
+ // Lower price || (same price && better specs)
+ best = it
+ err = nil
+ }
+ }
+ return
+}
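
A minimal sketch of how a dispatcher might call ChooseInstanceType, assuming the import paths used elsewhere in this patch; the instance types, container constraints, and main() wrapper below are invented for illustration and are not part of the patch:

package main

import (
	"fmt"
	"log"

	"git.curoverse.com/arvados.git/lib/dispatchcloud"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
)

func main() {
	// Two hypothetical instance types; only Name, VCPUs, RAM, Scratch and
	// Price matter to ChooseInstanceType.
	cluster := &arvados.Cluster{InstanceTypes: []arvados.InstanceType{
		{Name: "small", ProviderType: "m1.small", VCPUs: 2, RAM: 2 << 30, Scratch: 20 << 30, Price: 0.1},
		{Name: "large", ProviderType: "m4.xlarge", VCPUs: 8, RAM: 16 << 30, Scratch: 100 << 30, Price: 0.4},
	}}
	ctr := &arvados.Container{
		Mounts: map[string]arvados.Mount{"/tmp": {Kind: "tmp", Capacity: 10 << 30}},
		RuntimeConstraints: arvados.RuntimeConstraints{
			VCPUs: 2, RAM: 1 << 30, KeepCacheRAM: 256 << 20,
		},
	}
	it, err := dispatchcloud.ChooseInstanceType(cluster, ctr)
	if err != nil {
		// ErrInstanceTypesNotConfigured or ErrConstraintsNotSatisfiable
		log.Fatal(err)
	}
	fmt.Printf("cheapest suitable type: %s (price %.4f)\n", it.Name, it.Price)
}

Here "small" satisfies the scratch, CPU, and (discounted) RAM requirements and is cheaper than "large", so it is chosen.
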
+
+// SlurmNodeTypeFeatureKludge ensures SLURM accepts every instance
+// type name as a valid feature name, even if no instances of that
+// type have appeared yet.
+//
+// It takes advantage of some SLURM peculiarities:
+//
+// (1) A feature is valid after it has been offered by a node, even if
+// it is no longer offered by any node. So, to make a feature name
+// valid, we can add it to a dummy node ("compute0"), then remove it.
+//
+// (2) When srun is given an invalid --gres argument and an invalid
+// --constraint argument, the error message mentions "Invalid feature
+// specification". So, to test whether a feature name is valid without
+// actually submitting a job, we can call srun with the feature name
+// and an invalid --gres argument.
+//
+// SlurmNodeTypeFeatureKludge does a test-and-fix operation
+// immediately, and then periodically, in case slurm restarts and
+// forgets the list of valid features. It never returns (unless there
+// are no node types configured, in which case it returns
+// immediately), so it should generally be invoked with "go".
+func SlurmNodeTypeFeatureKludge(cc *arvados.Cluster) {
+ if len(cc.InstanceTypes) == 0 {
+ return
+ }
+ var features []string
+ for _, it := range cc.InstanceTypes {
+ features = append(features, "instancetype="+it.Name)
+ }
+ for {
+ slurmKludge(features)
+ time.Sleep(time.Minute)
+ }
+}
+
+var (
+ slurmDummyNode = "compute0"
+ slurmErrBadFeature = "Invalid feature"
+ slurmErrBadGres = "Invalid generic resource"
+)
+
+func slurmKludge(features []string) {
+ cmd := exec.Command("srun", "--gres=invalid-gres-specification", "--constraint="+strings.Join(features, "&"), "true")
+ out, err := cmd.CombinedOutput()
+ switch {
+ case err == nil:
+ log.Printf("warning: guaranteed-to-fail srun command did not fail: %q %q", cmd.Path, cmd.Args)
+ log.Printf("output was: %q", out)
+
+ case bytes.Contains(out, []byte(slurmErrBadFeature)):
+ log.Printf("temporarily configuring node %q with all node type features", slurmDummyNode)
+ for _, nodeFeatures := range []string{strings.Join(features, ","), ""} {
+ cmd = exec.Command("scontrol", "update", "NodeName="+slurmDummyNode, "Features="+nodeFeatures)
+ log.Printf("running: %q %q", cmd.Path, cmd.Args)
+ out, err := cmd.CombinedOutput()
+ if err != nil {
+ log.Printf("error: scontrol: %s (output was %q)", err, out)
+ }
+ }
+
+ case bytes.Contains(out, []byte(slurmErrBadGres)):
+ // Evidently our node-type feature names are all valid.
+
+ default:
+ log.Printf("warning: expected srun error %q or %q, but output was %q", slurmErrBadFeature, slurmErrBadGres, out)
+ }
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&NodeSizeSuite{})
+
+const GiB = int64(1 << 30)
+
+type NodeSizeSuite struct{}
+
+func (*NodeSizeSuite) TestChooseNotConfigured(c *check.C) {
+ _, err := ChooseInstanceType(&arvados.Cluster{}, &arvados.Container{
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ RAM: 1234567890,
+ VCPUs: 2,
+ },
+ })
+ c.Check(err, check.Equals, ErrInstanceTypesNotConfigured)
+}
+
+func (*NodeSizeSuite) TestChooseUnsatisfiable(c *check.C) {
+ checkUnsatisfiable := func(ctr *arvados.Container) {
+ _, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: []arvados.InstanceType{
+ {Price: 1.1, RAM: 1000000000, VCPUs: 2, Name: "small1"},
+ {Price: 2.2, RAM: 2000000000, VCPUs: 4, Name: "small2"},
+ {Price: 4.4, RAM: 4000000000, VCPUs: 8, Name: "small4", Scratch: GiB},
+ }}, ctr)
+ c.Check(err, check.Equals, ErrConstraintsNotSatisfiable)
+ }
+
+ for _, rc := range []arvados.RuntimeConstraints{
+ {RAM: 9876543210, VCPUs: 2},
+ {RAM: 1234567890, VCPUs: 20},
+ {RAM: 1234567890, VCPUs: 2, KeepCacheRAM: 9876543210},
+ } {
+ checkUnsatisfiable(&arvados.Container{RuntimeConstraints: rc})
+ }
+ checkUnsatisfiable(&arvados.Container{
+ Mounts: map[string]arvados.Mount{"/tmp": {Kind: "tmp", Capacity: 2 * GiB}},
+ RuntimeConstraints: arvados.RuntimeConstraints{RAM: 12345, VCPUs: 1},
+ })
+}
+
+func (*NodeSizeSuite) TestChoose(c *check.C) {
+ for _, menu := range [][]arvados.InstanceType{
+ {
+ {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Name: "costly"},
+ {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"},
+ {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 2 * GiB, Name: "small"},
+ },
+ {
+ {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Name: "costly"},
+ {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "goodenough"},
+ {Price: 2.2, RAM: 4000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"},
+ {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 2 * GiB, Name: "small"},
+ },
+ {
+ {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 2 * GiB, Name: "small"},
+ {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "goodenough"},
+ {Price: 2.2, RAM: 4000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"},
+ {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Name: "costly"},
+ },
+ {
+ {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: GiB, Name: "small"},
+ {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: GiB, Name: "nearly"},
+ {Price: 3.3, RAM: 4000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"},
+ {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Name: "costly"},
+ },
+ } {
+ best, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: menu}, &arvados.Container{
+ Mounts: map[string]arvados.Mount{
+ "/tmp": {Kind: "tmp", Capacity: 2 * GiB},
+ },
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 2,
+ RAM: 987654321,
+ KeepCacheRAM: 123456789,
+ },
+ })
+ c.Check(err, check.IsNil)
+ c.Check(best.Name, check.Equals, "best")
+ c.Check(best.RAM >= 1234567890, check.Equals, true)
+ c.Check(best.VCPUs >= 2, check.Equals, true)
+ c.Check(best.Scratch >= 2*GiB, check.Equals, true)
+ }
+}
--- /dev/null
+options(repos=structure(c(CRAN="http://cran.wustl.edu/")))
+if (!requireNamespace("devtools")) {
+ install.packages("devtools")
+}
+if (!requireNamespace("roxygen2")) {
+ install.packages("roxygen2")
+}
+
+# These install from github so install known-good versions instead of
+# letting any push to master break our build.
+if (!requireNamespace("pkgload")) {
+ devtools::install_github("r-lib/pkgload", ref="7a97de62adf1793c03e73095937e4655baad79c9")
+}
+if (!requireNamespace("pkgdown")) {
+ devtools::install_github("r-lib/pkgdown", ref="897ffbc016549c11c4263cb5d1f6e9f5c99efb45")
+}
+
+devtools::install_dev_deps()
"success")
self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
+ self.eval_timeout = kwargs.get("eval_timeout")
kwargs["make_fs_access"] = make_fs_access
kwargs["enable_reuse"] = kwargs.get("enable_reuse")
kwargs["compute_checksum"] = kwargs.get("compute_checksum")
if self.work_api == "containers":
+ if self.ignore_docker_for_reuse:
+ raise validate.ValidationException("--ignore-docker-for-reuse not supported with containers API.")
kwargs["outdir"] = "/var/spool/cwl"
kwargs["docker_outdir"] = "/var/spool/cwl"
kwargs["tmpdir"] = "/tmp"
if self.arvrunner.project_uuid:
command.append("--project-uuid="+self.arvrunner.project_uuid)
+ command.append("--eval-timeout=%s" % self.arvrunner.eval_timeout)
+
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command
min_entries=2):
self.api_client = api_client
self.keep_client = keep_client
+ self.num_retries = num_retries
self.collections = OrderedDict()
self.lock = threading.Lock()
self.total = 0
if pdh not in self.collections:
logger.debug("Creating collection reader for %s", pdh)
cr = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
- keep_client=self.keep_client)
+ keep_client=self.keep_client,
+ num_retries=self.num_retries)
sz = len(cr.manifest_text()) * 128
self.collections[pdh] = (cr, sz)
self.total += sz
'state': 'Committed',
'owner_uuid': None,
'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '--on-error=continue',
+ '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
'name': 'submit_wf.cwl',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = [
'arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--disable-reuse', '--on-error=continue',
+ '--disable-reuse', '--on-error=continue', '--eval-timeout=20',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["use_existing"] = False
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = [
'arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--disable-reuse', '--on-error=continue',
+ '--disable-reuse', '--on-error=continue', '--eval-timeout=20',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["use_existing"] = False
expect_container["name"] = "submit_wf_no_reuse.cwl"
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '--on-error=stop',
+ '--enable-reuse', '--on-error=stop', '--eval-timeout=20',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- "--output-name="+output_name, '--enable-reuse', '--on-error=continue',
+ "--output-name="+output_name, '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["output_name"] = output_name
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--enable-reuse', '--on-error=continue',
- "--intermediate-output-ttl=3600",
+ "--intermediate-output-ttl=3600", '--eval-timeout=20',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--enable-reuse', '--on-error=continue',
- "--trash-intermediate",
+ "--trash-intermediate", '--eval-timeout=20',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- "--output-tags="+output_tags, '--enable-reuse', '--on-error=continue',
+ "--output-tags="+output_tags, '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
'name': 'expect_arvworkflow.cwl#main',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '--on-error=continue',
+ '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
'/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/cwl.input.json'],
'cwd': '/var/spool/cwl',
'runtime_constraints': {
'name': 'a test workflow',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '--on-error=continue',
+ '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
'cwd': '/var/spool/cwl',
'runtime_constraints': {
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["owner_uuid"] = project_uuid
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '--on-error=continue', '--project-uuid='+project_uuid,
+ '--enable-reuse', '--on-error=continue', '--project-uuid='+project_uuid, '--eval-timeout=20',
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+ stubs.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher(expect_container))
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
+ @stubs
+ def test_submit_container_eval_timeout(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+ capture_stdout = cStringIO.StringIO()
+ try:
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--eval-timeout=60",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+ except:
+ logging.exception("")
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ '--enable-reuse', '--on-error=continue', '--eval-timeout=60.0',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
err = s.fs.Remove("foo/bar")
c.Check(err, check.IsNil)
- // mkdir succeds after the file is deleted
+ // mkdir succeeds after the file is deleted
err = s.fs.Mkdir("foo/bar", 0755)
c.Check(err, check.IsNil)
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
package arvados
import (
ClusterID string `json:"-"`
ManagementToken string
SystemNodes map[string]SystemNode
+ InstanceTypes []InstanceType
+}
+
+type InstanceType struct {
+ Name string
+ ProviderType string
+ VCPUs int
+ RAM int64
+ Scratch int64
+ Price float64
}
// GetThisSystemNode returns a SystemNode for the node we're running
// CPU) and network connectivity.
type RuntimeConstraints struct {
API *bool
- RAM int `json:"ram"`
- VCPUs int `json:"vcpus"`
- KeepCacheRAM int `json:"keep_cache_ram"`
+ RAM int64 `json:"ram"`
+ VCPUs int `json:"vcpus"`
+ KeepCacheRAM int64 `json:"keep_cache_ram"`
}
// SchedulingParameters specify a container's scheduling parameters
"time"
"git.curoverse.com/arvados.git/sdk/go/stats"
- log "github.com/Sirupsen/logrus"
+ "github.com/Sirupsen/logrus"
)
type contextKey struct {
var requestTimeContextKey = contextKey{"requestTime"}
+var Logger logrus.FieldLogger = logrus.StandardLogger()
+
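The exported Logger variable lets a service substitute its own logrus instance before wrapping handlers with LogRequests. A minimal sketch, assuming the package lives at git.curoverse.com/arvados.git/sdk/go/httpserver; the JSON formatter and the /status handler are just for illustration:

package main

import (
	"log"
	"net/http"

	"git.curoverse.com/arvados.git/sdk/go/httpserver"
	"github.com/Sirupsen/logrus"
)

func main() {
	// Use a JSON-formatted logger for the request/response entries
	// emitted by LogRequests, instead of logrus.StandardLogger().
	lgr := logrus.New()
	lgr.Formatter = &logrus.JSONFormatter{}
	httpserver.Logger = lgr

	mux := http.NewServeMux()
	mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("ok\n"))
	})
	log.Fatal(http.ListenAndServe(":8080", httpserver.LogRequests(mux)))
}
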
// LogRequests wraps an http.Handler, logging each request and
// response via logrus.
func LogRequests(h http.Handler) http.Handler {
return http.HandlerFunc(func(wrapped http.ResponseWriter, req *http.Request) {
w := &responseTimer{ResponseWriter: WrapResponseWriter(wrapped)}
req = req.WithContext(context.WithValue(req.Context(), &requestTimeContextKey, time.Now()))
- lgr := log.WithFields(log.Fields{
+ lgr := Logger.WithFields(logrus.Fields{
"RequestID": req.Header.Get("X-Request-Id"),
"remoteAddr": req.RemoteAddr,
"reqForwardedFor": req.Header.Get("X-Forwarded-For"),
})
}
-func logRequest(w *responseTimer, req *http.Request, lgr *log.Entry) {
+func logRequest(w *responseTimer, req *http.Request, lgr *logrus.Entry) {
lgr.Info("request")
}
-func logResponse(w *responseTimer, req *http.Request, lgr *log.Entry) {
+func logResponse(w *responseTimer, req *http.Request, lgr *logrus.Entry) {
if tStart, ok := req.Context().Value(&requestTimeContextKey).(time.Time); ok {
tDone := time.Now()
- lgr = lgr.WithFields(log.Fields{
+ lgr = lgr.WithFields(logrus.Fields{
"timeTotal": stats.Duration(tDone.Sub(tStart)),
"timeToStatus": stats.Duration(w.writeTime.Sub(tStart)),
"timeWriteBody": stats.Duration(tDone.Sub(w.writeTime)),
})
}
- lgr.WithFields(log.Fields{
- "respStatusCode": w.WroteStatus(),
- "respStatus": http.StatusText(w.WroteStatus()),
+ respCode := w.WroteStatus()
+ if respCode == 0 {
+ respCode = http.StatusOK
+ }
+ lgr.WithFields(logrus.Fields{
+ "respStatusCode": respCode,
+ "respStatus": http.StatusText(respCode),
"respBytes": w.WroteBodyBytes(),
}).Info("response")
}
writeTime time.Time
}
+func (rt *responseTimer) CloseNotify() <-chan bool {
+ if cn, ok := rt.ResponseWriter.(http.CloseNotifier); ok {
+ return cn.CloseNotify()
+ }
+ return nil
+}
+
func (rt *responseTimer) WriteHeader(code int) {
if !rt.wrote {
rt.wrote = true
return &responseWriter{ResponseWriter: orig}
}
+func (w *responseWriter) CloseNotify() <-chan bool {
+ if cn, ok := w.ResponseWriter.(http.CloseNotifier); ok {
+ return cn.CloseNotify()
+ }
+ return nil
+}
+
func (w *responseWriter) WriteHeader(s int) {
w.wroteStatus = s
w.ResponseWriter.WriteHeader(s)
"""
+ __slots__ = ('parent', 'name', '_writers', '_committed',
+ '_segments', 'lock', '_current_bblock', 'fuse_entry')
+
def __init__(self, parent, name, stream=[], segments=[]):
"""
ArvadosFile constructor.
return text
+ _token_re = re.compile(r'(\S+)(\s+|$)')
+ _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
+ _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
+
@synchronized
def _import_manifest(self, manifest_text):
"""Import a manifest into a `Collection`.
stream_name = None
state = STREAM_NAME
- for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
+ for token_and_separator in self._token_re.finditer(manifest_text):
tok = token_and_separator.group(1)
sep = token_and_separator.group(2)
continue
if state == BLOCKS:
- block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
+ block_locator = self._block_re.match(tok)
if block_locator:
blocksize = int(block_locator.group(1))
blocks.append(Range(tok, streamoffset, blocksize, 0))
state = SEGMENTS
if state == SEGMENTS:
- file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
+ file_segment = self._segment_re.match(tok)
if file_segment:
pos = int(file_segment.group(1))
size = int(file_segment.group(2))
self._lastheadername = name
self._headers[name] = value
# Returning None implies all bytes were written
-
+
class KeepWriterQueue(queue.Queue):
def __init__(self, copies):
self.successful_copies_lock = threading.Lock()
self.pending_tries = copies
self.pending_tries_notification = threading.Condition()
-
+
def write_success(self, response, replicas_nr):
with self.successful_copies_lock:
self.successful_copies += replicas_nr
self.response = response
with self.pending_tries_notification:
self.pending_tries_notification.notify_all()
-
+
def write_fail(self, ks):
with self.pending_tries_notification:
self.pending_tries += 1
self.pending_tries_notification.notify()
-
+
def pending_copies(self):
with self.successful_copies_lock:
return self.wanted_copies - self.successful_copies
for _ in range(num_threads):
w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
self.workers.append(w)
-
+
def add_task(self, ks, service_root):
self.queue.put((ks, service_root))
self.total_task_nr += 1
-
+
def done(self):
return self.queue.successful_copies
-
+
def join(self):
# Start workers
for worker in self.workers:
worker.start()
# Wait for finished work
self.queue.join()
-
+
def response(self):
return self.queue.response
-
-
+
+
class KeepWriterThread(threading.Thread):
TaskFailed = RuntimeError()
self.get_counter.add(1)
- locator = KeepLocator(loc_s)
- if method == "GET":
- slot, first = self.block_cache.reserve_cache(locator.md5sum)
- if not first:
- self.hits_counter.add(1)
- v = slot.get()
- return v
-
- self.misses_counter.add(1)
-
- headers = {
- 'X-Request-Id': (request_id or
- (hasattr(self, 'api_client') and self.api_client.request_id) or
- arvados.util.new_request_id()),
- }
-
- # If the locator has hints specifying a prefix (indicating a
- # remote keepproxy) or the UUID of a local gateway service,
- # read data from the indicated service(s) instead of the usual
- # list of local disk services.
- hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
- for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
- hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
- for hint in locator.hints if (
- hint.startswith('K@') and
- len(hint) == 29 and
- self._gateway_services.get(hint[2:])
- )])
- # Map root URLs to their KeepService objects.
- roots_map = {
- root: self.KeepService(root, self._user_agent_pool,
- upload_counter=self.upload_counter,
- download_counter=self.download_counter,
- headers=headers)
- for root in hint_roots
- }
-
- # See #3147 for a discussion of the loop implementation. Highlights:
- # * Refresh the list of Keep services after each failure, in case
- # it's being updated.
- # * Retry until we succeed, we're out of retries, or every available
- # service has returned permanent failure.
- sorted_roots = []
- roots_map = {}
+ slot = None
blob = None
- loop = retry.RetryLoop(num_retries, self._check_loop_result,
- backoff_start=2)
- for tries_left in loop:
- try:
- sorted_roots = self.map_new_services(
- roots_map, locator,
- force_rebuild=(tries_left < num_retries),
- need_writable=False,
- headers=headers)
- except Exception as error:
- loop.save_result(error)
- continue
+ try:
+ locator = KeepLocator(loc_s)
+ if method == "GET":
+ slot, first = self.block_cache.reserve_cache(locator.md5sum)
+ if not first:
+ self.hits_counter.add(1)
+ blob = slot.get()
+ if blob is None:
+ raise arvados.errors.KeepReadError(
+ "failed to read {}".format(loc_s))
+ return blob
+
+ self.misses_counter.add(1)
+
+ headers = {
+ 'X-Request-Id': (request_id or
+ (hasattr(self, 'api_client') and self.api_client.request_id) or
+ arvados.util.new_request_id()),
+ }
+
+ # If the locator has hints specifying a prefix (indicating a
+ # remote keepproxy) or the UUID of a local gateway service,
+ # read data from the indicated service(s) instead of the usual
+ # list of local disk services.
+ hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+ for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+ hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+ for hint in locator.hints if (
+ hint.startswith('K@') and
+ len(hint) == 29 and
+ self._gateway_services.get(hint[2:])
+ )])
+ # Map root URLs to their KeepService objects.
+ roots_map = {
+ root: self.KeepService(root, self._user_agent_pool,
+ upload_counter=self.upload_counter,
+ download_counter=self.download_counter,
+ headers=headers)
+ for root in hint_roots
+ }
+
+ # See #3147 for a discussion of the loop implementation. Highlights:
+ # * Refresh the list of Keep services after each failure, in case
+ # it's being updated.
+ # * Retry until we succeed, we're out of retries, or every available
+ # service has returned permanent failure.
+ sorted_roots = []
+ roots_map = {}
+ loop = retry.RetryLoop(num_retries, self._check_loop_result,
+ backoff_start=2)
+ for tries_left in loop:
+ try:
+ sorted_roots = self.map_new_services(
+ roots_map, locator,
+ force_rebuild=(tries_left < num_retries),
+ need_writable=False,
+ headers=headers)
+ except Exception as error:
+ loop.save_result(error)
+ continue
- # Query KeepService objects that haven't returned
- # permanent failure, in our specified shuffle order.
- services_to_try = [roots_map[root]
- for root in sorted_roots
- if roots_map[root].usable()]
- for keep_service in services_to_try:
- blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
- if blob is not None:
- break
- loop.save_result((blob, len(services_to_try)))
-
- # Always cache the result, then return it if we succeeded.
- if method == "GET":
- slot.set(blob)
- self.block_cache.cap_cache()
- if loop.success():
- if method == "HEAD":
- return True
- else:
- return blob
+ # Query KeepService objects that haven't returned
+ # permanent failure, in our specified shuffle order.
+ services_to_try = [roots_map[root]
+ for root in sorted_roots
+ if roots_map[root].usable()]
+ for keep_service in services_to_try:
+ blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
+ if blob is not None:
+ break
+ loop.save_result((blob, len(services_to_try)))
+
+ # Always cache the result, then return it if we succeeded.
+ if loop.success():
+ if method == "HEAD":
+ return True
+ else:
+ return blob
+ finally:
+ if slot is not None:
+ slot.set(blob)
+ self.block_cache.cap_cache()
# Q: Including 403 is necessary for the Keep tests to continue
# passing, but maybe they should expect KeepReadError instead?
loop.save_result(error)
continue
- writer_pool = KeepClient.KeepWriterThreadPool(data=data,
+ writer_pool = KeepClient.KeepWriterThreadPool(data=data,
data_hash=data_hash,
copies=copies - done,
max_service_replicas=self.max_replicas_per_service,
def finished(self):
return False
-
+
def setUp(self):
self.copies = 3
self.pool = arvados.KeepClient.KeepWriterThreadPool(
self.pool.add_task(ks, None)
self.pool.join()
self.assertEqual(self.pool.done(), self.copies-1)
-
+
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
with self.assertRaises(arvados.errors.KeepWriteError):
self.keep_client.put('foo', num_retries=1, copies=2)
self.assertEqual(2, req_mock.call_count)
+
+class KeepClientAPIErrorTest(unittest.TestCase):
+ def test_api_fail(self):
+ class ApiMock(object):
+ def __getattr__(self, r):
+ if r == "api_token":
+ return "abc"
+ else:
+ raise arvados.errors.KeepReadError()
+ keep_client = arvados.KeepClient(api_client=ApiMock(),
+ proxy='', local_store='')
+
+ # The bug this is testing for is that if an API (not
+ # keepstore) exception is thrown as part of a get(), the next
+ # attempt to get that same block will result in a deadlock.
+ # This is why there are two get()s in a row. Unfortunately,
+ # the failure mode for this test is that the test suite
+        # deadlocks; there isn't a good way to avoid that without
+ # adding a special case that has no use except for this test.
+
+ with self.assertRaises(arvados.errors.KeepReadError):
+ keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
+ with self.assertRaises(arvados.errors.KeepReadError):
+ keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
}
end
+ def self._update_requires_parameters
+ {}
+ end
+
def self._index_requires_parameters
{
filters: { type: 'array', required: false },
include DbCurrentTime
+ def self._ping_requires_parameters
+ { ping_secret: {required: true} }
+ end
+
+ def self._create_requires_parameters
+ super.merge(
+ { assign_slot: {required: false, type: 'boolean', description: 'assign slot and hostname'} })
+ end
+
+ def self._update_requires_parameters
+ super.merge(
+ { assign_slot: {required: false, type: 'boolean', description: 'assign slot and hostname'} })
+ end
+
+ def create
+ @object = model_class.new(resource_attrs)
+ @object.assign_slot if params[:assign_slot]
+ @object.save!
+ show
+ end
+
def update
if resource_attrs[:job_uuid].is_a? String
@object.job_readable = readable_job_uuids([resource_attrs[:job_uuid]]).any?
end
- super
- end
-
- def self._ping_requires_parameters
- { ping_secret: {required: true} }
+ attrs_to_update = resource_attrs.reject { |k,v|
+ [:kind, :etag, :href].index k
+ }
+ @object.update_attributes!(attrs_to_update)
+ @object.assign_slot if params[:assign_slot]
+ @object.save!
+ show
end
def ping
include Trashable
serialize :properties, Hash
+ serialize :storage_classes_desired, Array
+ serialize :storage_classes_confirmed, Array
before_validation :default_empty_manifest
+ before_validation :default_storage_classes, on: :create
before_validation :check_encoding
before_validation :check_manifest_validity
before_validation :check_signatures
before_validation :strip_signatures_and_update_replication_confirmed
validate :ensure_pdh_matches_manifest_text
+ validate :ensure_storage_classes_desired_is_not_empty
+ validate :ensure_storage_classes_contain_non_empty_strings
before_save :set_file_names
api_accessible :user, extend: :common do |t|
t.add :replication_desired
t.add :replication_confirmed
t.add :replication_confirmed_at
+ t.add :storage_classes_desired
+ t.add :storage_classes_confirmed
+ t.add :storage_classes_confirmed_at
t.add :delete_at
t.add :trash_at
t.add :is_trashed
end
def self.full_text_searchable_columns
- super - ["manifest_text"]
+ super - ["manifest_text", "storage_classes_desired", "storage_classes_confirmed"]
end
def self.where *args
end
protected
+
+  # Although the defaults for these columns are already set up in the schema,
+  # collection creation from an API client seems to ignore them, causing the
+  # validation on empty desired storage classes to fail.
+ def default_storage_classes
+ if self.storage_classes_desired.nil? || self.storage_classes_desired.empty?
+ self.storage_classes_desired = ["default"]
+ end
+ self.storage_classes_confirmed ||= []
+ end
+
def portable_manifest_text
self.class.munge_manifest_locators(manifest_text) do |match|
if match[2] # size
end
def ensure_permission_to_save
- if (not current_user.andand.is_admin and
- (replication_confirmed_at_changed? or replication_confirmed_changed?) and
- not (replication_confirmed_at.nil? and replication_confirmed.nil?))
- raise ArvadosModel::PermissionDeniedError.new("replication_confirmed and replication_confirmed_at attributes cannot be changed, except by setting both to nil")
+ if (not current_user.andand.is_admin)
+ if (replication_confirmed_at_changed? or replication_confirmed_changed?) and
+ not (replication_confirmed_at.nil? and replication_confirmed.nil?)
+ raise ArvadosModel::PermissionDeniedError.new("replication_confirmed and replication_confirmed_at attributes cannot be changed, except by setting both to nil")
+ end
+ if (storage_classes_confirmed_changed? or storage_classes_confirmed_at_changed?) and
+ not (storage_classes_confirmed == [] and storage_classes_confirmed_at.nil?)
+ raise ArvadosModel::PermissionDeniedError.new("storage_classes_confirmed and storage_classes_confirmed_at attributes cannot be changed, except by setting them to [] and nil respectively")
+ end
end
super
end
+ def ensure_storage_classes_desired_is_not_empty
+ if self.storage_classes_desired.empty?
+ raise ArvadosModel::InvalidStateTransitionError.new("storage_classes_desired shouldn't be empty")
+ end
+ end
+
+ def ensure_storage_classes_contain_non_empty_strings
+ (self.storage_classes_desired + self.storage_classes_confirmed).each do |c|
+ if !c.is_a?(String) || c == ''
+ raise ArvadosModel::InvalidStateTransitionError.new("storage classes should only be non-empty strings")
+ end
+ end
+ end
end
end
end
- # Assign slot_number
- if self.slot_number.nil?
- while true
- n = self.class.available_slot_number
- if n.nil?
- raise "No available node slots"
- end
- self.slot_number = n
- begin
- self.save!
- break
- rescue ActiveRecord::RecordNotUnique
- # try again
- end
- end
- end
-
- # Assign hostname
- if self.hostname.nil? and Rails.configuration.assign_node_hostname
- self.hostname = self.class.hostname_for_slot(self.slot_number)
- end
+ assign_slot
# Record other basic stats
['total_cpu_cores', 'total_ram_mb', 'total_scratch_mb'].each do |key|
save!
end
+ def assign_slot
+ return if self.slot_number.andand > 0
+ while true
+ self.slot_number = self.class.available_slot_number
+ if self.slot_number.nil?
+ raise "No available node slots"
+ end
+ begin
+ save!
+ return assign_hostname
+ rescue ActiveRecord::RecordNotUnique
+ # try again
+ end
+ end
+ end
+
protected
+ def assign_hostname
+ if self.hostname.nil? and Rails.configuration.assign_node_hostname
+ self.hostname = self.class.hostname_for_slot(self.slot_number)
+ end
+ end
+
def self.available_slot_number
# Join the sequence 1..max with the nodes table. Return the first
# (i.e., smallest) value that doesn't match the slot_number of any
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+class AddStorageClassesToCollections < ActiveRecord::Migration
+ def up
+ add_column :collections, :storage_classes_desired, :jsonb, :default => ["default"]
+ add_column :collections, :storage_classes_confirmed, :jsonb, :default => []
+ add_column :collections, :storage_classes_confirmed_at, :datetime, :default => nil, :null => true
+ end
+
+ def down
+ remove_column :collections, :storage_classes_desired
+ remove_column :collections, :storage_classes_confirmed
+ remove_column :collections, :storage_classes_confirmed_at
+ end
+end
delete_at timestamp without time zone,
file_names character varying(8192),
trash_at timestamp without time zone,
- is_trashed boolean DEFAULT false NOT NULL
+ is_trashed boolean DEFAULT false NOT NULL,
+ storage_classes_desired jsonb DEFAULT '["default"]'::jsonb,
+ storage_classes_confirmed jsonb DEFAULT '[]'::jsonb,
+ storage_classes_confirmed_at timestamp without time zone
);
INSERT INTO schema_migrations (version) VALUES ('20171212153352');
+INSERT INTO schema_migrations (version) VALUES ('20180216203422');
+
manifest_text: ". acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 0:3:foo 3:6:bar\n"
name: replication want=2 have=2
+storage_classes_desired_default_unconfirmed:
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: 2015-02-07 00:21:35.050333515 Z
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ modified_at: 2015-02-07 00:21:35.050189104 Z
+ portable_data_hash: fa7aeb5140e2848d39b416daeef4ffc5+45
+ storage_classes_desired: ["default"]
+ storage_classes_confirmed_at: ~
+ storage_classes_confirmed: ~
+ updated_at: 2015-02-07 00:21:35.050126576 Z
+ uuid: zzzzz-4zz18-3t236wrz4769tga
+ manifest_text: ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n"
+ name: storage classes want=[default] have=[]
+
+storage_classes_desired_default_confirmed_default:
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: 2015-02-07 00:21:35.050333515 Z
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ modified_at: 2015-02-07 00:21:35.050189104 Z
+ portable_data_hash: fa7aeb5140e2848d39b416daeef4ffc5+45
+ storage_classes_desired: ["default"]
+ storage_classes_confirmed_at: 2015-02-07 00:21:35.050126576 Z
+ storage_classes_confirmed: ["default"]
+ updated_at: 2015-02-07 00:21:35.050126576 Z
+ uuid: zzzzz-4zz18-3t236wr12769tga
+ manifest_text: ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n"
+ name: storage classes want=[default] have=[default]
+
+storage_classes_desired_archive_confirmed_default:
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: 2015-02-07 00:21:35.050333515 Z
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ modified_at: 2015-02-07 00:21:35.050189104 Z
+ portable_data_hash: fa7aeb5140e2848d39b416daeef4ffc5+45
+ storage_classes_desired: ["archive"]
+ storage_classes_confirmed_at: ~
+ storage_classes_confirmed: ["default"]
+ updated_at: 2015-02-07 00:21:35.050126576 Z
+ uuid: zzzzz-4zz18-3t236wr12769qqa
+ manifest_text: ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n"
+ name: storage classes want=[archive] have=[default]
+
collection_with_empty_properties:
uuid: zzzzz-4zz18-emptyproperties
portable_data_hash: fa7aeb5140e2848d39b416daeef4ffc5+45
end
end
- test "jsonb 'exists' and '!=' filter" do
+ test "jsonb hash 'exists' and '!=' filter" do
@controller = Arvados::V1::CollectionsController.new
authorize_with :admin
get :index, {
assert_includes(found, collections(:collection_with_prop1_other1).uuid)
end
- test "jsonb alternate form 'exists' and '!=' filter" do
+ test "jsonb array 'exists'" do
+ @controller = Arvados::V1::CollectionsController.new
+ authorize_with :admin
+ get :index, {
+ filters: [ ['storage_classes_confirmed.default', 'exists', true] ]
+ }
+ assert_response :success
+ found = assigns(:objects).collect(&:uuid)
+ assert_equal 2, found.length
+ assert_not_includes(found,
+ collections(:storage_classes_desired_default_unconfirmed).uuid)
+ assert_includes(found,
+ collections(:storage_classes_desired_default_confirmed_default).uuid)
+ assert_includes(found,
+ collections(:storage_classes_desired_archive_confirmed_default).uuid)
+ end
+
+ test "jsonb hash alternate form 'exists' and '!=' filter" do
@controller = Arvados::V1::CollectionsController.new
authorize_with :admin
get :index, {
assert_includes(found, collections(:collection_with_prop1_other1).uuid)
end
+ test "jsonb array alternate form 'exists' filter" do
+ @controller = Arvados::V1::CollectionsController.new
+ authorize_with :admin
+ get :index, {
+ filters: [ ['storage_classes_confirmed', 'exists', 'default'] ]
+ }
+ assert_response :success
+ found = assigns(:objects).collect(&:uuid)
+ assert_equal 2, found.length
+ assert_not_includes(found,
+ collections(:storage_classes_desired_default_unconfirmed).uuid)
+ assert_includes(found,
+ collections(:storage_classes_desired_default_confirmed_default).uuid)
+ assert_includes(found,
+ collections(:storage_classes_desired_archive_confirmed_default).uuid)
+ end
+
test "jsonb 'exists' must be boolean" do
@controller = Arvados::V1::CollectionsController.new
authorize_with :admin
assert_not_nil json_response['uuid']
assert_not_nil json_response['info'].is_a? Hash
assert_not_nil json_response['info']['ping_secret']
+ assert_nil json_response['slot_number']
+ assert_nil json_response['hostname']
+ end
+
+ test "create node and assign slot" do
+ authorize_with :admin
+ post :create, {node: {}, assign_slot: true}
+ assert_response :success
+ assert_not_nil json_response['uuid']
+ assert_not_nil json_response['info'].is_a? Hash
+ assert_not_nil json_response['info']['ping_secret']
+ assert_operator 0, :<, json_response['slot_number']
+ n = json_response['slot_number']
+ assert_equal "compute#{n}", json_response['hostname']
+
+ node = Node.where(uuid: json_response['uuid']).first
+ assert_equal n, node.slot_number
+ assert_equal "compute#{n}", node.hostname
+ end
+
+ test "update node and assign slot" do
+ authorize_with :admin
+ node = nodes(:new_with_no_hostname)
+ post :update, {id: node.uuid, node: {}, assign_slot: true}
+ assert_response :success
+ assert_operator 0, :<, json_response['slot_number']
+ n = json_response['slot_number']
+ assert_equal "compute#{n}", json_response['hostname']
+
+ node.reload
+ assert_equal n, node.slot_number
+ assert_equal "compute#{n}", node.hostname
+ end
+
+ test "update node and assign slot, don't clobber hostname" do
+ authorize_with :admin
+ node = nodes(:new_with_custom_hostname)
+ post :update, {id: node.uuid, node: {}, assign_slot: true}
+ assert_response :success
+ assert_operator 0, :<, json_response['slot_number']
+ n = json_response['slot_number']
+ assert_equal "custom1", json_response['hostname']
end
test "ping adds node stats to info" do
end
end
+ test "storage_classes_desired cannot be empty" do
+ act_as_user users(:active) do
+ c = collections(:collection_owned_by_active)
+ c.update_attributes storage_classes_desired: ["hot"]
+ assert_equal ["hot"], c.storage_classes_desired
+ assert_raise ArvadosModel::InvalidStateTransitionError do
+ c.update_attributes storage_classes_desired: []
+ end
+ end
+ end
+
+ test "storage classes lists should only contain non-empty strings" do
+ c = collections(:storage_classes_desired_default_unconfirmed)
+ act_as_user users(:admin) do
+ assert c.update_attributes(storage_classes_desired: ["default", "a_string"],
+ storage_classes_confirmed: ["another_string"])
+ [
+ ["storage_classes_desired", ["default", 42]],
+ ["storage_classes_confirmed", [{the_answer: 42}]],
+ ["storage_classes_desired", ["default", ""]],
+ ["storage_classes_confirmed", [""]],
+ ].each do |attr, val|
+ assert_raise ArvadosModel::InvalidStateTransitionError do
+ assert c.update_attributes({attr => val})
+ end
+ end
+ end
+ end
+
+ test "storage_classes_confirmed* can be set by admin user" do
+ c = collections(:storage_classes_desired_default_unconfirmed)
+ act_as_user users(:admin) do
+ assert c.update_attributes(storage_classes_confirmed: ["default"],
+ storage_classes_confirmed_at: Time.now)
+ end
+ end
+
+ test "storage_classes_confirmed* cannot be set by non-admin user" do
+ act_as_user users(:active) do
+ c = collections(:storage_classes_desired_default_unconfirmed)
+ # Cannot set just one at a time.
+ assert_raise ArvadosModel::PermissionDeniedError do
+ c.update_attributes storage_classes_confirmed: ["default"]
+ end
+ c.reload
+ assert_raise ArvadosModel::PermissionDeniedError do
+ c.update_attributes storage_classes_confirmed_at: Time.now
+ end
+      # Cannot set both at once, either.
+ c.reload
+ assert_raise ArvadosModel::PermissionDeniedError do
+ assert c.update_attributes(storage_classes_confirmed: ["default"],
+ storage_classes_confirmed_at: Time.now)
+ end
+ end
+ end
+
+ test "storage_classes_confirmed* can be cleared (but only together) by non-admin user" do
+ act_as_user users(:active) do
+ c = collections(:storage_classes_desired_default_confirmed_default)
+ # Cannot clear just one at a time.
+ assert_raise ArvadosModel::PermissionDeniedError do
+ c.update_attributes storage_classes_confirmed: []
+ end
+ c.reload
+ assert_raise ArvadosModel::PermissionDeniedError do
+ c.update_attributes storage_classes_confirmed_at: nil
+ end
+ # Can clear both at once.
+ c.reload
+ assert c.update_attributes(storage_classes_confirmed: [],
+ storage_classes_confirmed_at: nil)
+ end
+ end
+
[0, 2, 4, nil].each do |ask|
test "set replication_desired to #{ask.inspect}" do
Rails.configuration.default_collection_replication = 2
"strings"
"time"
+ "git.curoverse.com/arvados.git/lib/dispatchcloud"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/config"
"github.com/coreos/go-systemd/daemon"
)
-var version = "dev"
+var (
+ version = "dev"
+ defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
+)
+
+type Dispatcher struct {
+ *dispatch.Dispatcher
+ cluster *arvados.Cluster
+ sqCheck *SqueueChecker
+ slurm Slurm
-// Config used by crunch-dispatch-slurm
-type Config struct {
Client arvados.Client
SbatchArguments []string
// Example: []string{"crunch-run", "--cgroup-parent-subsystem=memory"}
CrunchRunCommand []string
+ // Extra RAM to reserve (in Bytes) for SLURM job, in addition
+ // to the amount specified in the container's RuntimeConstraints
+ ReserveExtraRAM int64
+
// Minimum time between two attempts to run the same container
MinRetryPeriod arvados.Duration
-
- slurm Slurm
}
func main() {
- theConfig.slurm = &slurmCLI{}
- err := doMain()
+ disp := &Dispatcher{}
+ err := disp.Run(os.Args[0], os.Args[1:])
if err != nil {
log.Fatal(err)
}
}
-var (
- theConfig Config
- sqCheck = &SqueueChecker{}
-)
-
-const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
+func (disp *Dispatcher) Run(prog string, args []string) error {
+ if err := disp.configure(prog, args); err != nil {
+ return err
+ }
+ disp.setup()
+ return disp.run()
+}
-func doMain() error {
- flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
+// configure() loads config files. Tests skip this.
+func (disp *Dispatcher) configure(prog string, args []string) error {
+ flags := flag.NewFlagSet(prog, flag.ExitOnError)
flags.Usage = func() { usage(flags) }
configPath := flags.String(
false,
"Print version information and exit.")
// Parse args; omit the first arg which is the command name
- flags.Parse(os.Args[1:])
+ flags.Parse(args)
// Print version information if requested
if *getVersion {
log.Printf("crunch-dispatch-slurm %s started", version)
- err := readConfig(&theConfig, *configPath)
+ err := disp.readConfig(*configPath)
if err != nil {
return err
}
- if theConfig.CrunchRunCommand == nil {
- theConfig.CrunchRunCommand = []string{"crunch-run"}
+ if disp.CrunchRunCommand == nil {
+ disp.CrunchRunCommand = []string{"crunch-run"}
}
- if theConfig.PollPeriod == 0 {
- theConfig.PollPeriod = arvados.Duration(10 * time.Second)
+ if disp.PollPeriod == 0 {
+ disp.PollPeriod = arvados.Duration(10 * time.Second)
}
- if theConfig.Client.APIHost != "" || theConfig.Client.AuthToken != "" {
+ if disp.Client.APIHost != "" || disp.Client.AuthToken != "" {
// Copy real configs into env vars so [a]
// MakeArvadosClient() uses them, and [b] they get
// propagated to crunch-run via SLURM.
- os.Setenv("ARVADOS_API_HOST", theConfig.Client.APIHost)
- os.Setenv("ARVADOS_API_TOKEN", theConfig.Client.AuthToken)
+ os.Setenv("ARVADOS_API_HOST", disp.Client.APIHost)
+ os.Setenv("ARVADOS_API_TOKEN", disp.Client.AuthToken)
os.Setenv("ARVADOS_API_HOST_INSECURE", "")
- if theConfig.Client.Insecure {
+ if disp.Client.Insecure {
os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
}
- os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(theConfig.Client.KeepServiceURIs, " "))
+ os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(disp.Client.KeepServiceURIs, " "))
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
} else {
log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
}
if *dumpConfig {
- log.Fatal(config.DumpAndExit(theConfig))
+ return config.DumpAndExit(disp)
+ }
+
+ siteConfig, err := arvados.GetConfig(arvados.DefaultConfigFile)
+ if os.IsNotExist(err) {
+ log.Printf("warning: no cluster config (%s), proceeding with no node types defined", err)
+ } else if err != nil {
+ return fmt.Errorf("error loading config: %s", err)
+ } else if disp.cluster, err = siteConfig.GetCluster(""); err != nil {
+ return fmt.Errorf("config error: %s", err)
}
+ return nil
+}
+
+// setup() initializes private fields after configure().
+func (disp *Dispatcher) setup() {
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
- log.Printf("Error making Arvados client: %v", err)
- return err
+ log.Fatalf("Error making Arvados client: %v", err)
}
arv.Retries = 25
- sqCheck = &SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
- defer sqCheck.Stop()
-
- dispatcher := &dispatch.Dispatcher{
+ disp.slurm = &slurmCLI{}
+ disp.sqCheck = &SqueueChecker{
+ Period: time.Duration(disp.PollPeriod),
+ Slurm: disp.slurm,
+ }
+ disp.Dispatcher = &dispatch.Dispatcher{
Arv: arv,
- RunContainer: run,
- PollPeriod: time.Duration(theConfig.PollPeriod),
- MinRetryPeriod: time.Duration(theConfig.MinRetryPeriod),
+ RunContainer: disp.runContainer,
+ PollPeriod: time.Duration(disp.PollPeriod),
+ MinRetryPeriod: time.Duration(disp.MinRetryPeriod),
+ }
+}
+
+func (disp *Dispatcher) run() error {
+ defer disp.sqCheck.Stop()
+
+ if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 {
+ go dispatchcloud.SlurmNodeTypeFeatureKludge(disp.cluster)
}
if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
log.Printf("Error notifying init daemon: %v", err)
}
-
- go checkSqueueForOrphans(dispatcher, sqCheck)
-
- return dispatcher.Run(context.Background())
+ go disp.checkSqueueForOrphans()
+ return disp.Dispatcher.Run(context.Background())
}
var containerUuidPattern = regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
// jobs started by a previous dispatch process that never released
// their slurm allocations even though their container states are
// Cancelled or Complete. See https://dev.arvados.org/issues/10979
-func checkSqueueForOrphans(dispatcher *dispatch.Dispatcher, sqCheck *SqueueChecker) {
- for _, uuid := range sqCheck.All() {
+func (disp *Dispatcher) checkSqueueForOrphans() {
+ for _, uuid := range disp.sqCheck.All() {
if !containerUuidPattern.MatchString(uuid) {
continue
}
- err := dispatcher.TrackContainer(uuid)
+ err := disp.TrackContainer(uuid)
if err != nil {
log.Printf("checkSqueueForOrphans: TrackContainer(%s): %s", uuid, err)
}
}
}
-func niceness(priority int) int {
+func (disp *Dispatcher) niceness(priority int) int {
if priority > 1000 {
priority = 1000
}
return (1000 - priority) * 10
}
-func sbatchArgs(container arvados.Container) []string {
- mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
+func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
+ mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM+disp.ReserveExtraRAM) / float64(1048576)))
var disk int64
for _, m := range container.Mounts {
disk = int64(math.Ceil(float64(disk) / float64(1048576)))
var sbatchArgs []string
- sbatchArgs = append(sbatchArgs, theConfig.SbatchArguments...)
+ sbatchArgs = append(sbatchArgs, disp.SbatchArguments...)
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
- sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", niceness(container.Priority)))
+ sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", disp.niceness(container.Priority)))
if len(container.SchedulingParameters.Partitions) > 0 {
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
}
- return sbatchArgs
+ if disp.cluster == nil {
+ // no instance types configured
+ } else if it, err := dispatchcloud.ChooseInstanceType(disp.cluster, &container); err == dispatchcloud.ErrInstanceTypesNotConfigured {
+ // ditto
+ } else if err != nil {
+ return nil, err
+ } else {
+ sbatchArgs = append(sbatchArgs, "--constraint=instancetype="+it.Name)
+ }
+
+ return sbatchArgs, nil
}
-func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
+func (disp *Dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
// append() here avoids modifying crunchRunCommand's
// underlying array, which is shared with other goroutines.
crArgs := append([]string(nil), crunchRunCommand...)
crArgs = append(crArgs, container.UUID)
crScript := strings.NewReader(execScript(crArgs))
- sqCheck.L.Lock()
- defer sqCheck.L.Unlock()
+ disp.sqCheck.L.Lock()
+ defer disp.sqCheck.L.Unlock()
- sbArgs := sbatchArgs(container)
+ sbArgs, err := disp.sbatchArgs(container)
+ if err != nil {
+ return err
+ }
log.Printf("running sbatch %+q", sbArgs)
- return theConfig.slurm.Batch(crScript, sbArgs)
+ return disp.slurm.Batch(crScript, sbArgs)
}
// Submit a container to the slurm queue (or resume monitoring if it's
// already in the queue). Cancel the slurm job if the container's
// priority changes to zero or its state indicates it's no longer
// running.
-func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- if ctr.State == dispatch.Locked && !sqCheck.HasUUID(ctr.UUID) {
+ if ctr.State == dispatch.Locked && !disp.sqCheck.HasUUID(ctr.UUID) {
log.Printf("Submitting container %s to slurm", ctr.UUID)
- if err := submit(disp, ctr, theConfig.CrunchRunCommand); err != nil {
- text := fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+ if err := disp.submit(ctr, disp.CrunchRunCommand); err != nil {
+ var text string
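+			// If no configured instance type satisfies the container's
+			// constraints, cancel the container instead of retrying.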
+ if err == dispatchcloud.ErrConstraintsNotSatisfiable {
+ text = fmt.Sprintf("cannot run container %s: %s", ctr.UUID, err)
+ disp.UpdateState(ctr.UUID, dispatch.Cancelled)
+ } else {
+ text = fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+ }
log.Print(text)
lr := arvadosclient.Dict{"log": arvadosclient.Dict{
// no point in waiting for further dispatch updates: just
// clean up and return.
go func(uuid string) {
- for ctx.Err() == nil && sqCheck.HasUUID(uuid) {
+ for ctx.Err() == nil && disp.sqCheck.HasUUID(uuid) {
}
cancel()
}(ctr.UUID)
return
case updated, ok := <-status:
if !ok {
- log.Printf("Dispatcher says container %s is done: cancel slurm job", ctr.UUID)
- scancel(ctr)
+ log.Printf("container %s is done: cancel slurm job", ctr.UUID)
+ disp.scancel(ctr)
} else if updated.Priority == 0 {
- log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
- scancel(ctr)
+ log.Printf("container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
+ disp.scancel(ctr)
} else {
- renice(updated)
+ disp.renice(updated)
}
}
}
}
-func scancel(ctr arvados.Container) {
- sqCheck.L.Lock()
- err := theConfig.slurm.Cancel(ctr.UUID)
- sqCheck.L.Unlock()
+func (disp *Dispatcher) scancel(ctr arvados.Container) {
+ disp.sqCheck.L.Lock()
+ err := disp.slurm.Cancel(ctr.UUID)
+ disp.sqCheck.L.Unlock()
if err != nil {
log.Printf("scancel: %s", err)
time.Sleep(time.Second)
- } else if sqCheck.HasUUID(ctr.UUID) {
+ } else if disp.sqCheck.HasUUID(ctr.UUID) {
log.Printf("container %s is still in squeue after scancel", ctr.UUID)
time.Sleep(time.Second)
}
}
-func renice(ctr arvados.Container) {
- nice := niceness(ctr.Priority)
- oldnice := sqCheck.GetNiceness(ctr.UUID)
+func (disp *Dispatcher) renice(ctr arvados.Container) {
+ nice := disp.niceness(ctr.Priority)
+ oldnice := disp.sqCheck.GetNiceness(ctr.UUID)
if nice == oldnice || oldnice == -1 {
return
}
log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
- sqCheck.L.Lock()
- err := theConfig.slurm.Renice(ctr.UUID, nice)
- sqCheck.L.Unlock()
+ disp.sqCheck.L.Lock()
+ err := disp.slurm.Renice(ctr.UUID, nice)
+ disp.sqCheck.L.Unlock()
if err != nil {
log.Printf("renice: %s", err)
time.Sleep(time.Second)
return
}
- if sqCheck.HasUUID(ctr.UUID) {
+ if disp.sqCheck.HasUUID(ctr.UUID) {
log.Printf("container %s has arvados priority %d, slurm nice %d",
- ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
+ ctr.UUID, ctr.Priority, disp.sqCheck.GetNiceness(ctr.UUID))
}
}
-func readConfig(dst interface{}, path string) error {
- err := config.LoadFile(dst, path)
+func (disp *Dispatcher) readConfig(path string) error {
+ err := config.LoadFile(disp, path)
if err != nil && os.IsNotExist(err) && path == defaultConfigPath {
log.Printf("Config not specified. Continue with default configuration.")
err = nil
"testing"
"time"
+ "git.curoverse.com/arvados.git/lib/dispatchcloud"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
TestingT(t)
}
-var _ = Suite(&TestSuite{})
-var _ = Suite(&MockArvadosServerSuite{})
+var _ = Suite(&IntegrationSuite{})
+var _ = Suite(&StubbedSuite{})
-type TestSuite struct{}
-type MockArvadosServerSuite struct{}
-
-var initialArgs []string
-
-func (s *TestSuite) SetUpSuite(c *C) {
- initialArgs = os.Args
-}
-
-func (s *TestSuite) TearDownSuite(c *C) {
+type IntegrationSuite struct {
+ disp Dispatcher
+ slurm slurmFake
}
-func (s *TestSuite) SetUpTest(c *C) {
- args := []string{"crunch-dispatch-slurm"}
- os.Args = args
-
+func (s *IntegrationSuite) SetUpTest(c *C) {
arvadostest.StartAPI()
os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
+ s.disp = Dispatcher{}
+ s.disp.setup()
+ s.slurm = slurmFake{}
}
-func (s *TestSuite) TearDownTest(c *C) {
- os.Args = initialArgs
+func (s *IntegrationSuite) TearDownTest(c *C) {
arvadostest.ResetEnv()
arvadostest.StopAPI()
}
-func (s *MockArvadosServerSuite) TearDownTest(c *C) {
- arvadostest.ResetEnv()
-}
-
type slurmFake struct {
didBatch [][]string
didCancel []string
return nil
}
-func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
+func (s *IntegrationSuite) integrationTest(c *C,
expectBatch [][]string,
runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
arvadostest.ResetEnv()
arv, err := arvadosclient.MakeArvadosClient()
c.Assert(err, IsNil)
- defer func(orig Slurm) {
- theConfig.slurm = orig
- }(theConfig.slurm)
- theConfig.slurm = slurm
-
// There should be one queued container
params := arvadosclient.Dict{
"filters": [][]string{{"state", "=", "Queued"}},
c.Check(err, IsNil)
c.Check(len(containers.Items), Equals, 1)
- theConfig.CrunchRunCommand = []string{"echo"}
+ s.disp.CrunchRunCommand = []string{"echo"}
ctx, cancel := context.WithCancel(context.Background())
doneRun := make(chan struct{})
- dispatcher := dispatch.Dispatcher{
+ s.disp.Dispatcher = &dispatch.Dispatcher{
Arv: arv,
PollPeriod: time.Duration(1) * time.Second,
RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
go func() {
runContainer(disp, ctr)
- slurm.queue = ""
+ s.slurm.queue = ""
doneRun <- struct{}{}
}()
- run(disp, ctr, status)
+ s.disp.runContainer(disp, ctr, status)
cancel()
},
}
- sqCheck = &SqueueChecker{Period: 500 * time.Millisecond}
+ s.disp.slurm = &s.slurm
+ s.disp.sqCheck = &SqueueChecker{Period: 500 * time.Millisecond, Slurm: s.disp.slurm}
- err = dispatcher.Run(ctx)
+ err = s.disp.Dispatcher.Run(ctx)
<-doneRun
c.Assert(err, Equals, context.Canceled)
- sqCheck.Stop()
+ s.disp.sqCheck.Stop()
- c.Check(slurm.didBatch, DeepEquals, expectBatch)
+ c.Check(s.slurm.didBatch, DeepEquals, expectBatch)
// There should be no queued containers now
err = arv.List("containers", params, &containers)
return container
}
-func (s *TestSuite) TestIntegrationNormal(c *C) {
+func (s *IntegrationSuite) TestNormal(c *C) {
+ s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
container := s.integrationTest(c,
- &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"},
nil,
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
c.Check(container.State, Equals, arvados.ContainerStateComplete)
}
-func (s *TestSuite) TestIntegrationCancel(c *C) {
- slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+func (s *IntegrationSuite) TestCancel(c *C) {
+ s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
readyToCancel := make(chan bool)
- slurm.onCancel = func() { <-readyToCancel }
+ s.slurm.onCancel = func() { <-readyToCancel }
container := s.integrationTest(c,
- slurm,
nil,
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
close(readyToCancel)
})
c.Check(container.State, Equals, arvados.ContainerStateCancelled)
- c.Check(len(slurm.didCancel) > 1, Equals, true)
- c.Check(slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
+ c.Check(len(s.slurm.didCancel) > 1, Equals, true)
+ c.Check(s.slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
}
-func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
- container := s.integrationTest(c, &slurmFake{},
+func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
+ container := s.integrationTest(c,
[][]string{{
fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
fmt.Sprintf("--mem=%d", 11445),
c.Check(container.State, Equals, arvados.ContainerStateCancelled)
}
-func (s *TestSuite) TestSbatchFail(c *C) {
+func (s *IntegrationSuite) TestSbatchFail(c *C) {
+ s.slurm = slurmFake{errBatch: errors.New("something terrible happened")}
container := s.integrationTest(c,
- &slurmFake{errBatch: errors.New("something terrible happened")},
[][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=9990"}},
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
c.Assert(len(ll.Items), Equals, 1)
}
-func (s *MockArvadosServerSuite) TestAPIErrorGettingContainers(c *C) {
+func (s *IntegrationSuite) TestChangePriority(c *C) {
+ s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+ container := s.integrationTest(c, nil,
+ func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
+ dispatcher.UpdateState(container.UUID, dispatch.Running)
+ time.Sleep(time.Second)
+ dispatcher.Arv.Update("containers", container.UUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"priority": 600}},
+ nil)
+ time.Sleep(time.Second)
+ dispatcher.UpdateState(container.UUID, dispatch.Complete)
+ })
+ c.Check(container.State, Equals, arvados.ContainerStateComplete)
+ c.Assert(len(s.slurm.didRenice), Not(Equals), 0)
+ c.Check(s.slurm.didRenice[len(s.slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
+}
+
+type StubbedSuite struct {
+ disp Dispatcher
+}
+
+func (s *StubbedSuite) SetUpTest(c *C) {
+ s.disp = Dispatcher{}
+ s.disp.setup()
+}
+
+func (s *StubbedSuite) TestAPIErrorGettingContainers(c *C) {
apiStubResponses := make(map[string]arvadostest.StubResponse)
apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
- testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
+ s.testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
}
-func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
+func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
apiStub := arvadostest.ServerStub{apiStubResponses}
api := httptest.NewServer(&apiStub)
log.SetOutput(io.MultiWriter(buf, os.Stderr))
defer log.SetOutput(os.Stderr)
- theConfig.CrunchRunCommand = []string{crunchCmd}
+ s.disp.CrunchRunCommand = []string{crunchCmd}
ctx, cancel := context.WithCancel(context.Background())
dispatcher := dispatch.Dispatcher{
disp.UpdateState(ctr.UUID, dispatch.Running)
disp.UpdateState(ctr.UUID, dispatch.Complete)
}()
- run(disp, ctr, status)
+ s.disp.runContainer(disp, ctr, status)
cancel()
},
}
c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
}
-func (s *MockArvadosServerSuite) TestNoSuchConfigFile(c *C) {
- var config Config
- err := readConfig(&config, "/nosuchdir89j7879/8hjwr7ojgyy7")
+func (s *StubbedSuite) TestNoSuchConfigFile(c *C) {
+ err := s.disp.readConfig("/nosuchdir89j7879/8hjwr7ojgyy7")
c.Assert(err, NotNil)
}
-func (s *MockArvadosServerSuite) TestBadSbatchArgsConfig(c *C) {
- var config Config
-
+func (s *StubbedSuite) TestBadSbatchArgsConfig(c *C) {
tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
c.Check(err, IsNil)
defer os.Remove(tmpfile.Name())
_, err = tmpfile.Write([]byte(`{"SbatchArguments": "oops this is not a string array"}`))
c.Check(err, IsNil)
- err = readConfig(&config, tmpfile.Name())
+ err = s.disp.readConfig(tmpfile.Name())
c.Assert(err, NotNil)
}
-func (s *MockArvadosServerSuite) TestNoSuchArgInConfigIgnored(c *C) {
- var config Config
-
+func (s *StubbedSuite) TestNoSuchArgInConfigIgnored(c *C) {
tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
c.Check(err, IsNil)
defer os.Remove(tmpfile.Name())
_, err = tmpfile.Write([]byte(`{"NoSuchArg": "Nobody loves me, not one tiny hunk."}`))
c.Check(err, IsNil)
- err = readConfig(&config, tmpfile.Name())
+ err = s.disp.readConfig(tmpfile.Name())
c.Assert(err, IsNil)
- c.Check(0, Equals, len(config.SbatchArguments))
+ c.Check(0, Equals, len(s.disp.SbatchArguments))
}
-func (s *MockArvadosServerSuite) TestReadConfig(c *C) {
- var config Config
-
+func (s *StubbedSuite) TestReadConfig(c *C) {
tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
c.Check(err, IsNil)
defer os.Remove(tmpfile.Name())
_, err = tmpfile.Write([]byte(argsS))
c.Check(err, IsNil)
- err = readConfig(&config, tmpfile.Name())
+ err = s.disp.readConfig(tmpfile.Name())
c.Assert(err, IsNil)
- c.Check(3, Equals, len(config.SbatchArguments))
- c.Check(args, DeepEquals, config.SbatchArguments)
+ c.Check(args, DeepEquals, s.disp.SbatchArguments)
}
-func (s *MockArvadosServerSuite) TestSbatchFuncWithNoConfigArgs(c *C) {
- testSbatchFuncWithArgs(c, nil)
-}
-
-func (s *MockArvadosServerSuite) TestSbatchFuncWithEmptyConfigArgs(c *C) {
- testSbatchFuncWithArgs(c, []string{})
-}
+func (s *StubbedSuite) TestSbatchArgs(c *C) {
+ container := arvados.Container{
+ UUID: "123",
+ RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
+ Priority: 1,
+ }
-func (s *MockArvadosServerSuite) TestSbatchFuncWithConfigArgs(c *C) {
- testSbatchFuncWithArgs(c, []string{"--arg1=v1", "--arg2"})
+ for _, defaults := range [][]string{
+ nil,
+ {},
+ {"--arg1=v1", "--arg2"},
+ } {
+ c.Logf("%#v", defaults)
+ s.disp.SbatchArguments = defaults
+
+ args, err := s.disp.sbatchArgs(container)
+ c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990"))
+ c.Check(err, IsNil)
+ }
}
-func testSbatchFuncWithArgs(c *C, args []string) {
- defer func() { theConfig.SbatchArguments = nil }()
- theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...)
-
+func (s *StubbedSuite) TestSbatchInstanceTypeConstraint(c *C) {
container := arvados.Container{
UUID: "123",
RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
- Priority: 1}
+ Priority: 1,
+ }
- var expected []string
- expected = append(expected, theConfig.SbatchArguments...)
- expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990")
- c.Check(sbatchArgs(container), DeepEquals, expected)
+ for _, trial := range []struct {
+ types []arvados.InstanceType
+ sbatchArgs []string
+ err error
+ }{
+ // Choose node type => use --constraint arg
+ {
+ types: []arvados.InstanceType{
+ {Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
+ {Name: "a1.small", Price: 0.04, RAM: 256000000, VCPUs: 2},
+ {Name: "a1.medium", Price: 0.08, RAM: 512000000, VCPUs: 4},
+ {Name: "a1.large", Price: 0.16, RAM: 1024000000, VCPUs: 8},
+ },
+ sbatchArgs: []string{"--constraint=instancetype=a1.medium"},
+ },
+ // No node types configured => no slurm constraint
+ {
+ types: nil,
+ sbatchArgs: nil,
+ },
+ // No node type is big enough => error
+ {
+ types: []arvados.InstanceType{
+ {Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
+ },
+ err: dispatchcloud.ErrConstraintsNotSatisfiable,
+ },
+ } {
+ c.Logf("%#v", trial)
+ s.disp.cluster = &arvados.Cluster{InstanceTypes: trial.types}
+
+ args, err := s.disp.sbatchArgs(container)
+ c.Check(err, Equals, trial.err)
+ if trial.err == nil {
+ c.Check(args, DeepEquals, append([]string{"--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990"}, trial.sbatchArgs...))
+ }
+ }
}
-func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
+func (s *StubbedSuite) TestSbatchPartition(c *C) {
container := arvados.Container{
UUID: "123",
RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1},
SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}},
- Priority: 1}
+ Priority: 1,
+ }
- c.Check(sbatchArgs(container), DeepEquals, []string{
+ args, err := s.disp.sbatchArgs(container)
+ c.Check(args, DeepEquals, []string{
"--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990",
"--partition=blurb,b2",
})
-}
-
-func (s *TestSuite) TestIntegrationChangePriority(c *C) {
- slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
- container := s.integrationTest(c, slurm, nil,
- func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
- dispatcher.UpdateState(container.UUID, dispatch.Running)
- time.Sleep(time.Second)
- dispatcher.Arv.Update("containers", container.UUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"priority": 600}},
- nil)
- time.Sleep(time.Second)
- dispatcher.UpdateState(container.UUID, dispatch.Complete)
- })
- c.Check(container.State, Equals, arvados.ContainerStateComplete)
- c.Assert(len(slurm.didRenice), Not(Equals), 0)
- c.Check(slurm.didRenice[len(slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
+ c.Check(err, IsNil)
}
// command 'squeue'.
type SqueueChecker struct {
Period time.Duration
+ Slurm Slurm
uuids map[string]jobPriority
startOnce sync.Once
done chan struct{}
sqc.L.Lock()
defer sqc.L.Unlock()
- cmd := theConfig.slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
+ cmd := sqc.Slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
cmd.Stdout, cmd.Stderr = stdout, stderr
if err := cmd.Run(); err != nil {
},
"CrunchRunCommand": ["crunch-run"],
"PollPeriod": "10s",
- "SbatchArguments": ["--partition=foo", "--exclude=node13"]
+ "SbatchArguments": ["--partition=foo", "--exclude=node13"],
+	"ReserveExtraRAM": 268435456
}`)
func usage(fs *flag.FlagSet) {
def _remove(self, obj, clear):
if clear:
- if obj.in_use():
- _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
- return
+ # Kernel behavior seems to be that if a file is
+ # referenced, its parents remain referenced too. This
+            # means has_ref() exits early when a collection is not a
+            # candidate for eviction.
+ #
+ # By contrast, in_use() doesn't increment references on
+ # parents, so it requires a full tree walk to determine if
+ # a collection is a candidate for eviction. This takes
+ # .07s for 240000 files, which becomes a major drag when
+ # cap_cache is being called several times a second and
+ # there are multiple non-evictable collections in the
+ # cache.
+ #
+ # So it is important for performance that we do the
+ # has_ref() check first.
+
if obj.has_ref(True):
_logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
return
+
+ if obj.in_use():
+ _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
+ return
+
obj.kernel_invalidate()
_logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
obj.clear()
if obj not in self._by_uuid[obj.cache_uuid]:
self._by_uuid[obj.cache_uuid].append(obj)
self._total += obj.objsize()
- _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
+ _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
+ obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
self.cap_cache()
def touch(self, obj):
* Clear the object contents (invalidates the object)
"""
+
+ __slots__ = ("_stale", "_poll", "_last_update", "_atime", "_poll_time", "use_count",
+ "ref_count", "dead", "cache_size", "cache_uuid", "allow_attr_cache")
+
def __init__(self):
self._stale = True
self._poll = False
class File(FreshBase):
"""Base for file objects."""
+ __slots__ = ("inode", "parent_inode", "_mtime")
+
def __init__(self, parent_inode, _mtime=0):
super(File, self).__init__()
self.inode = None
class FuseArvadosFile(File):
"""Wraps a ArvadosFile."""
+ __slots__ = ('arvfile',)
+
def __init__(self, parent_inode, arvfile, _mtime):
super(FuseArvadosFile, self).__init__(parent_inode, _mtime)
self.arvfile = arvfile
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- log "github.com/Sirupsen/logrus"
- "github.com/curoverse/azure-sdk-for-go/storage"
+ "github.com/Azure/azure-sdk-for-go/storage"
)
const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
AzureReplication int
ReadOnly bool
RequestTimeout arvados.Duration
+ StorageClasses []string
- azClient storage.Client
- bsClient *azureBlobClient
+ azClient storage.Client
+ container *azureContainer
+}
+
+// singleSender is a single-attempt storage.Sender.
+type singleSender struct{}
+
+// Send performs req exactly once.
+func (*singleSender) Send(c *storage.Client, req *http.Request) (resp *http.Response, err error) {
+ return c.HTTPClient.Do(req)
}
// Examples implements VolumeWithExamples.
if err != nil {
return fmt.Errorf("creating Azure storage client: %s", err)
}
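+	// Use a single-attempt sender instead of the SDK's default retrying sender.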
+ v.azClient.Sender = &singleSender{}
if v.RequestTimeout == 0 {
v.RequestTimeout = azureDefaultRequestTimeout
Timeout: time.Duration(v.RequestTimeout),
}
bs := v.azClient.GetBlobService()
- v.bsClient = &azureBlobClient{
- client: &bs,
+ v.container = &azureContainer{
+ ctr: bs.GetContainerReference(v.ContainerName),
}
- ok, err := v.bsClient.ContainerExists(v.ContainerName)
- if err != nil {
+ if ok, err := v.container.Exists(); err != nil {
return err
- }
- if !ok {
+ } else if !ok {
return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
}
return nil
// Return true if expires_at metadata attribute is found on the block
func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
- metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
+ metadata, err := v.container.GetBlobMetadata(loc)
if err != nil {
return false, metadata, v.translateError(err)
}
if azureMaxGetBytes < BlockSize {
// Unfortunately the handler doesn't tell us how long the blob
// is expected to be, so we have to ask Azure.
- props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
+ props, err := v.container.GetBlobProperties(loc)
if err != nil {
return 0, v.translateError(err)
}
go func() {
defer close(gotRdr)
if startPos == 0 && endPos == expectSize {
- rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
+ rdr, err = v.container.GetBlob(loc)
} else {
- rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
+ rdr, err = v.container.GetBlobRange(loc, startPos, endPos-1, nil)
}
}()
select {
gotRdr := make(chan struct{})
go func() {
defer close(gotRdr)
- rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
+ rdr, err = v.container.GetBlob(loc)
}()
select {
case <-ctx.Done():
body = http.NoBody
bufr.Close()
}
- errChan <- v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), body, nil)
+ errChan <- v.container.CreateBlockBlobFromReader(loc, len(block), body, nil)
}()
select {
case <-ctx.Done():
}
metadata["touch"] = fmt.Sprintf("%d", time.Now())
- return v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
+ return v.container.SetBlobMetadata(loc, metadata, nil)
}
// Mtime returns the last-modified property of a block blob.
return time.Time{}, os.ErrNotExist
}
- props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
+ props, err := v.container.GetBlobProperties(loc)
if err != nil {
return time.Time{}, err
}
- return time.Parse(time.RFC1123, props.LastModified)
+ return time.Time(props.LastModified), nil
}
// IndexTo writes a list of Keep blocks that are stored in the
func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
params := storage.ListBlobsParameters{
Prefix: prefix,
- Include: "metadata",
+ Include: &storage.IncludeBlobDataset{Metadata: true},
}
for {
- resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
+ resp, err := v.container.ListBlobs(params)
if err != nil {
return err
}
for _, b := range resp.Blobs {
- t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
- if err != nil {
- return err
- }
if !v.isKeepBlock(b.Name) {
continue
}
- if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) {
+ modtime := time.Time(b.Properties.LastModified)
+ if b.Properties.ContentLength == 0 && modtime.Add(azureWriteRaceInterval).After(time.Now()) {
// A new zero-length blob is probably
// just a new non-empty blob that
// hasn't committed its data yet (see
// Trashed blob; exclude it from response
continue
}
- fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.UnixNano())
+ fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, modtime.UnixNano())
}
if resp.NextMarker == "" {
return nil
// we get the Etag before checking Mtime, and use If-Match to
// ensure we don't delete data if Put() or Touch() happens
// between our calls to Mtime() and DeleteBlob().
- props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
+ props, err := v.container.GetBlobProperties(loc)
if err != nil {
return err
}
// If TrashLifetime == 0, just delete it
if theConfig.TrashLifetime == 0 {
- return v.bsClient.DeleteBlob(v.ContainerName, loc, map[string]string{
- "If-Match": props.Etag,
+ return v.container.DeleteBlob(loc, &storage.DeleteBlobOptions{
+ IfMatch: props.Etag,
})
}
// Otherwise, mark as trash
- return v.bsClient.SetBlobMetadata(v.ContainerName, loc, map[string]string{
+ return v.container.SetBlobMetadata(loc, storage.BlobMetadata{
"expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
- }, map[string]string{
- "If-Match": props.Etag,
+ }, &storage.SetBlobMetadataOptions{
+ IfMatch: props.Etag,
})
}
// Delete the expires_at metadata attribute
func (v *AzureBlobVolume) Untrash(loc string) error {
// if expires_at does not exist, return NotFoundError
- metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
+ metadata, err := v.container.GetBlobMetadata(loc)
if err != nil {
return v.translateError(err)
}
// reset expires_at metadata attribute
metadata["expires_at"] = ""
- err = v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
+ err = v.container.SetBlobMetadata(loc, metadata, nil)
return v.translateError(err)
}
return v.AzureReplication
}
+// GetStorageClasses implements Volume
+func (v *AzureBlobVolume) GetStorageClasses() []string {
+ return v.StorageClasses
+}
+
// If possible, translate an Azure SDK error to a recognizable error
// like os.ErrNotExist.
func (v *AzureBlobVolume) translateError(err error) error {
func (v *AzureBlobVolume) EmptyTrash() {
var bytesDeleted, bytesInTrash int64
var blocksDeleted, blocksInTrash int
- params := storage.ListBlobsParameters{Include: "metadata"}
+ params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
for {
- resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
+ resp, err := v.container.ListBlobs(params)
if err != nil {
log.Printf("EmptyTrash: ListBlobs: %v", err)
break
continue
}
- err = v.bsClient.DeleteBlob(v.ContainerName, b.Name, map[string]string{
- "If-Match": b.Properties.Etag,
+ err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
+ IfMatch: b.Properties.Etag,
})
if err != nil {
log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
// InternalStats returns bucket I/O and API call counters.
func (v *AzureBlobVolume) InternalStats() interface{} {
- return &v.bsClient.stats
+ return &v.container.stats
}
type azureBlobStats struct {
s.statsTicker.TickErr(err, errType)
}
-// azureBlobClient wraps storage.BlobStorageClient in order to count
-// I/O and API usage stats.
-type azureBlobClient struct {
- client *storage.BlobStorageClient
- stats azureBlobStats
+// azureContainer wraps storage.Container in order to count I/O and
+// API usage stats.
+type azureContainer struct {
+ ctr *storage.Container
+ stats azureBlobStats
}
-func (c *azureBlobClient) ContainerExists(cname string) (bool, error) {
+func (c *azureContainer) Exists() (bool, error) {
c.stats.Tick(&c.stats.Ops)
- ok, err := c.client.ContainerExists(cname)
+ ok, err := c.ctr.Exists()
c.stats.TickErr(err)
return ok, err
}
-func (c *azureBlobClient) GetBlobMetadata(cname, bname string) (map[string]string, error) {
+func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
- m, err := c.client.GetBlobMetadata(cname, bname)
+ b := c.ctr.GetBlobReference(bname)
+ err := b.GetMetadata(nil)
c.stats.TickErr(err)
- return m, err
+ return b.Metadata, err
}
-func (c *azureBlobClient) GetBlobProperties(cname, bname string) (*storage.BlobProperties, error) {
+func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
- p, err := c.client.GetBlobProperties(cname, bname)
+ b := c.ctr.GetBlobReference(bname)
+ err := b.GetProperties(nil)
c.stats.TickErr(err)
- return p, err
+ return &b.Properties, err
}
-func (c *azureBlobClient) GetBlob(cname, bname string) (io.ReadCloser, error) {
+func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
- rdr, err := c.client.GetBlob(cname, bname)
+ b := c.ctr.GetBlobReference(bname)
+ rdr, err := b.Get(nil)
c.stats.TickErr(err)
return NewCountingReader(rdr, c.stats.TickInBytes), err
}
-func (c *azureBlobClient) GetBlobRange(cname, bname, byterange string, hdrs map[string]string) (io.ReadCloser, error) {
+func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
- rdr, err := c.client.GetBlobRange(cname, bname, byterange, hdrs)
+ b := c.ctr.GetBlobReference(bname)
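+	// storage.BlobRange is an inclusive byte range, so callers pass the
+	// offset of the last wanted byte as end.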
+ rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
+ Range: &storage.BlobRange{
+ Start: uint64(start),
+ End: uint64(end),
+ },
+ GetBlobOptions: opts,
+ })
c.stats.TickErr(err)
return NewCountingReader(rdr, c.stats.TickInBytes), err
}
-func (c *azureBlobClient) CreateBlockBlobFromReader(cname, bname string, size uint64, rdr io.Reader, hdrs map[string]string) error {
+// If we give it an io.Reader that doesn't also have a Len() int
+// method, the Azure SDK determines data size by copying the data into
+// a new buffer, which is not a good use of memory.
+type readerWithAzureLen struct {
+ io.Reader
+ len int
+}
+
+// Len satisfies the private lener interface in azure-sdk-for-go.
+func (r *readerWithAzureLen) Len() int {
+ return r.len
+}
+
+func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
if size != 0 {
- rdr = NewCountingReader(rdr, c.stats.TickOutBytes)
+ rdr = &readerWithAzureLen{
+ Reader: NewCountingReader(rdr, c.stats.TickOutBytes),
+ len: size,
+ }
}
- err := c.client.CreateBlockBlobFromReader(cname, bname, size, rdr, hdrs)
+ b := c.ctr.GetBlobReference(bname)
+ err := b.CreateBlockBlobFromReader(rdr, opts)
c.stats.TickErr(err)
return err
}
-func (c *azureBlobClient) SetBlobMetadata(cname, bname string, m, hdrs map[string]string) error {
+func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
- err := c.client.SetBlobMetadata(cname, bname, m, hdrs)
+ b := c.ctr.GetBlobReference(bname)
+ b.Metadata = m
+ err := b.SetMetadata(opts)
c.stats.TickErr(err)
return err
}
-func (c *azureBlobClient) ListBlobs(cname string, params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
+func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
- resp, err := c.client.ListBlobs(cname, params)
+ resp, err := c.ctr.ListBlobs(params)
c.stats.TickErr(err)
return resp, err
}
-func (c *azureBlobClient) DeleteBlob(cname, bname string, hdrs map[string]string) error {
+func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
- err := c.client.DeleteBlob(cname, bname, hdrs)
+ b := c.ctr.GetBlobReference(bname)
+ err := b.Delete(opts)
c.stats.TickErr(err)
return err
}
"testing"
"time"
- log "github.com/Sirupsen/logrus"
- "github.com/curoverse/azure-sdk-for-go/storage"
+ "github.com/Azure/azure-sdk-for-go/storage"
+ "github.com/ghodss/yaml"
check "gopkg.in/check.v1"
)
// used by Microsoft's Azure emulator: the Azure SDK
// recognizes that magic string and changes its behavior to
// cater to the Azure SDK's own test suite.
- fakeAccountName = "fakeAccountName"
+ fakeAccountName = "fakeaccountname"
fakeAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
)
b := storage.Blob{
Name: hash,
Properties: storage.BlobProperties{
- LastModified: blob.Mtime.Format(time.RFC1123),
+ LastModified: storage.TimeRFC1123(blob.Mtime),
ContentLength: int64(len(blob.Data)),
Etag: blob.Etag,
},
ReadOnly: readonly,
AzureReplication: replication,
azClient: azClient,
- bsClient: &azureBlobClient{client: &bs},
+ container: &azureContainer{ctr: bs.GetContainerReference(container)},
}
return &TestableAzureBlobVolume{
c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
}
+func (s *StubbedAzureBlobSuite) TestConfig(c *check.C) {
+ var cfg Config
+ err := yaml.Unmarshal([]byte(`
+Volumes:
+ - Type: Azure
+ StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+ c.Check(err, check.IsNil)
+ c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
+
func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
v.azHandler.PutRaw(v.ContainerName, locator, data)
}
"sync"
"sync/atomic"
"time"
-
- log "github.com/Sirupsen/logrus"
)
type bufferPool struct {
package main
import (
- . "gopkg.in/check.v1"
"time"
+
+ . "gopkg.in/check.v1"
)
var _ = Suite(&BufferPoolSuite{})
"encoding/json"
"fmt"
"io/ioutil"
+ "net/http"
+ "strconv"
"strings"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- log "github.com/Sirupsen/logrus"
+ "git.curoverse.com/arvados.git/sdk/go/stats"
+ "github.com/Sirupsen/logrus"
+ "github.com/golang/protobuf/jsonpb"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
)
type Config struct {
debugLogf func(string, ...interface{})
ManagementToken string
+
+ metrics
}
-var theConfig = DefaultConfig()
+var (
+ theConfig = DefaultConfig()
+ formatter = map[string]logrus.Formatter{
+ "text": &logrus.TextFormatter{
+ FullTimestamp: true,
+ TimestampFormat: rfc3339NanoFixed,
+ },
+ "json": &logrus.JSONFormatter{
+ TimestampFormat: rfc3339NanoFixed,
+ },
+ }
+ log = logrus.StandardLogger()
+)
const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
// fields, and before using the config.
func (cfg *Config) Start() error {
if cfg.Debug {
- log.SetLevel(log.DebugLevel)
+ log.Level = logrus.DebugLevel
cfg.debugLogf = log.Printf
cfg.debugLogf("debugging enabled")
} else {
+ log.Level = logrus.InfoLevel
cfg.debugLogf = func(string, ...interface{}) {}
}
- switch strings.ToLower(cfg.LogFormat) {
- case "text":
- log.SetFormatter(&log.TextFormatter{
- FullTimestamp: true,
- TimestampFormat: rfc3339NanoFixed,
- })
- case "json":
- log.SetFormatter(&log.JSONFormatter{
- TimestampFormat: rfc3339NanoFixed,
- })
- default:
+ f := formatter[strings.ToLower(cfg.LogFormat)]
+ if f == nil {
return fmt.Errorf(`unsupported log format %q (try "text" or "json")`, cfg.LogFormat)
}
+ log.Formatter = f
if cfg.MaxBuffers < 0 {
return fmt.Errorf("MaxBuffers must be greater than zero")
return nil
}
+type metrics struct {
+ registry *prometheus.Registry
+ reqDuration *prometheus.SummaryVec
+ timeToStatus *prometheus.SummaryVec
+ exportProm http.Handler
+}
+
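+// Levels implements logrus.Hook: the metrics hook fires for entries at every log level.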
+func (*metrics) Levels() []logrus.Level {
+ return logrus.AllLevels
+}
+
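+// Fire implements logrus.Hook: when a log entry carries timeToStatus,
+// reqMethod, and respStatusCode fields, record the time-to-first-byte
+// in the timeToStatus summary.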
+func (m *metrics) Fire(ent *logrus.Entry) error {
+ if tts, ok := ent.Data["timeToStatus"].(stats.Duration); !ok {
+ } else if method, ok := ent.Data["reqMethod"].(string); !ok {
+ } else if code, ok := ent.Data["respStatusCode"].(int); !ok {
+ } else {
+ m.timeToStatus.WithLabelValues(strconv.Itoa(code), strings.ToLower(method)).Observe(time.Duration(tts).Seconds())
+ }
+ return nil
+}
+
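+// setup creates the Prometheus registry and request summaries, builds
+// the Prometheus export handler, and registers this hook with the logger.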
+func (m *metrics) setup() {
+ m.registry = prometheus.NewRegistry()
+ m.timeToStatus = prometheus.NewSummaryVec(prometheus.SummaryOpts{
+ Name: "time_to_status_seconds",
+ Help: "Summary of request TTFB.",
+ }, []string{"code", "method"})
+ m.reqDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
+ Name: "request_duration_seconds",
+ Help: "Summary of request duration.",
+ }, []string{"code", "method"})
+ m.registry.MustRegister(m.timeToStatus)
+ m.registry.MustRegister(m.reqDuration)
+ m.exportProm = promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{
+ ErrorLog: log,
+ })
+ log.AddHook(m)
+}
+
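+// exportJSON writes the gathered metric families as a JSON array; it
+// backs the /metrics.json endpoint.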
+func (m *metrics) exportJSON(w http.ResponseWriter, req *http.Request) {
+ jm := jsonpb.Marshaler{Indent: " "}
+ mfs, _ := m.registry.Gather()
+ w.Write([]byte{'['})
+ for i, mf := range mfs {
+ if i > 0 {
+ w.Write([]byte{','})
+ }
+ jm.Marshal(w, mf)
+ }
+ w.Write([]byte{']'})
+}
+
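+// Instrument wraps an http.Handler so request durations are recorded
+// in the reqDuration summary.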
+func (m *metrics) Instrument(next http.Handler) http.Handler {
+ return promhttp.InstrumentHandlerDuration(m.reqDuration, next)
+}
+
// VolumeTypes is built up by init() funcs in the source files that
// define the volume types.
var VolumeTypes = []func() VolumeWithExamples{}
type VolumeList []Volume
-// UnmarshalJSON, given an array of objects, deserializes each object
-// as the volume type indicated by the object's Type field.
-func (vols *VolumeList) UnmarshalJSON(data []byte) error {
+// UnmarshalJSON -- given an array of objects -- deserializes each
+// object as the volume type indicated by the object's Type field.
+func (vl *VolumeList) UnmarshalJSON(data []byte) error {
typeMap := map[string]func() VolumeWithExamples{}
for _, factory := range VolumeTypes {
t := factory().Type()
if err != nil {
return err
}
- *vols = append(*vols, vol)
+ *vl = append(*vl, vol)
}
return nil
}
package main
import (
- log "github.com/Sirupsen/logrus"
+ "github.com/Sirupsen/logrus"
)
func init() {
+ log.Level = logrus.DebugLevel
theConfig.debugLogf = log.Printf
}
"git.curoverse.com/arvados.git/sdk/go/health"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
- log "github.com/Sirupsen/logrus"
)
type router struct {
// MakeRESTRouter returns a new router that forwards all Keep requests
// to the appropriate handlers.
-func MakeRESTRouter() *router {
- rest := mux.NewRouter()
- rtr := &router{Router: rest}
+func MakeRESTRouter() http.Handler {
+ rtr := &router{Router: mux.NewRouter()}
- rest.HandleFunc(
+ rtr.HandleFunc(
`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
- rest.HandleFunc(
+ rtr.HandleFunc(
`/{hash:[0-9a-f]{32}}+{hints}`,
GetBlockHandler).Methods("GET", "HEAD")
- rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
- rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
+ rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
+ rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
// List all blocks stored here. Privileged client only.
- rest.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
+ rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
// List blocks stored here whose hash has the given prefix.
// Privileged client only.
- rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
+ rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
// Internals/debugging info (runtime.MemStats)
- rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
+ rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
// List volumes: path, device number, bytes used/avail.
- rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
+ rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
// List mounts: UUID, readonly, tier, device ID, ...
- rest.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
- rest.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
- rest.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
+ rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
+ rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
+ rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
// Replace the current pull queue.
- rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+ rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
// Replace the current trash queue.
- rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
+ rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
// Untrash moves blocks from trash back into store
- rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
+ rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
- rest.Handle("/_health/{check}", &health.Handler{
+ rtr.Handle("/_health/{check}", &health.Handler{
Token: theConfig.ManagementToken,
Prefix: "/_health/",
}).Methods("GET")
// Any request which does not match any of these routes gets
// 400 Bad Request.
- rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
+ rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
- return rtr
+ theConfig.metrics.setup()
+
+ rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
+
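+	// Wrap the REST routes in metrics instrumentation, request-ID and
+	// logging middleware, and the request limiter; expose the collected
+	// metrics at /metrics (Prometheus format) and /metrics.json.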
+ mux := http.NewServeMux()
+ mux.Handle("/", theConfig.metrics.Instrument(
+ httpserver.AddRequestIDs(httpserver.LogRequests(rtr.limiter))))
+ mux.HandleFunc("/metrics.json", theConfig.metrics.exportJSON)
+ mux.Handle("/metrics", theConfig.metrics.exportProm)
+
+ return mux
}
// BadRequestHandler is a HandleFunc to address bad requests.
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/config"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
- log "github.com/Sirupsen/logrus"
"github.com/coreos/go-systemd/daemon"
)
// Middleware/handler stack
router := MakeRESTRouter()
- limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
- router.limiter = limiter
- http.Handle("/", httpserver.AddRequestIDs(httpserver.LogRequests(limiter)))
// Set up a TCP listener.
listener, err := net.Listen("tcp", theConfig.Listen)
log.Printf("Error notifying init daemon: %v", err)
}
log.Println("listening at", listener.Addr())
- srv := &http.Server{}
+ srv := &http.Server{Handler: router}
srv.Serve(listener)
}
package main
import (
+ "bytes"
"context"
"encoding/json"
"net/http"
func (s *MountsSuite) SetUpTest(c *check.C) {
s.vm = MakeTestVolumeManager(2)
KeepVM = s.vm
- s.rtr = MakeRESTRouter()
+ theConfig = DefaultConfig()
theConfig.systemAuthToken = arvadostest.DataManagerToken
+ theConfig.Start()
+ s.rtr = MakeRESTRouter()
}
func (s *MountsSuite) TearDownTest(c *check.C) {
vols[0].Put(context.Background(), TestHash, TestBlock)
vols[1].Put(context.Background(), TestHash2, TestBlock2)
- resp := s.call("GET", "/mounts", "")
+ resp := s.call("GET", "/mounts", "", nil)
c.Check(resp.Code, check.Equals, http.StatusOK)
var mntList []struct {
- UUID string
- DeviceID string
- ReadOnly bool
- Replication int
- Tier int
+ UUID string
+ DeviceID string
+ ReadOnly bool
+ Replication int
+ StorageClasses []string
}
err := json.Unmarshal(resp.Body.Bytes(), &mntList)
c.Assert(err, check.IsNil)
c.Check(m.DeviceID, check.Equals, "mock-device-id")
c.Check(m.ReadOnly, check.Equals, false)
c.Check(m.Replication, check.Equals, 1)
- c.Check(m.Tier, check.Equals, 1)
+ c.Check(m.StorageClasses, check.DeepEquals, []string{"default"})
}
c.Check(mntList[0].UUID, check.Not(check.Equals), mntList[1].UUID)
// Bad auth
for _, tok := range []string{"", "xyzzy"} {
- resp = s.call("GET", "/mounts/"+mntList[1].UUID+"/blocks", tok)
+ resp = s.call("GET", "/mounts/"+mntList[1].UUID+"/blocks", tok, nil)
c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
c.Check(resp.Body.String(), check.Equals, "Unauthorized\n")
}
tok := arvadostest.DataManagerToken
// Nonexistent mount UUID
- resp = s.call("GET", "/mounts/X/blocks", tok)
+ resp = s.call("GET", "/mounts/X/blocks", tok, nil)
c.Check(resp.Code, check.Equals, http.StatusNotFound)
c.Check(resp.Body.String(), check.Equals, "mount not found\n")
// Complete index of first mount
- resp = s.call("GET", "/mounts/"+mntList[0].UUID+"/blocks", tok)
+ resp = s.call("GET", "/mounts/"+mntList[0].UUID+"/blocks", tok, nil)
c.Check(resp.Code, check.Equals, http.StatusOK)
c.Check(resp.Body.String(), check.Matches, TestHash+`\+[0-9]+ [0-9]+\n\n`)
// Partial index of first mount (one block matches prefix)
- resp = s.call("GET", "/mounts/"+mntList[0].UUID+"/blocks?prefix="+TestHash[:2], tok)
+ resp = s.call("GET", "/mounts/"+mntList[0].UUID+"/blocks?prefix="+TestHash[:2], tok, nil)
c.Check(resp.Code, check.Equals, http.StatusOK)
c.Check(resp.Body.String(), check.Matches, TestHash+`\+[0-9]+ [0-9]+\n\n`)
// Complete index of second mount (note trailing slash)
- resp = s.call("GET", "/mounts/"+mntList[1].UUID+"/blocks/", tok)
+ resp = s.call("GET", "/mounts/"+mntList[1].UUID+"/blocks/", tok, nil)
c.Check(resp.Code, check.Equals, http.StatusOK)
c.Check(resp.Body.String(), check.Matches, TestHash2+`\+[0-9]+ [0-9]+\n\n`)
// Partial index of second mount (no blocks match prefix)
- resp = s.call("GET", "/mounts/"+mntList[1].UUID+"/blocks/?prefix="+TestHash[:2], tok)
+ resp = s.call("GET", "/mounts/"+mntList[1].UUID+"/blocks/?prefix="+TestHash[:2], tok, nil)
c.Check(resp.Code, check.Equals, http.StatusOK)
c.Check(resp.Body.String(), check.Equals, "\n")
}
-func (s *MountsSuite) call(method, path, tok string) *httptest.ResponseRecorder {
+func (s *MountsSuite) TestMetrics(c *check.C) {
+ s.call("PUT", "/"+TestHash, "", TestBlock)
+ s.call("PUT", "/"+TestHash2, "", TestBlock2)
+ resp := s.call("GET", "/metrics.json", "", nil)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+ var j []struct {
+ Name string
+ Help string
+ Type string
+ Metric []struct {
+ Label []struct {
+ Name string
+ Value string
+ }
+ Summary struct {
+ SampleCount string `json:"sample_count"`
+ SampleSum float64 `json:"sample_sum"`
+ Quantile []struct {
+ Quantile float64
+ Value float64
+ }
+ }
+ }
+ }
+ json.NewDecoder(resp.Body).Decode(&j)
+ found := make(map[string]bool)
+ for _, g := range j {
+ for _, m := range g.Metric {
+ if len(m.Label) == 2 && m.Label[0].Name == "code" && m.Label[0].Value == "200" && m.Label[1].Name == "method" && m.Label[1].Value == "put" {
+ c.Check(m.Summary.SampleCount, check.Equals, "2")
+ c.Check(len(m.Summary.Quantile), check.Not(check.Equals), 0)
+ c.Check(m.Summary.Quantile[0].Value, check.Not(check.Equals), float64(0))
+ found[g.Name] = true
+ }
+ }
+ }
+ c.Check(found["request_duration_seconds"], check.Equals, true)
+ c.Check(found["time_to_status_seconds"], check.Equals, true)
+}
+
+func (s *MountsSuite) call(method, path, tok string, body []byte) *httptest.ResponseRecorder {
resp := httptest.NewRecorder()
- req, _ := http.NewRequest(method, path, nil)
+ req, _ := http.NewRequest(method, path, bytes.NewReader(body))
if tok != "" {
req.Header.Set("Authorization", "OAuth2 "+tok)
}
"time"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
-
- log "github.com/Sirupsen/logrus"
)
// RunPullWorker receives PullRequests from pullq, invokes
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/AdRoll/goamz/aws"
"github.com/AdRoll/goamz/s3"
- log "github.com/Sirupsen/logrus"
)
const (
RaceWindow arvados.Duration
ReadOnly bool
UnsafeDelete bool
+ StorageClasses []string
bucket *s3bucket
return v.S3Replication
}
+// GetStorageClasses implements Volume
+func (v *S3Volume) GetStorageClasses() []string {
+ return v.StorageClasses
+}
+
var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
func (v *S3Volume) isKeepBlock(s string) bool {
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/AdRoll/goamz/s3"
"github.com/AdRoll/goamz/s3/s3test"
- log "github.com/Sirupsen/logrus"
+ "github.com/ghodss/yaml"
check "gopkg.in/check.v1"
)
return v
}
+func (s *StubbedS3Suite) TestConfig(c *check.C) {
+ var cfg Config
+ err := yaml.Unmarshal([]byte(`
+Volumes:
+ - Type: S3
+ StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+ c.Check(err, check.IsNil)
+ c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
+
func (v *TestableS3Volume) Start() error {
tmp, err := ioutil.TempFile("", "keepstore")
v.c.Assert(err, check.IsNil)
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- log "github.com/Sirupsen/logrus"
)
// RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
// Return a globally unique ID of the underlying storage
// device if possible, otherwise "".
DeviceID() string
+
+ // Get the storage classes associated with this volume
+ GetStorageClasses() []string
}
// A VolumeWithExamples provides example configs to display in the
// A VolumeMount is an attachment of a Volume to a VolumeManager.
type VolumeMount struct {
- UUID string
- DeviceID string
- ReadOnly bool
- Replication int
- Tier int
- volume Volume
+ UUID string
+ DeviceID string
+ ReadOnly bool
+ Replication int
+ StorageClasses []string
+ volume Volume
}
// Generate a UUID the way API server would for a "KeepVolumeMount"
}
vm.mountMap = make(map[string]*VolumeMount)
for _, v := range volumes {
+ sc := v.GetStorageClasses()
+ if len(sc) == 0 {
+ sc = []string{"default"}
+ }
mnt := &VolumeMount{
- UUID: (*VolumeMount)(nil).generateUUID(),
- DeviceID: v.DeviceID(),
- ReadOnly: !v.Writable(),
- Replication: v.Replication(),
- Tier: 1,
- volume: v,
+ UUID: (*VolumeMount)(nil).generateUUID(),
+ DeviceID: v.DeviceID(),
+ ReadOnly: !v.Writable(),
+ Replication: v.Replication(),
+ StorageClasses: sc,
+ volume: v,
}
vm.iostats[v] = &ioStats{}
vm.mounts = append(vm.mounts, mnt)
func (v *MockVolume) EmptyTrash() {
}
+
+func (v *MockVolume) GetStorageClasses() []string {
+ return nil
+}
"sync"
"syscall"
"time"
-
- log "github.com/Sirupsen/logrus"
)
type unixVolumeAdder struct {
ReadOnly bool
Serialize bool
DirectoryReplication int
+ StorageClasses []string
// something to lock during IO, typically a sync.Mutex (or nil
// to skip locking)
return v.DirectoryReplication
}
+// GetStorageClasses implements Volume
+func (v *UnixVolume) GetStorageClasses() []string {
+ return v.StorageClasses
+}
+
// InternalStats returns I/O and filesystem ops counters.
func (v *UnixVolume) InternalStats() interface{} {
return &v.os.stats
"testing"
"time"
+ "github.com/ghodss/yaml"
check "gopkg.in/check.v1"
)
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
}
+
+func (s *UnixVolumeSuite) TestConfig(c *check.C) {
+ var cfg Config
+ err := yaml.Unmarshal([]byte(`
+Volumes:
+ - Type: Directory
+ StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+ c.Check(err, check.IsNil)
+ c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
@ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry(config.ARVADOS_ERRORS)
def create_arvados_node(self):
- self.arvados_node = self._arvados.nodes().create(body={}).execute()
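+        # assign_slot asks the API server to assign a slot number (and
+        # hostname) right away, so it is known before the cloud VM exists.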
+ self.arvados_node = self._arvados.nodes().create(
+ body={}, assign_slot=True).execute()
self._later.create_cloud_node()
@ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry(config.ARVADOS_ERRORS)
def prepare_arvados_node(self, node):
- self.arvados_node = self._clean_arvados_node(
- node, "Prepared by Node Manager")
+ self._clean_arvados_node(node, "Prepared by Node Manager")
+ self.arvados_node = self._arvados.nodes().update(
+ uuid=node['uuid'], body={}, assign_slot=True).execute()
self._later.create_cloud_node()
@ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry()
def sync_node(self, cloud_node, arvados_node):
- return self._cloud.sync_node(cloud_node, arvados_node)
+ if self._cloud.node_fqdn(cloud_node) != arvados_node_fqdn(arvados_node):
+ return self._cloud.sync_node(cloud_node, arvados_node)
class ComputeNodeMonitorActor(config.actor_class):
for shutdown.
"""
def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
- cloud_fqdn_func, timer_actor, update_actor, cloud_client,
+ timer_actor, update_actor, cloud_client,
arvados_node=None, poll_stale_after=600, node_stale_after=3600,
boot_fail_after=1800
):
super(ComputeNodeMonitorActor, self).__init__()
self._later = self.actor_ref.tell_proxy()
self._shutdowns = shutdown_timer
- self._cloud_node_fqdn = cloud_fqdn_func
self._timer = timer_actor
self._update = update_actor
self._cloud = cloud_client
self._later.consider_shutdown()
def update_arvados_node(self, arvados_node):
- # If the cloud node's FQDN doesn't match what's in the Arvados node
- # record, make them match.
+ """Called when the latest Arvados node record is retrieved.
+
+ Calls the updater's sync_node() method.
+
+ """
# This method is a little unusual in the way it just fires off the
# request without checking the result or retrying errors. That's
# because this update happens every time we reload the Arvados node
# the logic to throttle those effective retries when there's trouble.
if arvados_node is not None:
self.arvados_node = arvados_node
- if (self._cloud_node_fqdn(self.cloud_node) !=
- arvados_node_fqdn(self.arvados_node)):
- self._update.sync_node(self.cloud_node, self.arvados_node)
+ self._update.sync_node(self.cloud_node, self.arvados_node)
self._later.consider_shutdown()
import subprocess
import time
-from . import \
- ComputeNodeSetupActor, ComputeNodeMonitorActor
+from . import ComputeNodeMonitorActor
+from . import ComputeNodeSetupActor as SetupActorBase
from . import ComputeNodeShutdownActor as ShutdownActorBase
from . import ComputeNodeUpdateActor as UpdateActorBase
from .. import RetryMixin
'fail\n', 'fail*\n'])
SLURM_DRAIN_STATES = frozenset(['drain\n', 'drng\n'])
- def _set_node_state(self, nodename, state, *args):
- cmd = ['scontrol', 'update', 'NodeName=' + nodename,
- 'State=' + state]
- cmd.extend(args)
- subprocess.check_output(cmd)
+ def _update_slurm_node(self, nodename, updates):
+ cmd = ['scontrol', 'update', 'NodeName=' + nodename] + updates
+ try:
+ subprocess.check_output(cmd)
+ except:
+ self._logger.error(
+ "SLURM update %r failed", cmd, exc_info=True)
+
+ def _update_slurm_size_attrs(self, nodename, size):
+ self._update_slurm_node(nodename, [
+ 'Weight=%i' % int(size.price * 1000),
+ 'Features=instancetype=' + size.id,
+ ])
def _get_slurm_state(self, nodename):
return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', nodename])
+class ComputeNodeSetupActor(SlurmMixin, SetupActorBase):
+ def create_cloud_node(self):
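+        # Set the slurm node's Weight and instancetype feature for the
+        # chosen size before the cloud node is created.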
+ hostname = self.arvados_node.get("hostname")
+ if hostname:
+ self._update_slurm_size_attrs(hostname, self.cloud_size)
+ return super(ComputeNodeSetupActor, self).create_cloud_node()
+
+
class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
def on_start(self):
arv_node = self._arvados_node()
if self._nodename:
if try_resume and self._get_slurm_state(self._nodename) in self.SLURM_DRAIN_STATES:
# Resume from "drng" or "drain"
- self._set_node_state(self._nodename, 'RESUME')
+ self._update_slurm_node(self._nodename, ['State=RESUME'])
else:
# Node is in a state such as 'idle' or 'alloc' so don't
# try to resume it because that will just raise an error.
if self.cancel_reason is not None:
return
if self._nodename:
- self._set_node_state(self._nodename, 'DRAIN', 'Reason=Node Manager shutdown')
+ self._update_slurm_node(self._nodename, [
+ 'State=DRAIN', 'Reason=Node Manager shutdown'])
self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
self._later.await_slurm_drain()
else:
def _destroy_node(self):
if self._nodename:
- self._set_node_state(self._nodename, 'DOWN', 'Reason=Node Manager shutdown')
+ self._update_slurm_node(self._nodename, [
+ 'State=DOWN', 'Reason=Node Manager shutdown'])
super(ComputeNodeShutdownActor, self)._destroy_node()
-class ComputeNodeUpdateActor(UpdateActorBase):
+class ComputeNodeUpdateActor(SlurmMixin, UpdateActorBase):
def sync_node(self, cloud_node, arvados_node):
- if arvados_node.get("hostname"):
- try:
- subprocess.check_output(['scontrol', 'update', 'NodeName=' + arvados_node["hostname"], 'Weight=%i' % int(cloud_node.size.price * 1000)])
- except:
- self._logger.error("Unable to set slurm node weight.", exc_info=True)
- return super(ComputeNodeUpdateActor, self).sync_node(cloud_node, arvados_node)
+ """Keep SLURM's node properties up to date."""
+ hostname = arvados_node.get("hostname")
+ features = arvados_node.get("slurm_node_features", "").split(",")
+ sizefeature = "instancetype=" + cloud_node.size.id
+ if hostname and sizefeature not in features:
+ # This probably means SLURM has restarted and lost our
+ # dynamically configured node weights and features.
+ self._update_slurm_size_attrs(hostname, cloud_node.size)
+ return super(ComputeNodeUpdateActor, self).sync_node(
+ cloud_node, arvados_node)
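# Illustrative scenario (hypothetical names): if slurmctld restarts and loses
# the dynamically set attributes, sinfo reports the node's features as
# "(null)", the node-list poller records slurm_node_features="" on the Arvados
# node record, the expected "instancetype=z2.test" entry is then missing from
# the parsed feature list, and sync_node() reapplies both Weight and Features
# via _update_slurm_size_attrs().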
super(ComputeNodeDriver, self).__init__(
auth_kwargs, list_kwargs, create_kwargs,
driver_class)
- self._sizes_by_name = {sz.name: sz for sz in self.sizes.itervalues()}
+ self._sizes_by_id = {sz.id: sz for sz in self.sizes.itervalues()}
self._disktype_links = {dt.name: self._object_link(dt)
for dt in self.real.ex_list_disktypes()}
# and monkeypatch the results when that's the case.
if nodelist and not hasattr(nodelist[0].size, 'id'):
for node in nodelist:
- node.size = self._sizes_by_name[node.size]
+ node.size = self._sizes_by_id[node.size]
return nodelist
@classmethod
cloud_node=cloud_node,
cloud_node_start_time=start_time,
shutdown_timer=shutdown_timer,
- cloud_fqdn_func=self._cloud_driver.node_fqdn,
update_actor=self._cloud_updater,
timer_actor=self._timer,
arvados_node=None,
from __future__ import absolute_import, print_function
import logging
+import re
import subprocess
import arvados.util
return fallback
def cloud_size_for_constraints(self, constraints):
+ specified_size = constraints.get('instance_type')
want_value = lambda key: self.coerce_int(constraints.get(key), 0)
wants = {'cores': want_value('min_cores_per_node'),
'ram': want_value('min_ram_mb_per_node'),
'scratch': want_value('min_scratch_mb_per_node')}
for size in self.cloud_sizes:
- if size.meets_constraints(**wants):
- return size
+ if (size.meets_constraints(**wants) and
+ (specified_size is None or size.id == specified_size)):
+ return size
return None
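# Illustrative example (hypothetical size ids): with sizes z1.test and z2.test
# configured, constraints {'instance_type': 'z2.test'} select z2.test (the
# squeue path below leaves cpu/ram/scratch out of such entries, so the numeric
# checks are trivially satisfied), while {'min_ram_mb_per_node': 2048} returns
# the first configured size whose RAM meets that bound. None is returned when
# nothing matches.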
def servers_for_queue(self, queue):
cloud_size = self.cloud_size_for_constraints(constraints)
if cloud_size is None:
unsatisfiable_jobs[job['uuid']] = (
- 'Requirements for a single node exceed the available '
- 'cloud node size')
+ "Constraints cannot be satisfied by any node type")
elif (want_count > self.max_nodes):
unsatisfiable_jobs[job['uuid']] = (
"Job's min_nodes constraint is greater than the configured "
queuelist = []
if self.slurm_queue:
# cpus, memory, temporary disk space, reason, job name, requested node features
- squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
+ squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j|%f"])
for out in squeue_out.splitlines():
try:
- cpu, ram, disk, reason, jobname = out.split("|", 4)
- if ("ReqNodeNotAvail" in reason) or ("Resources" in reason) or ("Priority" in reason):
- queuelist.append({
- "uuid": jobname,
- "runtime_constraints": {
- "min_cores_per_node": cpu,
- "min_ram_mb_per_node": self.coerce_to_mb(ram),
- "min_scratch_mb_per_node": self.coerce_to_mb(disk)
- }
- })
+ cpu, ram, disk, reason, jobname, features = out.split("|", 5)
except ValueError:
- pass
+ self._logger.warning("ignored malformed line in squeue output: %r", out)
+ continue
+ if '-dz642-' not in jobname:
+ continue
+ if not re.search(r'ReqNodeNotAvail|Resources|Priority', reason):
+ continue
+
+ for feature in features.split(','):
+ m = re.match(r'instancetype=(.*)', feature)
+ if not m:
+ continue
+ instance_type = m.group(1)
+ # Ignore cpu/ram/scratch requirements, bring up
+ # the requested node type.
+ queuelist.append({
+ "uuid": jobname,
+ "runtime_constraints": {
+ "instance_type": instance_type,
+ }
+ })
+ break
+ else:
+ # No instance type specified. Choose a node type
+ # to suit cpu/ram/scratch requirements.
+ queuelist.append({
+ "uuid": jobname,
+ "runtime_constraints": {
+ "min_cores_per_node": cpu,
+ "min_ram_mb_per_node": self.coerce_to_mb(ram),
+ "min_scratch_mb_per_node": self.coerce_to_mb(disk)
+ }
+ })
if self.jobs_queue:
queuelist.extend(self._client.jobs().queue().execute()['items'])
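# Illustrative examples (hypothetical uuids) of how squeue lines in the
# --format=%c|%m|%d|%r|%j|%f output above are mapped:
#   "1|1024|0|(Resources)|zzzzz-dz642-000000000000001|instancetype=z2.test"
#       -> {"uuid": ..., "runtime_constraints": {"instance_type": "z2.test"}}
#   "1|1024M|0|(ReqNodeNotAvail)|zzzzz-dz642-000000000000002|(null)"
#       -> {"uuid": ..., "runtime_constraints": {"min_cores_per_node": "1",
#           "min_ram_mb_per_node": 1024, "min_scratch_mb_per_node": 0}}
# Jobs whose name does not contain "-dz642-" (i.e. not containers), or whose
# reason is not ReqNodeNotAvail/Resources/Priority, are skipped entirely.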
class ArvadosNodeListMonitorActor(clientactor.RemotePollLoopActor):
"""Actor to poll the Arvados node list.
- This actor regularly polls the list of Arvados node records, and
- sends it to subscribers.
+ This actor regularly polls the list of Arvados node records,
+ augments it with the latest SLURM node info (`sinfo`), and sends
+ it to subscribers.
"""
def is_common_error(self, exception):
nodelist = arvados.util.list_all(self._client.nodes().list)
# node hostname, state, node features
- sinfo_out = subprocess.check_output(["sinfo", "--noheader", "--format=%n %t"])
+ sinfo_out = subprocess.check_output(["sinfo", "--noheader", "--format=%n|%t|%f"])
nodestates = {}
+ nodefeatures = {}
for out in sinfo_out.splitlines():
try:
- nodename, state = out.split(" ", 2)
- if state in ('alloc', 'alloc*',
- 'comp', 'comp*',
- 'mix', 'mix*',
- 'drng', 'drng*'):
- nodestates[nodename] = 'busy'
- elif state in ('idle', 'fail'):
- nodestates[nodename] = state
- else:
- nodestates[nodename] = 'down'
+ nodename, state, features = out.split("|", 3)
except ValueError:
- pass
+ continue
+ if state in ('alloc', 'alloc*',
+ 'comp', 'comp*',
+ 'mix', 'mix*',
+ 'drng', 'drng*'):
+ nodestates[nodename] = 'busy'
+ elif state in ('idle', 'fail'):
+ nodestates[nodename] = state
+ else:
+ nodestates[nodename] = 'down'
+ if features != "(null)":
+ nodefeatures[nodename] = features
for n in nodelist:
if n["slot_number"] and n["hostname"] and n["hostname"] in nodestates:
n["crunch_worker_state"] = nodestates[n["hostname"]]
else:
n["crunch_worker_state"] = 'down'
+ n["slurm_node_features"] = nodefeatures.get(n["hostname"], "")
return nodelist
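# Illustrative mapping (hypothetical hostnames) for the sinfo format above:
#   "compute1|idle|instancetype=a1.test" -> crunch_worker_state "idle",
#                                           slurm_node_features "instancetype=a1.test"
#   "compute2|alloc|(null)"              -> crunch_worker_state "busy",
#                                           slurm_node_features ""
# An Arvados node whose hostname does not appear in the sinfo output at all is
# reported as "down" with no features.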
import arvados
import StringIO
+formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
+
+handler = logging.StreamHandler(sys.stderr)
+handler.setFormatter(formatter)
logger = logging.getLogger("logger")
logger.setLevel(logging.INFO)
-logger.addHandler(logging.StreamHandler(sys.stderr))
+logger.addHandler(handler)
detail = logging.getLogger("detail")
detail.setLevel(logging.INFO)
detail_content = sys.stderr
else:
detail_content = StringIO.StringIO()
-detail.addHandler(logging.StreamHandler(detail_content))
+handler = logging.StreamHandler(detail_content)
+handler.setFormatter(formatter)
+detail.addHandler(handler)
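# Illustrative output with this formatter (timestamp and message are made up):
#   2018-03-01 12:34:56,789 INFO: node manager daemon started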
fake_slurm = None
compute_nodes = None
def set_squeue(g):
global all_jobs
update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
- "\n".join("echo '1|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
+ "\n".join("echo '1|100|100|%s|%s|(null)'" % (v, k) for k,v in all_jobs.items()))
return 0
def set_queue_unsatisfiable(g):
global all_jobs, unsatisfiable_job_scancelled
# Simulate a job requesting a 99 core node.
update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
- "\n".join("echo '99|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
+ "\n".join("echo '99|100|100|%s|%s|(null)'" % (v, k) for k,v in all_jobs.items()))
update_script(os.path.join(fake_slurm, "scancel"), "#!/bin/sh\n" +
"\ntouch %s" % unsatisfiable_job_scancelled)
return 0
['event_type', '=', 'stderr'],
]).execute()['items'][0]
if not re.match(
- r"Requirements for a single node exceed the available cloud node size",
+ r"Constraints cannot be satisfied",
log_entry['properties']['text']):
return 1
return 0
compute_nodes[g.group(1)] = g.group(3)
update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
- "\n".join("echo '%s alloc'" % (v) for k,v in compute_nodes.items()))
+ "\n".join("echo '%s|alloc|(null)'" % (v) for k,v in compute_nodes.items()))
for k,v in all_jobs.items():
if v == "ReqNodeNotAvail":
def remaining_jobs(g):
update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
- "\n".join("echo '%s alloc'" % (v) for k,v in compute_nodes.items()))
+ "\n".join("echo '%s|alloc|(null)'" % (v) for k,v in compute_nodes.items()))
for k,v in all_jobs.items():
all_jobs[k] = "Running"
def node_busy(g):
update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
- "\n".join("echo '%s idle'" % (v) for k,v in compute_nodes.items()))
+ "\n".join("echo '%s|idle|(null)'" % (v) for k,v in compute_nodes.items()))
return 0
def node_shutdown(g):
from . import testutil
class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
+ ACTOR_CLASS = dispatch.ComputeNodeSetupActor
+
def make_mocks(self, arvados_effect=None):
if arvados_effect is None:
- arvados_effect = [testutil.arvados_node_mock()]
+ arvados_effect = [testutil.arvados_node_mock(
+ slot_number=None,
+ hostname=None,
+ first_ping_at=None,
+ last_ping_at=None,
+ )]
self.arvados_effect = arvados_effect
self.timer = testutil.MockTimer()
self.api_client = mock.MagicMock(name='api_client')
def make_actor(self, arv_node=None):
if not hasattr(self, 'timer'):
self.make_mocks(arvados_effect=[arv_node] if arv_node else None)
- self.setup_actor = dispatch.ComputeNodeSetupActor.start(
+ self.setup_actor = self.ACTOR_CLASS.start(
self.timer, self.api_client, self.cloud_client,
testutil.MockSize(1), arv_node).proxy()
self.assertEqual(self.arvados_effect[-1],
self.setup_actor.arvados_node.get(self.TIMEOUT))
assert(finished.wait(self.TIMEOUT))
+        self.api_client.nodes().create.assert_called_with(body={}, assign_slot=True)
self.assertEqual(1, self.api_client.nodes().create().execute.call_count)
self.assertEqual(1, self.api_client.nodes().update().execute.call_count)
self.assert_node_properties_updated()
self.setup_actor.arvados_node.get(self.TIMEOUT))
assert(finished.wait(self.TIMEOUT))
self.assert_node_properties_updated()
- self.assertEqual(2, self.api_client.nodes().update().execute.call_count)
+ self.api_client.nodes().create.called_with(body={}, assign_slot=True)
+ self.assertEqual(3, self.api_client.nodes().update().execute.call_count)
self.assertEqual(self.cloud_client.create_node(),
self.setup_actor.cloud_node.get(self.TIMEOUT))
start_time = time.time()
monitor_actor = dispatch.ComputeNodeMonitorActor.start(
self.cloud_node, start_time, self.shutdowns,
- testutil.cloud_node_fqdn, self.timer, self.updates, self.cloud_client,
+ self.timer, self.updates, self.cloud_client,
self.arvados_node)
self.shutdown_actor = self.ACTOR_CLASS.start(
self.timer, self.cloud_client, self.arvados_client, monitor_actor,
start_time = time.time()
self.node_actor = dispatch.ComputeNodeMonitorActor.start(
self.cloud_mock, start_time, self.shutdowns,
- testutil.cloud_node_fqdn, self.timer, self.updates, self.cloud_client,
+ self.timer, self.updates, self.cloud_client,
arv_node, boot_fail_after=300).proxy()
self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
self.assertEqual(testutil.ip_address_mock(4),
current_arvados['ip_address'])
- def test_update_arvados_node_syncs_when_fqdn_mismatch(self):
+ def test_update_arvados_node_calls_sync_node(self):
self.make_mocks(5)
self.cloud_mock.extra['testname'] = 'cloudfqdn.zzzzz.arvadosapi.com'
self.make_actor()
arv_node = testutil.arvados_node_mock(5)
self.node_actor.update_arvados_node(arv_node).get(self.TIMEOUT)
self.assertEqual(1, self.updates.sync_node.call_count)
-
- def test_update_arvados_node_skips_sync_when_fqdn_match(self):
- self.make_mocks(6)
- arv_node = testutil.arvados_node_mock(6)
- self.cloud_mock.extra['testname'] ='{n[hostname]}.{n[domain]}'.format(
- n=arv_node)
- self.make_actor()
- self.node_actor.update_arvados_node(arv_node).get(self.TIMEOUT)
- self.assertEqual(0, self.updates.sync_node.call_count)
import arvnodeman.computenode.dispatch.slurm as slurm_dispatch
from . import testutil
-from .test_computenode_dispatch import ComputeNodeShutdownActorMixin, ComputeNodeUpdateActorTestCase
+from .test_computenode_dispatch import \
+ ComputeNodeShutdownActorMixin, \
+ ComputeNodeSetupActorTestCase, \
+ ComputeNodeUpdateActorTestCase
@mock.patch('subprocess.check_output')
class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
cloud_node = testutil.cloud_node_mock()
arv_node = testutil.arvados_node_mock()
self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
- check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=99000'])
+ check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=99000', 'Features=instancetype=z99.test'])
+
+class SLURMComputeNodeSetupActorTestCase(ComputeNodeSetupActorTestCase):
+ ACTOR_CLASS = slurm_dispatch.ComputeNodeSetupActor
+
+ @mock.patch('subprocess.check_output')
+ def test_update_node_features(self, check_output):
+ # `scontrol update` happens only if the Arvados node record
+ # has a hostname. ComputeNodeSetupActorTestCase.make_mocks
+ # uses mocks with scrubbed hostnames, so we override with the
+ # default testutil.arvados_node_mock.
+ self.make_mocks(arvados_effect=[testutil.arvados_node_mock()])
+ self.make_actor()
+ self.wait_for_assignment(self.setup_actor, 'cloud_node')
+ check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=1000', 'Features=instancetype=z1.test'])
# patches that up in listings.
size = testutil.MockSize(2)
node = testutil.cloud_node_mock(size=size)
- node.size = size.name
+ node.size = size.id
self.driver_mock().list_sizes.return_value = [size]
self.driver_mock().list_nodes.return_value = [node]
driver = self.new_driver()
unittest.TestCase):
def busywait(self, f):
- n = 0
- while not f() and n < 200:
+ for n in xrange(200):
+ ok = f()
+ if ok:
+ return
time.sleep(.1)
self.daemon.ping().get(self.TIMEOUT)
- n += 1
- self.assertTrue(f())
+ self.assertTrue(ok) # always falsy, but not necessarily False
def mock_node_start(self, **kwargs):
# Make sure that every time the daemon starts a setup actor,
def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
- mock_squeue.return_value = "1|1024|0|Resources|" + container_uuid + "\n"
+ mock_squeue.return_value = "1|1024|0|(Resources)|" + container_uuid + "|\n"
self.build_monitor([{'items': [{'uuid': job_uuid}]}],
self.MockCalculatorUnsatisfiableJobs(), True, True)
@mock.patch("subprocess.check_output")
def test_squeue_server_list(self, mock_squeue):
- mock_squeue.return_value = """1|1024|0|Resources|zzzzz-zzzzz-zzzzzzzzzzzzzzy
-2|1024|0|Resources|zzzzz-zzzzz-zzzzzzzzzzzzzzz
+ mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|(null)
+2|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzz|(null)
"""
super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
@mock.patch("subprocess.check_output")
def test_squeue_server_list_suffix(self, mock_squeue):
- mock_squeue.return_value = """1|1024M|0|ReqNodeNotAvail, UnavailableNod|zzzzz-zzzzz-zzzzzzzzzzzzzzy
-1|2G|0|ReqNodeNotAvail, UnavailableNod|zzzzz-zzzzz-zzzzzzzzzzzzzzz
+ mock_squeue.return_value = """1|1024M|0|(ReqNodeNotAvail, UnavailableNodes:compute123)|zzzzz-dz642-zzzzzzzzzzzzzzy|(null)
+1|2G|0|(ReqNodeNotAvail)|zzzzz-dz642-zzzzzzzzzzzzzzz|(null)
"""
super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
self.subscriber.assert_called_with([testutil.MockSize(1),
testutil.MockSize(2)])
+ @mock.patch("subprocess.check_output")
+ def test_squeue_server_list_instancetype_constraint(self, mock_squeue):
+ mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|instancetype=z2.test\n"""
+ super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
+ [(testutil.MockSize(n), {'cores': n, 'ram': n*1024, 'scratch': n}) for n in range(1, 3)]),
+ True, True)
+ self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+ self.stop_proxy(self.monitor)
+ self.subscriber.assert_called_with([testutil.MockSize(2)])
+
def test_coerce_to_mb(self):
self.assertEqual(1, jobqueue.JobQueueMonitorActor.coerce_to_mb("1"))
self.assertEqual(512, jobqueue.JobQueueMonitorActor.coerce_to_mb("512"))
@mock.patch("subprocess.check_output")
def test_update_from_sinfo(self, sinfo_mock):
- sinfo_mock.return_value = "compute99 alloc"
- node = testutil.arvados_node_mock()
+ sinfo_mock.return_value = """compute1|idle|instancetype=a1.test
+compute2|alloc|(null)
+notarvados12345|idle|(null)
+"""
+ nodeIdle = testutil.arvados_node_mock(node_num=1)
+ nodeBusy = testutil.arvados_node_mock(node_num=2)
+ nodeMissing = testutil.arvados_node_mock(node_num=99)
self.build_monitor([{
- 'items': [node],
+ 'items': [nodeIdle, nodeBusy, nodeMissing],
'items_available': 1,
'offset': 0
}, {
'items_available': 1,
'offset': 1
}])
- self.monitor.subscribe_to(node['uuid'],
+ self.monitor.subscribe_to(nodeMissing['uuid'],
self.subscriber).get(self.TIMEOUT)
self.stop_proxy(self.monitor)
- self.subscriber.assert_called_with(node)
- self.assertEqual("busy", node["crunch_worker_state"])
+ self.subscriber.assert_called_with(nodeMissing)
+
+ self.assertEqual("idle", nodeIdle["crunch_worker_state"])
+ self.assertEqual("busy", nodeBusy["crunch_worker_state"])
+ self.assertEqual("down", nodeMissing["crunch_worker_state"])
+
+ self.assertEqual("instancetype=a1.test", nodeIdle["slurm_node_features"])
+ self.assertEqual("", nodeBusy["slurm_node_features"])
+ self.assertEqual("", nodeMissing["slurm_node_features"])
class CloudNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
def cloud_node_fqdn(node):
# We intentionally put the FQDN somewhere goofy to make sure tested code is
# using this function for lookups.
- return node.extra.get('testname', 'NoTestName')
+ return node.extra.get('testname', node.name+'.NoTestName.invalid')
def ip_address_mock(last_octet):
return '10.20.30.{}'.format(last_octet)
class MockSize(object):
def __init__(self, factor):
self.id = 'z{}.test'.format(factor)
- self.name = self.id
+ self.name = 'test size '+self.id
self.ram = 128 * factor
self.disk = factor # GB
self.scratch = 1000 * factor # MB
run_bundler --without=development
cd /usr/src/arvados/sdk/R
-R --quiet --vanilla <<EOF
-options(repos=structure(c(CRAN="http://cran.wustl.edu/")))
-if (!requireNamespace("devtools")) {
- install.packages("devtools")
-}
-if (!requireNamespace("roxygen2")) {
- install.packages("roxygen2")
-}
-if (!requireNamespace("pkgdown")) {
- devtools::install_github("hadley/pkgdown")
-}
-devtools::install_dev_deps()
-EOF
+R --quiet --vanilla --file=install_deps.R
if test "$1" = "--only-deps" ; then
exit
return fmt.Errorf("error searching for parent group: %s", err)
}
if len(gl.Items) == 0 {
- // Default parent group not existant, create one.
+ // Default parent group does not exist, create it.
if cfg.Verbose {
log.Println("Default parent group not found, creating...")
}
"revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
"revisionTime": "2017-07-27T13:52:37Z"
},
+ {
+ "checksumSHA1": "xHZe/h/tyrqmS9qiR03bLfRv5FI=",
+ "path": "github.com/Azure/azure-sdk-for-go/storage",
+ "revision": "f8eeb65a1a1f969696b49aada9d24073f2c2acd1",
+ "revisionTime": "2018-02-15T19:19:13Z"
+ },
+ {
+ "checksumSHA1": "PfyfOXsPbGEWmdh54cguqzdwloY=",
+ "path": "github.com/Azure/azure-sdk-for-go/version",
+ "revision": "471256ff7c6c93b96131845cef5309d20edd313d",
+ "revisionTime": "2018-02-14T01:17:07Z"
+ },
+ {
+ "checksumSHA1": "LQWU/2M2E4L/hVzT9BVW1SkLrpA=",
+ "path": "github.com/Azure/go-autorest/autorest",
+ "revision": "a91c94d19d5efcb398b3aab64b8766e724aa7442",
+ "revisionTime": "2017-11-30T17:00:06Z"
+ },
+ {
+ "checksumSHA1": "nBQ7cdhoeYUur6G6HG97uueoDmE=",
+ "path": "github.com/Azure/go-autorest/autorest/adal",
+ "revision": "a91c94d19d5efcb398b3aab64b8766e724aa7442",
+ "revisionTime": "2017-11-30T17:00:06Z"
+ },
+ {
+ "checksumSHA1": "zXyLmDVpkYkIsL0yinNLoW82IZc=",
+ "path": "github.com/Azure/go-autorest/autorest/azure",
+ "revision": "a91c94d19d5efcb398b3aab64b8766e724aa7442",
+ "revisionTime": "2017-11-30T17:00:06Z"
+ },
+ {
+ "checksumSHA1": "9nXCi9qQsYjxCeajJKWttxgEt0I=",
+ "path": "github.com/Azure/go-autorest/autorest/date",
+ "revision": "a91c94d19d5efcb398b3aab64b8766e724aa7442",
+ "revisionTime": "2017-11-30T17:00:06Z"
+ },
{
"checksumSHA1": "o/3cn04KAiwC7NqNVvmfVTD+hgA=",
"path": "github.com/Microsoft/go-winio",
"revision": "d682213848ed68c0a260ca37d6dd5ace8423f5ba",
"revisionTime": "2017-12-05T20:32:29Z"
},
+ {
+ "checksumSHA1": "spyv5/YFBjYyZLZa1U2LBfDR8PM=",
+ "path": "github.com/beorn7/perks/quantile",
+ "revision": "4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9",
+ "revisionTime": "2016-08-04T10:47:26Z"
+ },
{
"checksumSHA1": "+Zz+leZHHC9C0rx8DoRuffSRPso=",
"path": "github.com/coreos/go-systemd/daemon",
"revisionTime": "2018-01-08T08:51:32Z"
},
{
- "checksumSHA1": "pAu+do4x7E5SFLfIqJeGwhcOd6E=",
- "path": "github.com/curoverse/azure-sdk-for-go/storage",
- "revision": "1620af6b32398bfc91827ceae54a8cc1f55df04d",
- "revisionTime": "2016-12-14T20:08:43Z"
+ "checksumSHA1": "+TKtBzv23ywvmmqRiGEjUba4YmI=",
+ "path": "github.com/dgrijalva/jwt-go",
+ "revision": "dbeaa9332f19a944acb5736b4456cfcc02140e29",
+ "revisionTime": "2017-10-19T21:57:19Z"
},
{
"checksumSHA1": "Gj+xR1VgFKKmFXYOJMnAczC3Znk=",
"revision": "160de10b2537169b5ae3e7e221d28269ef40d311",
"revisionTime": "2018-01-04T10:21:28Z"
},
+ {
+ "checksumSHA1": "iVfdaLxIDjfk2KLP8dCMIbsxZZM=",
+ "path": "github.com/golang/protobuf/jsonpb",
+ "revision": "1e59b77b52bf8e4b449a57e6f79f21226d571845",
+ "revisionTime": "2017-11-13T18:07:20Z"
+ },
+ {
+ "checksumSHA1": "yqF125xVSkmfLpIVGrLlfE05IUk=",
+ "path": "github.com/golang/protobuf/proto",
+ "revision": "1e59b77b52bf8e4b449a57e6f79f21226d571845",
+ "revisionTime": "2017-11-13T18:07:20Z"
+ },
+ {
+ "checksumSHA1": "Ylq6kq3KWBy6mu68oyEwenhNMdg=",
+ "path": "github.com/golang/protobuf/ptypes/struct",
+ "revision": "1e59b77b52bf8e4b449a57e6f79f21226d571845",
+ "revisionTime": "2017-11-13T18:07:20Z"
+ },
{
"checksumSHA1": "iIUYZyoanCQQTUaWsu8b+iOSPt4=",
"origin": "github.com/docker/docker/vendor/github.com/gorilla/context",
"revision": "83612a56d3dd153a94a629cd64925371c9adad78",
"revisionTime": "2017-11-26T05:04:59Z"
},
+ {
+ "checksumSHA1": "T9E+5mKBQ/BX4wlNxgaPfetxdeI=",
+ "path": "github.com/marstr/guid",
+ "revision": "8bdf7d1a087ccc975cf37dd6507da50698fd19ca",
+ "revisionTime": "2017-04-27T23:51:15Z"
+ },
+ {
+ "checksumSHA1": "bKMZjd2wPw13VwoE7mBeSv5djFA=",
+ "path": "github.com/matttproud/golang_protobuf_extensions/pbutil",
+ "revision": "c12348ce28de40eed0136aa2b644d0ee0650e56c",
+ "revisionTime": "2016-04-24T11:30:07Z"
+ },
{
"checksumSHA1": "V/quM7+em2ByJbWBLOsEwnY3j/Q=",
"path": "github.com/mitchellh/go-homedir",
"revision": "e881fd58d78e04cf6d0de1217f8707c8cc2249bc",
"revisionTime": "2017-12-16T07:03:16Z"
},
+ {
+ "checksumSHA1": "Ajt29IHVbX99PUvzn8Gc/lMCXBY=",
+ "path": "github.com/prometheus/client_golang/prometheus",
+ "revision": "9bb6ab929dcbe1c8393cd9ef70387cb69811bd1c",
+ "revisionTime": "2018-02-03T14:28:15Z"
+ },
+ {
+ "checksumSHA1": "c3Ui7nnLiJ4CAGWZ8dGuEgqHd8s=",
+ "path": "github.com/prometheus/client_golang/prometheus/promhttp",
+ "revision": "9bb6ab929dcbe1c8393cd9ef70387cb69811bd1c",
+ "revisionTime": "2018-02-03T14:28:15Z"
+ },
+ {
+ "checksumSHA1": "DvwvOlPNAgRntBzt3b3OSRMS2N4=",
+ "path": "github.com/prometheus/client_model/go",
+ "revision": "99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c",
+ "revisionTime": "2017-11-17T10:05:41Z"
+ },
+ {
+ "checksumSHA1": "xfnn0THnqNwjwimeTClsxahYrIo=",
+ "path": "github.com/prometheus/common/expfmt",
+ "revision": "89604d197083d4781071d3c65855d24ecfb0a563",
+ "revisionTime": "2018-01-10T21:49:58Z"
+ },
+ {
+ "checksumSHA1": "GWlM3d2vPYyNATtTFgftS10/A9w=",
+ "path": "github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg",
+ "revision": "89604d197083d4781071d3c65855d24ecfb0a563",
+ "revisionTime": "2018-01-10T21:49:58Z"
+ },
+ {
+ "checksumSHA1": "YU+/K48IMawQnToO4ETE6a+hhj4=",
+ "path": "github.com/prometheus/common/model",
+ "revision": "89604d197083d4781071d3c65855d24ecfb0a563",
+ "revisionTime": "2018-01-10T21:49:58Z"
+ },
+ {
+ "checksumSHA1": "lolK0h7LSVERIX8zLyVQ/+7wEyA=",
+ "path": "github.com/prometheus/procfs",
+ "revision": "cb4147076ac75738c9a7d279075a253c0cc5acbd",
+ "revisionTime": "2018-01-25T13:30:57Z"
+ },
+ {
+ "checksumSHA1": "lv9rIcjbVEGo8AT1UCUZXhXrfQc=",
+ "path": "github.com/prometheus/procfs/internal/util",
+ "revision": "cb4147076ac75738c9a7d279075a253c0cc5acbd",
+ "revisionTime": "2018-01-25T13:30:57Z"
+ },
+ {
+ "checksumSHA1": "BXJH5h2ri8SU5qC6kkDvTIGCky4=",
+ "path": "github.com/prometheus/procfs/nfs",
+ "revision": "cb4147076ac75738c9a7d279075a253c0cc5acbd",
+ "revisionTime": "2018-01-25T13:30:57Z"
+ },
+ {
+ "checksumSHA1": "yItvTQLUVqm/ArLEbvEhqG0T5a0=",
+ "path": "github.com/prometheus/procfs/xfs",
+ "revision": "cb4147076ac75738c9a7d279075a253c0cc5acbd",
+ "revisionTime": "2018-01-25T13:30:57Z"
+ },
+ {
+ "checksumSHA1": "eDQ6f1EsNf+frcRO/9XukSEchm8=",
+ "path": "github.com/satori/go.uuid",
+ "revision": "36e9d2ebbde5e3f13ab2e25625fd453271d6522e",
+ "revisionTime": "2018-01-03T17:44:51Z"
+ },
{
"checksumSHA1": "UwtyqB7CaUWPlw0DVJQvw0IFQZs=",
"path": "github.com/sergi/go-diff/diffmatchpatch",