15003: Move keys from RequestLimits to API section.
author    Tom Clegg <tclegg@veritasgenetics.com>
          Tue, 23 Apr 2019 13:44:54 +0000 (09:44 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
          Tue, 23 Apr 2019 17:30:07 +0000 (13:30 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

13 files changed:
lib/config/cmd_test.go
lib/config/config.default.yml
lib/config/generated_config.go
lib/config/load.go
lib/config/load_test.go
lib/controller/fed_collections.go
lib/controller/fed_generic.go
lib/controller/federation_test.go
lib/controller/semaphore.go [new file with mode: 0644]
sdk/go/arvados/config.go
sdk/go/arvados/postgresql.go
services/api/config/arvados_config.rb
vendor/vendor.json

index 0a60c25b578fbf1add7c8b3b44886807cc7e2721..39dcb4fe6bbc149ab9397e1ca6b2a6e48e81cb63 100644 (file)
@@ -28,6 +28,19 @@ func (s *CommandSuite) TestEmptyInput(c *check.C) {
        c.Check(stderr.String(), check.Matches, `config does not define any clusters\n`)
 }
 
+func (s *CommandSuite) TestLogDeprecatedKeys(c *check.C) {
+       var stdout, stderr bytes.Buffer
+       in := `
+Clusters:
+ z1234:
+  RequestLimits:
+    MaxItemsPerResponse: 1234
+`
+       code := DumpCommand.RunCommand("arvados dump-config", nil, bytes.NewBufferString(in), &stdout, &stderr)
+       c.Check(code, check.Equals, 0)
+       c.Check(stderr.String(), check.Matches, `(?ms).*overriding Clusters.z1234.API.MaxItemsPerResponse .* = 1234.*`)
+}
+
 func (s *CommandSuite) TestUnknownKey(c *check.C) {
        var stdout, stderr bytes.Buffer
        in := `
@@ -38,6 +51,7 @@ Clusters:
 `
        code := DumpCommand.RunCommand("arvados dump-config", nil, bytes.NewBufferString(in), &stdout, &stderr)
        c.Check(code, check.Equals, 0)
+       c.Check(stderr.String(), check.Equals, "")
        c.Check(stdout.String(), check.Matches, `(?ms)Clusters:\n  z1234:\n.*`)
        c.Check(stdout.String(), check.Matches, `(?ms).*\n *ManagementToken: secret\n.*`)
        c.Check(stdout.String(), check.Not(check.Matches), `(?ms).*UnknownKey.*`)
index bea6387532f0d5ac476b82858ac46171162a0e06..c767f76132e8b10e168a36035c693576f0fae728 100644 (file)
@@ -70,7 +70,7 @@ Clusters:
         # All parameters here are passed to the PG client library in a connection string;
         # see https://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-PARAMKEYWORDS
         Host: ""
-        Port: 0
+        Port: ""
         User: ""
         Password: ""
         DBName: ""
@@ -109,6 +109,10 @@ Clusters:
       # update on the permission view in the future, if not already scheduled.
       AsyncPermissionsUpdateInterval: 20
 
+      # Maximum number of concurrent outgoing requests to make while
+      # serving a single incoming multi-cluster (federated) request.
+      MaxRequestAmplification: 4
+
       # RailsSessionSecretToken is a string of alphanumeric characters
       # used by Rails to sign session tokens. IMPORTANT: This is a
       # site secret. It should be at least 50 characters.
index a24a9055fda39a6788a97651ce56a848a0d02c4a..3c16e89558b865757066cc382d763ca51866b09d 100644 (file)
@@ -76,7 +76,7 @@ Clusters:
         # All parameters here are passed to the PG client library in a connection string;
         # see https://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-PARAMKEYWORDS
         Host: ""
-        Port: 0
+        Port: ""
         User: ""
         Password: ""
         DBName: ""
@@ -115,6 +115,15 @@ Clusters:
       # update on the permission view in the future, if not already scheduled.
       AsyncPermissionsUpdateInterval: 20
 
+      # Maximum number of concurrent outgoing requests to make while
+      # serving a single incoming multi-cluster (federated) request.
+      MaxRequestAmplification: 4
+
+      # RailsSessionSecretToken is a string of alphanumeric characters
+      # used by Rails to sign session tokens. IMPORTANT: This is a
+      # site secret. It should be at least 50 characters.
+      RailsSessionSecretToken: ""
+
     Users:
       # Config parameters to automatically setup new users.  If enabled,
       # these users will be able to self-activate.  Enable this if you want
index 159dd65dc0d021c0d482589efa09021c1043dbc2..5a690b03c2280bcc46039c16197800f5a088afa9 100644 (file)
@@ -6,6 +6,7 @@ package config
 
 import (
        "bytes"
+       "encoding/json"
        "errors"
        "fmt"
        "io"
@@ -15,16 +16,25 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/ghodss/yaml"
+       "github.com/imdario/mergo"
 )
 
 type logger interface {
        Warnf(string, ...interface{})
 }
 
+type deprRequestLimits struct {
+       MaxItemsPerResponse            *int
+       MultiClusterRequestConcurrency *int
+}
+
+type deprCluster struct {
+       RequestLimits deprRequestLimits
+       NodeProfiles  map[string]arvados.NodeProfile
+}
+
 type deprecatedConfig struct {
-       Clusters map[string]struct {
-               NodeProfiles map[string]arvados.NodeProfile
-       }
+       Clusters map[string]deprCluster
 }
 
 func LoadFile(path string, log logger) (*arvados.Config, error) {
@@ -57,18 +67,41 @@ func Load(rdr io.Reader, log logger) (*arvados.Config, error) {
        if len(dummy.Clusters) == 0 {
                return nil, errors.New("config does not define any clusters")
        }
+
+       // We can't merge deep structs here; instead, we unmarshal the
+       // default & loaded config files into generic maps, merge
+       // those, and then json-encode+decode the result into the
+       // config struct type.
+       var merged map[string]interface{}
        for id := range dummy.Clusters {
-               err = yaml.Unmarshal(bytes.Replace(DefaultYAML, []byte("xxxxx"), []byte(id), -1), &cfg)
+               var src map[string]interface{}
+               err = yaml.Unmarshal(bytes.Replace(DefaultYAML, []byte(" xxxxx:"), []byte(" "+id+":"), -1), &src)
                if err != nil {
                        return nil, fmt.Errorf("loading defaults for %s: %s", id, err)
                }
+               mergo.Merge(&merged, src, mergo.WithOverride)
        }
-       err = yaml.Unmarshal(buf, &cfg)
+       var src map[string]interface{}
+       err = yaml.Unmarshal(buf, &src)
        if err != nil {
-               return nil, err
+               return nil, fmt.Errorf("loading config data: %s", err)
+       }
+       mergo.Merge(&merged, src, mergo.WithOverride)
+
+       var errEnc error
+       pr, pw := io.Pipe()
+       go func() {
+               errEnc = json.NewEncoder(pw).Encode(merged)
+               pw.Close()
+       }()
+       err = json.NewDecoder(pr).Decode(&cfg)
+       if errEnc != nil {
+               err = errEnc
+       }
+       if err != nil {
+               return nil, fmt.Errorf("transcoding config data: %s", err)
        }
 
-       // Check for deprecated config values, and apply them to cfg.
        var dc deprecatedConfig
        err = yaml.Unmarshal(buf, &dc)
        if err != nil {
@@ -98,6 +131,14 @@ func applyDeprecatedConfig(cfg *arvados.Config, dc *deprecatedConfig, log logger
                                applyDeprecatedNodeProfile(hostname, np.DispatchCloud, &cluster.Services.DispatchCloud)
                        }
                }
+               if dst, n := &cluster.API.MaxItemsPerResponse, dcluster.RequestLimits.MaxItemsPerResponse; n != nil && *n != *dst {
+                       log.Warnf("overriding Clusters.%s.API.MaxItemsPerResponse with deprecated config RequestLimits.MaxItemsPerResponse = %d", id, *n)
+                       *dst = *n
+               }
+               if dst, n := &cluster.API.MaxRequestAmplification, dcluster.RequestLimits.MultiClusterRequestConcurrency; n != nil && *n != *dst {
+                       log.Warnf("overriding Clusters.%s.API.MaxRequestAmplification with deprecated config RequestLimits.MultiClusterRequestConcurrency = %d", id, *n)
+                       *dst = *n
+               }
                cfg.Clusters[id] = cluster
        }
        return nil
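
The loading strategy above deserves a note: yaml.Unmarshal into the typed arvados.Config cannot express "merge these two documents", so the loader unmarshals the per-cluster defaults and the site config into generic map[string]interface{} values, merges them with mergo.WithOverride, and then round-trips the merged map through JSON into the typed struct. Below is a minimal, self-contained sketch of that technique; the Cluster type and the two YAML snippets are illustrative stand-ins, not the real Arvados types.

package main

import (
	"encoding/json"
	"fmt"

	"github.com/ghodss/yaml"
	"github.com/imdario/mergo"
)

// Cluster is a deliberately tiny stand-in for arvados.Config.
type Cluster struct {
	API struct {
		MaxItemsPerResponse     int
		MaxRequestAmplification int
	}
}

func main() {
	defaults := []byte("API:\n  MaxItemsPerResponse: 1000\n  MaxRequestAmplification: 4\n")
	siteConfig := []byte("API:\n  MaxItemsPerResponse: 555\n")

	// Merge generic maps: defaults first, then the site config, with
	// later values overriding earlier ones.
	var merged map[string]interface{}
	for _, buf := range [][]byte{defaults, siteConfig} {
		var src map[string]interface{}
		if err := yaml.Unmarshal(buf, &src); err != nil {
			panic(err)
		}
		if err := mergo.Merge(&merged, src, mergo.WithOverride); err != nil {
			panic(err)
		}
	}

	// Transcode the merged map into the typed struct via JSON.
	j, err := json.Marshal(merged)
	if err != nil {
		panic(err)
	}
	var c Cluster
	if err := json.Unmarshal(j, &c); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", c) // {API:{MaxItemsPerResponse:555 MaxRequestAmplification:4}}
}
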
index f00ce33fde101f569afb0b244ee284ff3fc6d878..277ff423a4f78607f5d018e64eeb240f9fa788ed 100644 (file)
@@ -38,6 +38,8 @@ func (s *LoadSuite) TestNoConfigs(c *check.C) {
        cc, err := cfg.GetCluster("z1111")
        c.Assert(err, check.IsNil)
        c.Check(cc.ClusterID, check.Equals, "z1111")
+       c.Check(cc.API.MaxRequestAmplification, check.Equals, 4)
+       c.Check(cc.API.MaxItemsPerResponse, check.Equals, 1000)
 }
 
 func (s *LoadSuite) TestMultipleClusters(c *check.C) {
@@ -91,14 +93,46 @@ Clusters:
 `)
 }
 
+func (s *LoadSuite) TestMovedKeys(c *check.C) {
+       s.checkEquivalent(c, `# config has old keys only
+Clusters:
+ zzzzz:
+  RequestLimits:
+   MultiClusterRequestConcurrency: 3
+   MaxItemsPerResponse: 999
+`, `
+Clusters:
+ zzzzz:
+  API:
+   MaxRequestAmplification: 3
+   MaxItemsPerResponse: 999
+`)
+       s.checkEquivalent(c, `# config has both old and new keys; old values win
+Clusters:
+ zzzzz:
+  RequestLimits:
+   MultiClusterRequestConcurrency: 0
+   MaxItemsPerResponse: 555
+  API:
+   MaxRequestAmplification: 3
+   MaxItemsPerResponse: 999
+`, `
+Clusters:
+ zzzzz:
+  API:
+   MaxRequestAmplification: 0
+   MaxItemsPerResponse: 555
+`)
+}
+
 func (s *LoadSuite) checkEquivalent(c *check.C, goty, expectedy string) {
        got, err := Load(bytes.NewBufferString(goty), ctxlog.TestLogger(c))
        c.Assert(err, check.IsNil)
        expected, err := Load(bytes.NewBufferString(expectedy), ctxlog.TestLogger(c))
        c.Assert(err, check.IsNil)
        if !c.Check(got, check.DeepEquals, expected) {
-               cmd := exec.Command("diff", "-u", "--label", "got", "--label", "expected", "/dev/fd/3", "/dev/fd/4")
-               for _, obj := range []interface{}{got, expected} {
+               cmd := exec.Command("diff", "-u", "--label", "expected", "--label", "got", "/dev/fd/3", "/dev/fd/4")
+               for _, obj := range []interface{}{expected, got} {
                        y, _ := yaml.Marshal(obj)
                        pr, pw, err := os.Pipe()
                        c.Assert(err, check.IsNil)
index ab49e39d12656c3f960e840f82c9f4974e59d32d..07daf2f90ef28b3199e856c93134aa5b6975fab3 100644 (file)
@@ -217,17 +217,15 @@ func fetchRemoteCollectionByPDH(
        // returned to the client.  When that happens, all
        // other outstanding requests are cancelled
        sharedContext, cancelFunc := context.WithCancel(req.Context())
+       defer cancelFunc()
+
        req = req.WithContext(sharedContext)
        wg := sync.WaitGroup{}
        pdh := m[1]
        success := make(chan *http.Response)
        errorChan := make(chan error, len(h.handler.Cluster.RemoteClusters))
 
-       // use channel as a semaphore to limit the number of concurrent
-       // requests at a time
-       sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
-
-       defer cancelFunc()
+       acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification)
 
        for remoteID := range h.handler.Cluster.RemoteClusters {
                if remoteID == h.handler.Cluster.ClusterID {
@@ -238,9 +236,8 @@ func fetchRemoteCollectionByPDH(
                wg.Add(1)
                go func(remote string) {
                        defer wg.Done()
-                       // blocks until it can put a value into the
-                       // channel (which has a max queue capacity)
-                       sem <- true
+                       acquire()
+                       defer release()
                        select {
                        case <-sharedContext.Done():
                                return
@@ -278,7 +275,6 @@ func fetchRemoteCollectionByPDH(
                        case success <- newResponse:
                                wasSuccess = true
                        }
-                       <-sem
                }(remoteID)
        }
        go func() {
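
Alongside the semaphore change, the hunk above keeps the existing fan-out shape of fetchRemoteCollectionByPDH: each per-remote goroutine watches a shared cancellable context, the first successful response is handed back on a channel, and cancellation stops the remaining requests. A minimal sketch of that shape, assuming made-up remote names and a sleep standing in for the outgoing HTTP request:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	// Shared context: cancelling it tells every in-flight request to stop.
	sharedCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	remotes := []string{"aaaaa", "bbbbb", "ccccc"}
	success := make(chan string)
	var wg sync.WaitGroup

	for _, r := range remotes {
		wg.Add(1)
		go func(remote string) {
			defer wg.Done()
			// Stand-in for the real outgoing request.
			delay := time.Duration(rand.Intn(100)) * time.Millisecond
			select {
			case <-sharedCtx.Done():
				return // another remote already answered
			case <-time.After(delay):
			}
			select {
			case <-sharedCtx.Done():
			case success <- remote:
			}
		}(r)
	}
	go func() {
		// Close the channel once every goroutine has finished or given up.
		wg.Wait()
		close(success)
	}()

	if winner, ok := <-success; ok {
		cancel() // first success wins; cancel the rest
		fmt.Println("served response from", winner)
	}
}
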
index 9c8b1614bcdcceaa4be70bcba15fa694e26940dc..fd2fbc226e4860f7ddeb591c555f1759f3fcb7ef 100644 (file)
@@ -175,9 +175,9 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
                httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
                return true
        }
-       if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() {
+       if max := h.handler.Cluster.API.MaxItemsPerResponse; expectCount > max {
                httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
-                       expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest)
+                       expectCount, max), http.StatusBadRequest)
                return true
        }
        if req.Form.Get("select") != "" {
@@ -203,10 +203,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
 
        // Perform concurrent requests to each cluster
 
-       // use channel as a semaphore to limit the number of concurrent
-       // requests at a time
-       sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
-       defer close(sem)
+       acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification)
        wg := sync.WaitGroup{}
 
        req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
@@ -220,23 +217,20 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
                        // Nothing to query
                        continue
                }
-
-               // blocks until it can put a value into the
-               // channel (which has a max queue capacity)
-               sem <- true
+               acquire()
                wg.Add(1)
                go func(k string, v []string) {
+                       defer release()
+                       defer wg.Done()
                        rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
                        mtx.Lock()
+                       defer mtx.Unlock()
                        if err == nil {
                                completeResponses = append(completeResponses, rp...)
                                kind = kn
                        } else {
                                errors = append(errors, err)
                        }
-                       mtx.Unlock()
-                       wg.Done()
-                       <-sem
                }(k, v)
        }
        wg.Wait()
index 62916acd2ac10be14d90d4e02e2703e77949e32b..c4aa33c15e724feb807b7ac35f3a9d0312a62770 100644 (file)
@@ -64,9 +64,9 @@ func (s *FederationSuite) SetUpTest(c *check.C) {
                NodeProfiles: map[string]arvados.NodeProfile{
                        "*": nodeProfile,
                },
-               RequestLimits: arvados.RequestLimits{
-                       MaxItemsPerResponse:            1000,
-                       MultiClusterRequestConcurrency: 4,
+               API: arvados.API{
+                       MaxItemsPerResponse:     1000,
+                       MaxRequestAmplification: 4,
                },
        }, NodeProfile: &nodeProfile}
        s.testServer = newServerFromIntegrationTestEnv(c)
@@ -850,7 +850,7 @@ func (s *FederationSuite) TestListMultiRemoteContainersMissing(c *check.C) {
 }
 
 func (s *FederationSuite) TestListMultiRemoteContainerPageSizeError(c *check.C) {
-       s.testHandler.Cluster.RequestLimits.MaxItemsPerResponse = 1
+       s.testHandler.Cluster.API.MaxItemsPerResponse = 1
        req := httptest.NewRequest("GET", fmt.Sprintf("/arvados/v1/containers?count=none&filters=%s",
                url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v", "zhome-xvhdp-cr5queuedcontnr"]]]`,
                        arvadostest.QueuedContainerUUID))),
diff --git a/lib/controller/semaphore.go b/lib/controller/semaphore.go
new file mode 100644 (file)
index 0000000..ff607bb
--- /dev/null
@@ -0,0 +1,14 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+func semaphore(max int) (acquire, release func()) {
+       if max > 0 {
+               ch := make(chan bool, max)
+               return func() { ch <- true }, func() { <-ch }
+       } else {
+               return func() {}, func() {}
+       }
+}
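
The helper above returns a pair of closures backed by a buffered channel; with max <= 0 both functions are no-ops, so no concurrency limit is enforced. A short usage sketch follows, with an assumed remote list and a made-up limit of 2, showing the acquire/release pairing the controller handlers now use:

package main

import (
	"fmt"
	"sync"
)

// Same shape as the new helper above: acquire blocks while max goroutines
// already hold a slot; release frees one.
func semaphore(max int) (acquire, release func()) {
	if max > 0 {
		ch := make(chan bool, max)
		return func() { ch <- true }, func() { <-ch }
	}
	return func() {}, func() {}
}

func main() {
	remotes := []string{"zzzz1", "zzzz2", "zzzz3", "zzzz4", "zzzz5"}
	acquire, release := semaphore(2) // at most 2 simulated requests in flight

	var wg sync.WaitGroup
	for _, r := range remotes {
		wg.Add(1)
		go func(remote string) {
			defer wg.Done()
			acquire()
			defer release()
			fmt.Println("querying", remote) // stand-in for the outgoing request
		}(r)
	}
	wg.Wait()
}
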
index 32763acd18ad833497dd208c8c17a84096633347..610a3d288de27acd00d9db540baea3e6516636e1 100644 (file)
@@ -51,9 +51,9 @@ func (sc *Config) GetCluster(clusterID string) (*Cluster, error) {
        }
 }
 
-type RequestLimits struct {
-       MaxItemsPerResponse            int
-       MultiClusterRequestConcurrency int
+type API struct {
+       MaxItemsPerResponse     int
+       MaxRequestAmplification int
 }
 
 type Cluster struct {
@@ -68,7 +68,7 @@ type Cluster struct {
        HTTPRequestTimeout Duration
        RemoteClusters     map[string]RemoteCluster
        PostgreSQL         PostgreSQL
-       RequestLimits      RequestLimits
+       API                API
        Logging            Logging
        TLS                TLS
 }
@@ -332,20 +332,6 @@ func (np *NodeProfile) ServicePorts() map[ServiceName]string {
        }
 }
 
-func (h RequestLimits) GetMultiClusterRequestConcurrency() int {
-       if h.MultiClusterRequestConcurrency == 0 {
-               return 4
-       }
-       return h.MultiClusterRequestConcurrency
-}
-
-func (h RequestLimits) GetMaxItemsPerResponse() int {
-       if h.MaxItemsPerResponse == 0 {
-               return 1000
-       }
-       return h.MaxItemsPerResponse
-}
-
 type SystemServiceInstance struct {
        Listen   string
        TLS      bool
index 47953ce9da7a10b795b980adbc04ae964d532926..1969441da1d0dc8767c1ca9acb5145a75b3613d0 100644 (file)
@@ -9,6 +9,9 @@ import "strings"
 func (c PostgreSQLConnection) String() string {
        s := ""
        for k, v := range c {
+               if v == "" {
+                       continue
+               }
                s += strings.ToLower(k)
                s += "='"
                s += strings.Replace(
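
The Port change elsewhere in this commit (integer 0 to empty string) only works because String() now skips empty values: an unset parameter is omitted from the libpq connection string entirely rather than emitted as port='', so the client library falls back to its own default. A rough sketch of the idea, using a simplified builder and approximate escaping rather than the actual Arvados code:

package main

import (
	"fmt"
	"strings"
)

// connString builds a libpq-style connection string, skipping parameters
// whose value is empty.
func connString(params map[string]string) string {
	s := ""
	for k, v := range params {
		if v == "" {
			continue // unset: let libpq use its default
		}
		v = strings.Replace(v, `\`, `\\`, -1)
		v = strings.Replace(v, `'`, `\'`, -1)
		s += strings.ToLower(k) + "='" + v + "' "
	}
	return s
}

func main() {
	fmt.Println(connString(map[string]string{
		"Host":   "db.example.com",
		"Port":   "", // omitted from the output
		"DBName": "arvados",
	}))
	// Prints (in some order): host='db.example.com' dbname='arvados'
}
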
index 669beb16e50e42e86fc9637594c264624faeae9a..cb76b68dc984e71155a0ebcc2bfaf7363a60d638 100644 (file)
@@ -172,7 +172,7 @@ dbcfg = ConfigLoader.new
 
 dbcfg.declare_config "PostgreSQL.ConnectionPool", Integer, :pool
 dbcfg.declare_config "PostgreSQL.Connection.Host", String, :host
-dbcfg.declare_config "PostgreSQL.Connection.Port", Integer, :port
+dbcfg.declare_config "PostgreSQL.Connection.Port", String, :port
 dbcfg.declare_config "PostgreSQL.Connection.User", String, :username
 dbcfg.declare_config "PostgreSQL.Connection.Password", String, :password
 dbcfg.declare_config "PostgreSQL.Connection.DBName", String, :database
index 5e2ed2e32e9863ff24bf20b263a9ba4218668d25..cfcba1b21888a867698980a3f9434133d02ed607 100644 (file)
                        "revision": "0a025b7e63adc15a622f29b0b2c4c3848243bbf6",
                        "revisionTime": "2016-08-13T22:13:03Z"
                },
+               {
+                       "checksumSHA1": "x7IEwuVYTztOJItr3jtePGyFDWA=",
+                       "path": "github.com/imdario/mergo",
+                       "revision": "5ef87b449ca75fbed1bc3765b749ca8f73f1fa69",
+                       "revisionTime": "2019-04-15T13:31:43Z"
+               },
                {
                        "checksumSHA1": "iCsyavJDnXC9OY//p52IWJWy7PY=",
                        "path": "github.com/jbenet/go-context/io",