c.Check(stderr.String(), check.Matches, `config does not define any clusters\n`)
}
+func (s *CommandSuite) TestLogDeprecatedKeys(c *check.C) {
+ var stdout, stderr bytes.Buffer
+ // Config uses only the deprecated RequestLimits.MaxItemsPerResponse
+ // key; dump-config should still exit 0 and log an "overriding ...
+ // API.MaxItemsPerResponse" warning on stderr.
+ in := `
+Clusters:
+ z1234:
+  RequestLimits:
+   MaxItemsPerResponse: 1234
+`
+ code := DumpCommand.RunCommand("arvados dump-config", nil, bytes.NewBufferString(in), &stdout, &stderr)
+ c.Check(code, check.Equals, 0)
+ c.Check(stderr.String(), check.Matches, `(?ms).*overriding Clusters.z1234.API.MaxItemsPerResponse .* = 1234.*`)
+}
+
func (s *CommandSuite) TestUnknownKey(c *check.C) {
var stdout, stderr bytes.Buffer
in := `
`
code := DumpCommand.RunCommand("arvados dump-config", nil, bytes.NewBufferString(in), &stdout, &stderr)
c.Check(code, check.Equals, 0)
+ c.Check(stderr.String(), check.Equals, "")
c.Check(stdout.String(), check.Matches, `(?ms)Clusters:\n z1234:\n.*`)
c.Check(stdout.String(), check.Matches, `(?ms).*\n *ManagementToken: secret\n.*`)
c.Check(stdout.String(), check.Not(check.Matches), `(?ms).*UnknownKey.*`)
# All parameters here are passed to the PG client library in a connection string;
# see https://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-PARAMKEYWORDS
Host: ""
- Port: 0
+ Port: ""
User: ""
Password: ""
DBName: ""
# update on the permission view in the future, if not already scheduled.
AsyncPermissionsUpdateInterval: 20
+ # Maximum number of concurrent outgoing requests to make while
+ # serving a single incoming multi-cluster (federated) request.
+ MaxRequestAmplification: 4
+
# RailsSessionSecretToken is a string of alphanumeric characters
# used by Rails to sign session tokens. IMPORTANT: This is a
# site secret. It should be at least 50 characters.
# All parameters here are passed to the PG client library in a connection string;
# see https://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-PARAMKEYWORDS
Host: ""
- Port: 0
+ Port: ""
User: ""
Password: ""
DBName: ""
# update on the permission view in the future, if not already scheduled.
AsyncPermissionsUpdateInterval: 20
+ # Maximum number of concurrent outgoing requests to make while
+ # serving a single incoming multi-cluster (federated) request.
+ MaxRequestAmplification: 4
+
+ # RailsSessionSecretToken is a string of alphanumeric characters
+ # used by Rails to sign session tokens. IMPORTANT: This is a
+ # site secret. It should be at least 50 characters.
+ RailsSessionSecretToken: ""
+
Users:
# Config parameters to automatically set up new users. If enabled,
# these users will be able to self-activate. Enable this if you want
import (
"bytes"
+ "encoding/json"
"errors"
"fmt"
"io"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/ghodss/yaml"
+ "github.com/imdario/mergo"
)
type logger interface {
Warnf(string, ...interface{})
}
+// deprRequestLimits mirrors the deprecated RequestLimits config
+// section. Pointer fields distinguish "key not present" (nil) from an
+// explicitly configured value, including an explicit zero.
+type deprRequestLimits struct {
+ MaxItemsPerResponse *int
+ MultiClusterRequestConcurrency *int
+}
+
+// deprCluster holds the deprecated per-cluster config keys that are
+// still recognized and translated to their current equivalents.
+type deprCluster struct {
+ RequestLimits deprRequestLimits
+ NodeProfiles map[string]arvados.NodeProfile
+}
+
+// deprecatedConfig is the shape used to detect old-style config keys
+// in the loaded YAML so they can be applied with a warning.
type deprecatedConfig struct {
- Clusters map[string]struct {
-  NodeProfiles map[string]arvados.NodeProfile
- }
+ Clusters map[string]deprCluster
}
func LoadFile(path string, log logger) (*arvados.Config, error) {
if len(dummy.Clusters) == 0 {
return nil, errors.New("config does not define any clusters")
}
+
+ // We can't merge deep structs here; instead, we unmarshal the
+ // default & loaded config files into generic maps, merge
+ // those, and then json-encode+decode the result into the
+ // config struct type.
+ var merged map[string]interface{}
for id := range dummy.Clusters {
- err = yaml.Unmarshal(bytes.Replace(DefaultYAML, []byte("xxxxx"), []byte(id), -1), &cfg)
+ var src map[string]interface{}
+ err = yaml.Unmarshal(bytes.Replace(DefaultYAML, []byte(" xxxxx:"), []byte(" "+id+":"), -1), &src)
if err != nil {
return nil, fmt.Errorf("loading defaults for %s: %s", id, err)
}
+ mergo.Merge(&merged, src, mergo.WithOverride)
}
- err = yaml.Unmarshal(buf, &cfg)
+ var src map[string]interface{}
+ err = yaml.Unmarshal(buf, &src)
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("loading config data: %s", err)
+ }
+ mergo.Merge(&merged, src, mergo.WithOverride)
+
+ var errEnc error
+ pr, pw := io.Pipe()
+ go func() {
+ errEnc = json.NewEncoder(pw).Encode(merged)
+ pw.Close()
+ }()
+ err = json.NewDecoder(pr).Decode(&cfg)
+ if errEnc != nil {
+ err = errEnc
+ }
+ if err != nil {
+ return nil, fmt.Errorf("transcoding config data: %s", err)
}
- // Check for deprecated config values, and apply them to cfg.
var dc deprecatedConfig
err = yaml.Unmarshal(buf, &dc)
if err != nil {
applyDeprecatedNodeProfile(hostname, np.DispatchCloud, &cluster.Services.DispatchCloud)
}
}
+ // Apply deprecated RequestLimits keys: a non-nil pointer means the
+ // old key was present in the config; it overrides the new-style
+ // API value, with a warning naming both keys.
+ if dst, n := &cluster.API.MaxItemsPerResponse, dcluster.RequestLimits.MaxItemsPerResponse; n != nil && *n != *dst {
+ log.Warnf("overriding Clusters.%s.API.MaxItemsPerResponse with deprecated config RequestLimits.MaxItemsPerResponse = %d", id, *n)
+ *dst = *n
+ }
+ if dst, n := &cluster.API.MaxRequestAmplification, dcluster.RequestLimits.MultiClusterRequestConcurrency; n != nil && *n != *dst {
+ log.Warnf("overriding Clusters.%s.API.MaxRequestAmplification with deprecated config RequestLimits.MultiClusterRequestConcurrency = %d", id, *n)
+ *dst = *n
+ }
cfg.Clusters[id] = cluster
}
return nil
cc, err := cfg.GetCluster("z1111")
c.Assert(err, check.IsNil)
c.Check(cc.ClusterID, check.Equals, "z1111")
+ c.Check(cc.API.MaxRequestAmplification, check.Equals, 4)
+ c.Check(cc.API.MaxItemsPerResponse, check.Equals, 1000)
}
func (s *LoadSuite) TestMultipleClusters(c *check.C) {
`)
}
+// TestMovedKeys checks that deprecated RequestLimits.* keys are
+// translated to the equivalent Clusters.*.API.* keys, and that the
+// deprecated spellings win when both old and new keys are present.
+func (s *LoadSuite) TestMovedKeys(c *check.C) {
+ s.checkEquivalent(c, `# config has old keys only
+Clusters:
+ zzzzz:
+  RequestLimits:
+   MultiClusterRequestConcurrency: 3
+   MaxItemsPerResponse: 999
+`, `
+Clusters:
+ zzzzz:
+  API:
+   MaxRequestAmplification: 3
+   MaxItemsPerResponse: 999
+`)
+ s.checkEquivalent(c, `# config has both old and new keys; old values win
+Clusters:
+ zzzzz:
+  RequestLimits:
+   MultiClusterRequestConcurrency: 0
+   MaxItemsPerResponse: 555
+  API:
+   MaxRequestAmplification: 3
+   MaxItemsPerResponse: 999
+`, `
+Clusters:
+ zzzzz:
+  API:
+   MaxRequestAmplification: 0
+   MaxItemsPerResponse: 555
+`)
+}
+
func (s *LoadSuite) checkEquivalent(c *check.C, goty, expectedy string) {
got, err := Load(bytes.NewBufferString(goty), ctxlog.TestLogger(c))
c.Assert(err, check.IsNil)
expected, err := Load(bytes.NewBufferString(expectedy), ctxlog.TestLogger(c))
c.Assert(err, check.IsNil)
if !c.Check(got, check.DeepEquals, expected) {
- cmd := exec.Command("diff", "-u", "--label", "got", "--label", "expected", "/dev/fd/3", "/dev/fd/4")
- for _, obj := range []interface{}{got, expected} {
+ cmd := exec.Command("diff", "-u", "--label", "expected", "--label", "got", "/dev/fd/3", "/dev/fd/4")
+ for _, obj := range []interface{}{expected, got} {
y, _ := yaml.Marshal(obj)
pr, pw, err := os.Pipe()
c.Assert(err, check.IsNil)
// returned to the client. When that happens, all
// other outstanding requests are cancelled
sharedContext, cancelFunc := context.WithCancel(req.Context())
+ defer cancelFunc()
+
req = req.WithContext(sharedContext)
wg := sync.WaitGroup{}
pdh := m[1]
success := make(chan *http.Response)
errorChan := make(chan error, len(h.handler.Cluster.RemoteClusters))
- // use channel as a semaphore to limit the number of concurrent
- // requests at a time
- sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
-
- defer cancelFunc()
+ acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification)
for remoteID := range h.handler.Cluster.RemoteClusters {
if remoteID == h.handler.Cluster.ClusterID {
wg.Add(1)
go func(remote string) {
defer wg.Done()
- // blocks until it can put a value into the
- // channel (which has a max queue capacity)
- sem <- true
+ acquire()
+ defer release()
select {
case <-sharedContext.Done():
return
case success <- newResponse:
wasSuccess = true
}
- <-sem
}(remoteID)
}
go func() {
httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
return true
}
- if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() {
+ if max := h.handler.Cluster.API.MaxItemsPerResponse; expectCount > max {
httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
- expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest)
+ expectCount, max), http.StatusBadRequest)
return true
}
if req.Form.Get("select") != "" {
// Perform concurrent requests to each cluster
- // use channel as a semaphore to limit the number of concurrent
- // requests at a time
- sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
- defer close(sem)
+ acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification)
wg := sync.WaitGroup{}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
// Nothing to query
continue
}
-
- // blocks until it can put a value into the
- // channel (which has a max queue capacity)
- sem <- true
+ acquire()
wg.Add(1)
go func(k string, v []string) {
+ defer release()
+ defer wg.Done()
rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
mtx.Lock()
+ defer mtx.Unlock()
if err == nil {
completeResponses = append(completeResponses, rp...)
kind = kn
} else {
errors = append(errors, err)
}
- mtx.Unlock()
- wg.Done()
- <-sem
}(k, v)
}
wg.Wait()
NodeProfiles: map[string]arvados.NodeProfile{
"*": nodeProfile,
},
- RequestLimits: arvados.RequestLimits{
- MaxItemsPerResponse: 1000,
- MultiClusterRequestConcurrency: 4,
+ API: arvados.API{
+ MaxItemsPerResponse: 1000,
+ MaxRequestAmplification: 4,
},
}, NodeProfile: &nodeProfile}
s.testServer = newServerFromIntegrationTestEnv(c)
}
func (s *FederationSuite) TestListMultiRemoteContainerPageSizeError(c *check.C) {
- s.testHandler.Cluster.RequestLimits.MaxItemsPerResponse = 1
+ s.testHandler.Cluster.API.MaxItemsPerResponse = 1
req := httptest.NewRequest("GET", fmt.Sprintf("/arvados/v1/containers?count=none&filters=%s",
url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v", "zhome-xvhdp-cr5queuedcontnr"]]]`,
arvadostest.QueuedContainerUUID))),
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+// semaphore returns acquire/release functions implementing a counting
+// semaphore that limits concurrency to max simultaneous holders:
+// acquire blocks until a slot is available, release frees one slot.
+// If max is zero or negative, both functions are no-ops (no limit).
+func semaphore(max int) (acquire, release func()) {
+ if max > 0 {
+  ch := make(chan bool, max)
+  return func() { ch <- true }, func() { <-ch }
+ }
+ return func() {}, func() {}
+}
}
}
-type RequestLimits struct {
- MaxItemsPerResponse int
- MultiClusterRequestConcurrency int
+// API holds cluster-wide API request limits.
+type API struct {
+ // Maximum number of objects returned in a single list/index
+ // response page; federated list requests for more objects than
+ // this are rejected.
+ MaxItemsPerResponse int
+ // Maximum number of concurrent outgoing requests to make while
+ // serving a single incoming multi-cluster (federated) request.
+ MaxRequestAmplification int
}
type Cluster struct {
HTTPRequestTimeout Duration
RemoteClusters map[string]RemoteCluster
PostgreSQL PostgreSQL
- RequestLimits RequestLimits
+ API API
Logging Logging
TLS TLS
}
}
}
-func (h RequestLimits) GetMultiClusterRequestConcurrency() int {
- if h.MultiClusterRequestConcurrency == 0 {
- return 4
- }
- return h.MultiClusterRequestConcurrency
-}
-
-func (h RequestLimits) GetMaxItemsPerResponse() int {
- if h.MaxItemsPerResponse == 0 {
- return 1000
- }
- return h.MaxItemsPerResponse
-}
-
type SystemServiceInstance struct {
Listen string
TLS bool
func (c PostgreSQLConnection) String() string {
s := ""
for k, v := range c {
+ if v == "" {
+ continue
+ }
s += strings.ToLower(k)
s += "='"
s += strings.Replace(
dbcfg.declare_config "PostgreSQL.ConnectionPool", Integer, :pool
dbcfg.declare_config "PostgreSQL.Connection.Host", String, :host
-dbcfg.declare_config "PostgreSQL.Connection.Port", Integer, :port
+# Port is declared as String (matching the config default Port: "")
+# so an unset value can be omitted from the libpq connection string
+# instead of being passed as 0.
+dbcfg.declare_config "PostgreSQL.Connection.Port", String, :port
dbcfg.declare_config "PostgreSQL.Connection.User", String, :username
dbcfg.declare_config "PostgreSQL.Connection.Password", String, :password
dbcfg.declare_config "PostgreSQL.Connection.DBName", String, :database
"revision": "0a025b7e63adc15a622f29b0b2c4c3848243bbf6",
"revisionTime": "2016-08-13T22:13:03Z"
},
+ {
+ "checksumSHA1": "x7IEwuVYTztOJItr3jtePGyFDWA=",
+ "path": "github.com/imdario/mergo",
+ "revision": "5ef87b449ca75fbed1bc3765b749ca8f73f1fa69",
+ "revisionTime": "2019-04-15T13:31:43Z"
+ },
{
"checksumSHA1": "iCsyavJDnXC9OY//p52IWJWy7PY=",
"path": "github.com/jbenet/go-context/io",