17217: Update tests, remove unused code.
authorTom Clegg <tom@curii.com>
Thu, 26 Aug 2021 04:04:26 +0000 (00:04 -0400)
committerTom Clegg <tom@curii.com>
Thu, 26 Aug 2021 04:05:56 +0000 (00:05 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/controller/fed_collections.go [deleted file]
lib/controller/fed_containers.go [deleted file]
lib/controller/federation.go
lib/controller/federation_test.go
lib/controller/server_test.go

diff --git a/lib/controller/fed_collections.go b/lib/controller/fed_collections.go
deleted file mode 100644 (file)
index a0a1231..0000000
+++ /dev/null
@@ -1,312 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package controller
-
-import (
-       "bufio"
-       "bytes"
-       "context"
-       "crypto/md5"
-       "encoding/json"
-       "fmt"
-       "io"
-       "io/ioutil"
-       "net/http"
-       "strings"
-       "sync"
-
-       "git.arvados.org/arvados.git/sdk/go/arvados"
-       "git.arvados.org/arvados.git/sdk/go/httpserver"
-       "git.arvados.org/arvados.git/sdk/go/keepclient"
-)
-
-func rewriteSignatures(clusterID string, expectHash string,
-       resp *http.Response, requestError error) (newResponse *http.Response, err error) {
-
-       if requestError != nil {
-               return resp, requestError
-       }
-
-       if resp.StatusCode != http.StatusOK {
-               return resp, nil
-       }
-
-       originalBody := resp.Body
-       defer originalBody.Close()
-
-       var col arvados.Collection
-       err = json.NewDecoder(resp.Body).Decode(&col)
-       if err != nil {
-               return nil, err
-       }
-
-       // rewriting signatures will make manifest text 5-10% bigger so calculate
-       // capacity accordingly
-       updatedManifest := bytes.NewBuffer(make([]byte, 0, int(float64(len(col.ManifestText))*1.1)))
-
-       hasher := md5.New()
-       mw := io.MultiWriter(hasher, updatedManifest)
-       sz := 0
-
-       scanner := bufio.NewScanner(strings.NewReader(col.ManifestText))
-       scanner.Buffer(make([]byte, 1048576), len(col.ManifestText))
-       for scanner.Scan() {
-               line := scanner.Text()
-               tokens := strings.Split(line, " ")
-               if len(tokens) < 3 {
-                       return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
-               }
-
-               n, err := mw.Write([]byte(tokens[0]))
-               if err != nil {
-                       return nil, fmt.Errorf("Error updating manifest: %v", err)
-               }
-               sz += n
-               for _, token := range tokens[1:] {
-                       n, err = mw.Write([]byte(" "))
-                       if err != nil {
-                               return nil, fmt.Errorf("Error updating manifest: %v", err)
-                       }
-                       sz += n
-
-                       m := keepclient.SignedLocatorRe.FindStringSubmatch(token)
-                       if m != nil {
-                               // Rewrite the block signature to be a remote signature
-                               _, err = fmt.Fprintf(updatedManifest, "%s%s%s+R%s-%s%s", m[1], m[2], m[3], clusterID, m[5][2:], m[8])
-                               if err != nil {
-                                       return nil, fmt.Errorf("Error updating manifest: %v", err)
-                               }
-
-                               // for hash checking, ignore signatures
-                               n, err = fmt.Fprintf(hasher, "%s%s", m[1], m[2])
-                               if err != nil {
-                                       return nil, fmt.Errorf("Error updating manifest: %v", err)
-                               }
-                               sz += n
-                       } else {
-                               n, err = mw.Write([]byte(token))
-                               if err != nil {
-                                       return nil, fmt.Errorf("Error updating manifest: %v", err)
-                               }
-                               sz += n
-                       }
-               }
-               n, err = mw.Write([]byte("\n"))
-               if err != nil {
-                       return nil, fmt.Errorf("Error updating manifest: %v", err)
-               }
-               sz += n
-       }
-
-       // Check that expected hash is consistent with
-       // portable_data_hash field of the returned record
-       if expectHash == "" {
-               expectHash = col.PortableDataHash
-       } else if expectHash != col.PortableDataHash {
-               return nil, fmt.Errorf("portable_data_hash %q on returned record did not match expected hash %q ", expectHash, col.PortableDataHash)
-       }
-
-       // Certify that the computed hash of the manifest_text matches our expectation
-       sum := hasher.Sum(nil)
-       computedHash := fmt.Sprintf("%x+%v", sum, sz)
-       if computedHash != expectHash {
-               return nil, fmt.Errorf("Computed manifest_text hash %q did not match expected hash %q", computedHash, expectHash)
-       }
-
-       col.ManifestText = updatedManifest.String()
-
-       newbody, err := json.Marshal(col)
-       if err != nil {
-               return nil, err
-       }
-
-       buf := bytes.NewBuffer(newbody)
-       resp.Body = ioutil.NopCloser(buf)
-       resp.ContentLength = int64(buf.Len())
-       resp.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
-
-       return resp, nil
-}
-
-func filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
-       if requestError != nil {
-               return resp, requestError
-       }
-
-       if resp.StatusCode == http.StatusNotFound {
-               // Suppress returning this result, because we want to
-               // search the federation.
-               return nil, nil
-       }
-       return resp, nil
-}
-
-type searchRemoteClusterForPDH struct {
-       pdh           string
-       remoteID      string
-       mtx           *sync.Mutex
-       sentResponse  *bool
-       sharedContext *context.Context
-       cancelFunc    func()
-       errors        *[]string
-       statusCode    *int
-}
-
-func fetchRemoteCollectionByUUID(
-       h *genericFederatedRequestHandler,
-       effectiveMethod string,
-       clusterID *string,
-       uuid string,
-       remainder string,
-       w http.ResponseWriter,
-       req *http.Request) bool {
-
-       if effectiveMethod != "GET" {
-               // Only handle GET requests right now
-               return false
-       }
-
-       if uuid != "" {
-               // Collection UUID GET request
-               *clusterID = uuid[0:5]
-               if *clusterID != "" && *clusterID != h.handler.Cluster.ClusterID {
-                       // request for remote collection by uuid
-                       resp, err := h.handler.remoteClusterRequest(*clusterID, req)
-                       newResponse, err := rewriteSignatures(*clusterID, "", resp, err)
-                       h.handler.proxy.ForwardResponse(w, newResponse, err)
-                       return true
-               }
-       }
-
-       return false
-}
-
-func fetchRemoteCollectionByPDH(
-       h *genericFederatedRequestHandler,
-       effectiveMethod string,
-       clusterID *string,
-       uuid string,
-       remainder string,
-       w http.ResponseWriter,
-       req *http.Request) bool {
-
-       if effectiveMethod != "GET" {
-               // Only handle GET requests right now
-               return false
-       }
-
-       m := collectionsByPDHRe.FindStringSubmatch(req.URL.Path)
-       if len(m) != 2 {
-               return false
-       }
-
-       // Request for collection by PDH.  Search the federation.
-
-       // First, query the local cluster.
-       resp, err := h.handler.localClusterRequest(req)
-       newResp, err := filterLocalClusterResponse(resp, err)
-       if newResp != nil || err != nil {
-               h.handler.proxy.ForwardResponse(w, newResp, err)
-               return true
-       }
-
-       // Create a goroutine for each cluster in the
-       // RemoteClusters map.  The first valid result gets
-       // returned to the client.  When that happens, all
-       // other outstanding requests are cancelled
-       sharedContext, cancelFunc := context.WithCancel(req.Context())
-       defer cancelFunc()
-
-       req = req.WithContext(sharedContext)
-       wg := sync.WaitGroup{}
-       pdh := m[1]
-       success := make(chan *http.Response)
-       errorChan := make(chan error, len(h.handler.Cluster.RemoteClusters))
-
-       acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification)
-
-       for remoteID := range h.handler.Cluster.RemoteClusters {
-               if remoteID == h.handler.Cluster.ClusterID {
-                       // No need to query local cluster again
-                       continue
-               }
-               if remoteID == "*" {
-                       // This isn't a real remote cluster: it just sets defaults for unlisted remotes.
-                       continue
-               }
-
-               wg.Add(1)
-               go func(remote string) {
-                       defer wg.Done()
-                       acquire()
-                       defer release()
-                       select {
-                       case <-sharedContext.Done():
-                               return
-                       default:
-                       }
-
-                       resp, err := h.handler.remoteClusterRequest(remote, req)
-                       wasSuccess := false
-                       defer func() {
-                               if resp != nil && !wasSuccess {
-                                       resp.Body.Close()
-                               }
-                       }()
-                       if err != nil {
-                               errorChan <- err
-                               return
-                       }
-                       if resp.StatusCode != http.StatusOK {
-                               errorChan <- HTTPError{resp.Status, resp.StatusCode}
-                               return
-                       }
-                       select {
-                       case <-sharedContext.Done():
-                               return
-                       default:
-                       }
-
-                       newResponse, err := rewriteSignatures(remote, pdh, resp, nil)
-                       if err != nil {
-                               errorChan <- err
-                               return
-                       }
-                       select {
-                       case <-sharedContext.Done():
-                       case success <- newResponse:
-                               wasSuccess = true
-                       }
-               }(remoteID)
-       }
-       go func() {
-               wg.Wait()
-               cancelFunc()
-       }()
-
-       errorCode := http.StatusNotFound
-
-       for {
-               select {
-               case newResp = <-success:
-                       h.handler.proxy.ForwardResponse(w, newResp, nil)
-                       return true
-               case <-sharedContext.Done():
-                       var errors []string
-                       for len(errorChan) > 0 {
-                               err := <-errorChan
-                               if httperr, ok := err.(HTTPError); !ok || httperr.Code != http.StatusNotFound {
-                                       errorCode = http.StatusBadGateway
-                               }
-                               errors = append(errors, err.Error())
-                       }
-                       httpserver.Errors(w, errors, errorCode)
-                       return true
-               }
-       }
-
-       // shouldn't ever get here
-       return true
-}
diff --git a/lib/controller/fed_containers.go b/lib/controller/fed_containers.go
deleted file mode 100644 (file)
index fd4f052..0000000
+++ /dev/null
@@ -1,123 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package controller
-
-import (
-       "bytes"
-       "encoding/json"
-       "fmt"
-       "io/ioutil"
-       "net/http"
-       "strings"
-
-       "git.arvados.org/arvados.git/sdk/go/auth"
-       "git.arvados.org/arvados.git/sdk/go/httpserver"
-)
-
-func remoteContainerRequestCreate(
-       h *genericFederatedRequestHandler,
-       effectiveMethod string,
-       clusterID *string,
-       uuid string,
-       remainder string,
-       w http.ResponseWriter,
-       req *http.Request) bool {
-
-       if effectiveMethod != "POST" || uuid != "" || remainder != "" {
-               return false
-       }
-
-       // First make sure supplied token is valid.
-       creds := auth.NewCredentials()
-       creds.LoadTokensFromHTTPRequest(req)
-
-       currentUser, ok, err := h.handler.validateAPItoken(req, creds.Tokens[0])
-       if err != nil {
-               httpserver.Error(w, err.Error(), http.StatusInternalServerError)
-               return true
-       } else if !ok {
-               httpserver.Error(w, "invalid API token", http.StatusForbidden)
-               return true
-       }
-
-       if *clusterID == "" || *clusterID == h.handler.Cluster.ClusterID {
-               // Submitting container request to local cluster. No
-               // need to set a runtime_token (rails api will create
-               // one when the container runs) or do a remote cluster
-               // request.
-               return false
-       }
-
-       if req.Header.Get("Content-Type") != "application/json" {
-               httpserver.Error(w, "Expected Content-Type: application/json, got "+req.Header.Get("Content-Type"), http.StatusBadRequest)
-               return true
-       }
-
-       originalBody := req.Body
-       defer originalBody.Close()
-       var request map[string]interface{}
-       err = json.NewDecoder(req.Body).Decode(&request)
-       if err != nil {
-               httpserver.Error(w, err.Error(), http.StatusBadRequest)
-               return true
-       }
-
-       crString, ok := request["container_request"].(string)
-       if ok {
-               var crJSON map[string]interface{}
-               err := json.Unmarshal([]byte(crString), &crJSON)
-               if err != nil {
-                       httpserver.Error(w, err.Error(), http.StatusBadRequest)
-                       return true
-               }
-
-               request["container_request"] = crJSON
-       }
-
-       containerRequest, ok := request["container_request"].(map[string]interface{})
-       if !ok {
-               // Use toplevel object as the container_request object
-               containerRequest = request
-       }
-
-       // If runtime_token is not set, create a new token
-       if _, ok := containerRequest["runtime_token"]; !ok {
-               if len(currentUser.Authorization.Scopes) != 1 || currentUser.Authorization.Scopes[0] != "all" {
-                       httpserver.Error(w, "Token scope is not [all]", http.StatusForbidden)
-                       return true
-               }
-
-               if strings.HasPrefix(currentUser.Authorization.UUID, h.handler.Cluster.ClusterID) {
-                       // Local user, submitting to a remote cluster.
-                       // Create a new time-limited token.
-                       newtok, err := h.handler.createAPItoken(req, currentUser.UUID, nil)
-                       if err != nil {
-                               httpserver.Error(w, err.Error(), http.StatusForbidden)
-                               return true
-                       }
-                       containerRequest["runtime_token"] = newtok.TokenV2()
-               } else {
-                       // Remote user. Container request will use the
-                       // current token, minus the trailing portion
-                       // (optional container uuid).
-                       sp := strings.Split(creds.Tokens[0], "/")
-                       if len(sp) >= 3 {
-                               containerRequest["runtime_token"] = strings.Join(sp[0:3], "/")
-                       } else {
-                               containerRequest["runtime_token"] = creds.Tokens[0]
-                       }
-               }
-       }
-
-       newbody, err := json.Marshal(request)
-       buf := bytes.NewBuffer(newbody)
-       req.Body = ioutil.NopCloser(buf)
-       req.ContentLength = int64(buf.Len())
-       req.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
-
-       resp, err := h.handler.remoteClusterRequest(*clusterID, req)
-       h.handler.proxy.ForwardResponse(w, resp, err)
-       return true
-}
index 419d8b01049b21d97ccebe6c5f9ec394208e1336..144d41c21beb62213195d537d32bca8fa9650f99 100644 (file)
@@ -94,20 +94,12 @@ func (h *Handler) setupProxyRemoteCluster(next http.Handler) http.Handler {
 
        wfHandler := &genericFederatedRequestHandler{next, h, wfRe, nil}
        containersHandler := &genericFederatedRequestHandler{next, h, containersRe, nil}
 
        wfHandler := &genericFederatedRequestHandler{next, h, wfRe, nil}
        containersHandler := &genericFederatedRequestHandler{next, h, containersRe, nil}
-       containerRequestsHandler := &genericFederatedRequestHandler{next, h, containerRequestsRe,
-               []federatedRequestDelegate{remoteContainerRequestCreate}}
-       collectionsRequestsHandler := &genericFederatedRequestHandler{next, h, collectionsRe,
-               []federatedRequestDelegate{fetchRemoteCollectionByUUID, fetchRemoteCollectionByPDH}}
        linksRequestsHandler := &genericFederatedRequestHandler{next, h, linksRe, nil}
 
        mux.Handle("/arvados/v1/workflows", wfHandler)
        mux.Handle("/arvados/v1/workflows/", wfHandler)
        mux.Handle("/arvados/v1/containers", containersHandler)
        mux.Handle("/arvados/v1/containers/", containersHandler)
        linksRequestsHandler := &genericFederatedRequestHandler{next, h, linksRe, nil}
 
        mux.Handle("/arvados/v1/workflows", wfHandler)
        mux.Handle("/arvados/v1/workflows/", wfHandler)
        mux.Handle("/arvados/v1/containers", containersHandler)
        mux.Handle("/arvados/v1/containers/", containersHandler)
-       mux.Handle("/arvados/v1/container_requests", containerRequestsHandler)
-       mux.Handle("/arvados/v1/container_requests/", containerRequestsHandler)
-       mux.Handle("/arvados/v1/collections", collectionsRequestsHandler)
-       mux.Handle("/arvados/v1/collections/", collectionsRequestsHandler)
        mux.Handle("/arvados/v1/links", linksRequestsHandler)
        mux.Handle("/arvados/v1/links/", linksRequestsHandler)
        mux.Handle("/", next)
        mux.Handle("/arvados/v1/links", linksRequestsHandler)
        mux.Handle("/arvados/v1/links/", linksRequestsHandler)
        mux.Handle("/", next)
index f4cadd821b3d008d7ad8cdf68a402d9e7bd05092..4c0ffec60c443bb25f2815ca3a7ea88eeaa9a1ae 100644 (file)
@@ -64,6 +64,8 @@ func (s *FederationSuite) SetUpTest(c *check.C) {
        cluster.API.MaxItemsPerResponse = 1000
        cluster.API.MaxRequestAmplification = 4
        cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute)
        cluster.API.MaxItemsPerResponse = 1000
        cluster.API.MaxRequestAmplification = 4
        cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute)
+       cluster.Collections.BlobSigningKey = arvadostest.BlobSigningKey
+       cluster.Collections.BlobSigningTTL = arvados.Duration(time.Hour * 24 * 14)
        arvadostest.SetServiceURL(&cluster.Services.RailsAPI, "http://localhost:1/")
        arvadostest.SetServiceURL(&cluster.Services.Controller, "http://localhost:/")
        s.testHandler = &Handler{Cluster: cluster}
        arvadostest.SetServiceURL(&cluster.Services.RailsAPI, "http://localhost:1/")
        arvadostest.SetServiceURL(&cluster.Services.Controller, "http://localhost:/")
        s.testHandler = &Handler{Cluster: cluster}
@@ -695,7 +697,6 @@ func (s *FederationSuite) TestCreateRemoteContainerRequestCheckRuntimeToken(c *c
        s.testHandler.Cluster.ClusterID = "zzzzz"
        s.testHandler.Cluster.SystemRootToken = arvadostest.SystemRootToken
        s.testHandler.Cluster.API.MaxTokenLifetime = arvados.Duration(time.Hour)
        s.testHandler.Cluster.ClusterID = "zzzzz"
        s.testHandler.Cluster.SystemRootToken = arvadostest.SystemRootToken
        s.testHandler.Cluster.API.MaxTokenLifetime = arvados.Duration(time.Hour)
-       s.testHandler.Cluster.Collections.BlobSigningTTL = arvados.Duration(336 * time.Hour) // For some reason, this was set to 0h
 
        resp := s.testRequest(req).Result()
        c.Check(resp.StatusCode, check.Equals, http.StatusOK)
 
        resp := s.testRequest(req).Result()
        c.Check(resp.StatusCode, check.Equals, http.StatusOK)
index e3558c3f41bec4b47c01b2575e4793fbbebb7674..54f8bdbebfa4cc82b32c39f615802a19f724227c 100644 (file)
@@ -9,6 +9,7 @@ import (
        "net/http"
        "os"
        "path/filepath"
        "net/http"
        "os"
        "path/filepath"
+       "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
@@ -39,6 +40,8 @@ func newServerFromIntegrationTestEnv(c *check.C) *httpserver.Server {
                PostgreSQL: integrationTestCluster().PostgreSQL,
        }}
        handler.Cluster.TLS.Insecure = true
                PostgreSQL: integrationTestCluster().PostgreSQL,
        }}
        handler.Cluster.TLS.Insecure = true
+       handler.Cluster.Collections.BlobSigningKey = arvadostest.BlobSigningKey
+       handler.Cluster.Collections.BlobSigningTTL = arvados.Duration(time.Hour * 24 * 14)
        arvadostest.SetServiceURL(&handler.Cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
        arvadostest.SetServiceURL(&handler.Cluster.Services.Controller, "http://localhost:/")
 
        arvadostest.SetServiceURL(&handler.Cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
        arvadostest.SetServiceURL(&handler.Cluster.Services.Controller, "http://localhost:/")