From: Tom Clegg Date: Thu, 26 Aug 2021 04:04:26 +0000 (-0400) Subject: 17217: Update tests, remove unused code. X-Git-Tag: 2.3.0~93^2~5 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/9ed0a21813117caf5c6bae73c09a0d725564743b 17217: Update tests, remove unused code. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/lib/controller/fed_collections.go b/lib/controller/fed_collections.go deleted file mode 100644 index a0a123129f..0000000000 --- a/lib/controller/fed_collections.go +++ /dev/null @@ -1,312 +0,0 @@ -// Copyright (C) The Arvados Authors. All rights reserved. -// -// SPDX-License-Identifier: AGPL-3.0 - -package controller - -import ( - "bufio" - "bytes" - "context" - "crypto/md5" - "encoding/json" - "fmt" - "io" - "io/ioutil" - "net/http" - "strings" - "sync" - - "git.arvados.org/arvados.git/sdk/go/arvados" - "git.arvados.org/arvados.git/sdk/go/httpserver" - "git.arvados.org/arvados.git/sdk/go/keepclient" -) - -func rewriteSignatures(clusterID string, expectHash string, - resp *http.Response, requestError error) (newResponse *http.Response, err error) { - - if requestError != nil { - return resp, requestError - } - - if resp.StatusCode != http.StatusOK { - return resp, nil - } - - originalBody := resp.Body - defer originalBody.Close() - - var col arvados.Collection - err = json.NewDecoder(resp.Body).Decode(&col) - if err != nil { - return nil, err - } - - // rewriting signatures will make manifest text 5-10% bigger so calculate - // capacity accordingly - updatedManifest := bytes.NewBuffer(make([]byte, 0, int(float64(len(col.ManifestText))*1.1))) - - hasher := md5.New() - mw := io.MultiWriter(hasher, updatedManifest) - sz := 0 - - scanner := bufio.NewScanner(strings.NewReader(col.ManifestText)) - scanner.Buffer(make([]byte, 1048576), len(col.ManifestText)) - for scanner.Scan() { - line := scanner.Text() - tokens := strings.Split(line, " ") - if len(tokens) < 3 { - return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line) - } - - n, err := mw.Write([]byte(tokens[0])) - if err != nil { - return nil, fmt.Errorf("Error updating manifest: %v", err) - } - sz += n - for _, token := range tokens[1:] { - n, err = mw.Write([]byte(" ")) - if err != nil { - return nil, fmt.Errorf("Error updating manifest: %v", err) - } - sz += n - - m := keepclient.SignedLocatorRe.FindStringSubmatch(token) - if m != nil { - // Rewrite the block signature to be a remote signature - _, err = fmt.Fprintf(updatedManifest, "%s%s%s+R%s-%s%s", m[1], m[2], m[3], clusterID, m[5][2:], m[8]) - if err != nil { - return nil, fmt.Errorf("Error updating manifest: %v", err) - } - - // for hash checking, ignore signatures - n, err = fmt.Fprintf(hasher, "%s%s", m[1], m[2]) - if err != nil { - return nil, fmt.Errorf("Error updating manifest: %v", err) - } - sz += n - } else { - n, err = mw.Write([]byte(token)) - if err != nil { - return nil, fmt.Errorf("Error updating manifest: %v", err) - } - sz += n - } - } - n, err = mw.Write([]byte("\n")) - if err != nil { - return nil, fmt.Errorf("Error updating manifest: %v", err) - } - sz += n - } - - // Check that expected hash is consistent with - // portable_data_hash field of the returned record - if expectHash == "" { - expectHash = col.PortableDataHash - } else if expectHash != col.PortableDataHash { - return nil, fmt.Errorf("portable_data_hash %q on returned record did not match expected hash %q ", expectHash, col.PortableDataHash) - } - - // Certify that the computed hash of the manifest_text matches our expectation - sum := hasher.Sum(nil) - computedHash := fmt.Sprintf("%x+%v", sum, sz) - if computedHash != expectHash { - return nil, fmt.Errorf("Computed manifest_text hash %q did not match expected hash %q", computedHash, expectHash) - } - - col.ManifestText = updatedManifest.String() - - newbody, err := json.Marshal(col) - if err != nil { - return nil, err - } - - buf := bytes.NewBuffer(newbody) - resp.Body = ioutil.NopCloser(buf) - resp.ContentLength = int64(buf.Len()) - resp.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len())) - - return resp, nil -} - -func filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) { - if requestError != nil { - return resp, requestError - } - - if resp.StatusCode == http.StatusNotFound { - // Suppress returning this result, because we want to - // search the federation. - return nil, nil - } - return resp, nil -} - -type searchRemoteClusterForPDH struct { - pdh string - remoteID string - mtx *sync.Mutex - sentResponse *bool - sharedContext *context.Context - cancelFunc func() - errors *[]string - statusCode *int -} - -func fetchRemoteCollectionByUUID( - h *genericFederatedRequestHandler, - effectiveMethod string, - clusterID *string, - uuid string, - remainder string, - w http.ResponseWriter, - req *http.Request) bool { - - if effectiveMethod != "GET" { - // Only handle GET requests right now - return false - } - - if uuid != "" { - // Collection UUID GET request - *clusterID = uuid[0:5] - if *clusterID != "" && *clusterID != h.handler.Cluster.ClusterID { - // request for remote collection by uuid - resp, err := h.handler.remoteClusterRequest(*clusterID, req) - newResponse, err := rewriteSignatures(*clusterID, "", resp, err) - h.handler.proxy.ForwardResponse(w, newResponse, err) - return true - } - } - - return false -} - -func fetchRemoteCollectionByPDH( - h *genericFederatedRequestHandler, - effectiveMethod string, - clusterID *string, - uuid string, - remainder string, - w http.ResponseWriter, - req *http.Request) bool { - - if effectiveMethod != "GET" { - // Only handle GET requests right now - return false - } - - m := collectionsByPDHRe.FindStringSubmatch(req.URL.Path) - if len(m) != 2 { - return false - } - - // Request for collection by PDH. Search the federation. - - // First, query the local cluster. - resp, err := h.handler.localClusterRequest(req) - newResp, err := filterLocalClusterResponse(resp, err) - if newResp != nil || err != nil { - h.handler.proxy.ForwardResponse(w, newResp, err) - return true - } - - // Create a goroutine for each cluster in the - // RemoteClusters map. The first valid result gets - // returned to the client. When that happens, all - // other outstanding requests are cancelled - sharedContext, cancelFunc := context.WithCancel(req.Context()) - defer cancelFunc() - - req = req.WithContext(sharedContext) - wg := sync.WaitGroup{} - pdh := m[1] - success := make(chan *http.Response) - errorChan := make(chan error, len(h.handler.Cluster.RemoteClusters)) - - acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification) - - for remoteID := range h.handler.Cluster.RemoteClusters { - if remoteID == h.handler.Cluster.ClusterID { - // No need to query local cluster again - continue - } - if remoteID == "*" { - // This isn't a real remote cluster: it just sets defaults for unlisted remotes. - continue - } - - wg.Add(1) - go func(remote string) { - defer wg.Done() - acquire() - defer release() - select { - case <-sharedContext.Done(): - return - default: - } - - resp, err := h.handler.remoteClusterRequest(remote, req) - wasSuccess := false - defer func() { - if resp != nil && !wasSuccess { - resp.Body.Close() - } - }() - if err != nil { - errorChan <- err - return - } - if resp.StatusCode != http.StatusOK { - errorChan <- HTTPError{resp.Status, resp.StatusCode} - return - } - select { - case <-sharedContext.Done(): - return - default: - } - - newResponse, err := rewriteSignatures(remote, pdh, resp, nil) - if err != nil { - errorChan <- err - return - } - select { - case <-sharedContext.Done(): - case success <- newResponse: - wasSuccess = true - } - }(remoteID) - } - go func() { - wg.Wait() - cancelFunc() - }() - - errorCode := http.StatusNotFound - - for { - select { - case newResp = <-success: - h.handler.proxy.ForwardResponse(w, newResp, nil) - return true - case <-sharedContext.Done(): - var errors []string - for len(errorChan) > 0 { - err := <-errorChan - if httperr, ok := err.(HTTPError); !ok || httperr.Code != http.StatusNotFound { - errorCode = http.StatusBadGateway - } - errors = append(errors, err.Error()) - } - httpserver.Errors(w, errors, errorCode) - return true - } - } - - // shouldn't ever get here - return true -} diff --git a/lib/controller/fed_containers.go b/lib/controller/fed_containers.go deleted file mode 100644 index fd4f0521bc..0000000000 --- a/lib/controller/fed_containers.go +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright (C) The Arvados Authors. All rights reserved. -// -// SPDX-License-Identifier: AGPL-3.0 - -package controller - -import ( - "bytes" - "encoding/json" - "fmt" - "io/ioutil" - "net/http" - "strings" - - "git.arvados.org/arvados.git/sdk/go/auth" - "git.arvados.org/arvados.git/sdk/go/httpserver" -) - -func remoteContainerRequestCreate( - h *genericFederatedRequestHandler, - effectiveMethod string, - clusterID *string, - uuid string, - remainder string, - w http.ResponseWriter, - req *http.Request) bool { - - if effectiveMethod != "POST" || uuid != "" || remainder != "" { - return false - } - - // First make sure supplied token is valid. - creds := auth.NewCredentials() - creds.LoadTokensFromHTTPRequest(req) - - currentUser, ok, err := h.handler.validateAPItoken(req, creds.Tokens[0]) - if err != nil { - httpserver.Error(w, err.Error(), http.StatusInternalServerError) - return true - } else if !ok { - httpserver.Error(w, "invalid API token", http.StatusForbidden) - return true - } - - if *clusterID == "" || *clusterID == h.handler.Cluster.ClusterID { - // Submitting container request to local cluster. No - // need to set a runtime_token (rails api will create - // one when the container runs) or do a remote cluster - // request. - return false - } - - if req.Header.Get("Content-Type") != "application/json" { - httpserver.Error(w, "Expected Content-Type: application/json, got "+req.Header.Get("Content-Type"), http.StatusBadRequest) - return true - } - - originalBody := req.Body - defer originalBody.Close() - var request map[string]interface{} - err = json.NewDecoder(req.Body).Decode(&request) - if err != nil { - httpserver.Error(w, err.Error(), http.StatusBadRequest) - return true - } - - crString, ok := request["container_request"].(string) - if ok { - var crJSON map[string]interface{} - err := json.Unmarshal([]byte(crString), &crJSON) - if err != nil { - httpserver.Error(w, err.Error(), http.StatusBadRequest) - return true - } - - request["container_request"] = crJSON - } - - containerRequest, ok := request["container_request"].(map[string]interface{}) - if !ok { - // Use toplevel object as the container_request object - containerRequest = request - } - - // If runtime_token is not set, create a new token - if _, ok := containerRequest["runtime_token"]; !ok { - if len(currentUser.Authorization.Scopes) != 1 || currentUser.Authorization.Scopes[0] != "all" { - httpserver.Error(w, "Token scope is not [all]", http.StatusForbidden) - return true - } - - if strings.HasPrefix(currentUser.Authorization.UUID, h.handler.Cluster.ClusterID) { - // Local user, submitting to a remote cluster. - // Create a new time-limited token. - newtok, err := h.handler.createAPItoken(req, currentUser.UUID, nil) - if err != nil { - httpserver.Error(w, err.Error(), http.StatusForbidden) - return true - } - containerRequest["runtime_token"] = newtok.TokenV2() - } else { - // Remote user. Container request will use the - // current token, minus the trailing portion - // (optional container uuid). - sp := strings.Split(creds.Tokens[0], "/") - if len(sp) >= 3 { - containerRequest["runtime_token"] = strings.Join(sp[0:3], "/") - } else { - containerRequest["runtime_token"] = creds.Tokens[0] - } - } - } - - newbody, err := json.Marshal(request) - buf := bytes.NewBuffer(newbody) - req.Body = ioutil.NopCloser(buf) - req.ContentLength = int64(buf.Len()) - req.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len())) - - resp, err := h.handler.remoteClusterRequest(*clusterID, req) - h.handler.proxy.ForwardResponse(w, resp, err) - return true -} diff --git a/lib/controller/federation.go b/lib/controller/federation.go index 419d8b0104..144d41c21b 100644 --- a/lib/controller/federation.go +++ b/lib/controller/federation.go @@ -94,20 +94,12 @@ func (h *Handler) setupProxyRemoteCluster(next http.Handler) http.Handler { wfHandler := &genericFederatedRequestHandler{next, h, wfRe, nil} containersHandler := &genericFederatedRequestHandler{next, h, containersRe, nil} - containerRequestsHandler := &genericFederatedRequestHandler{next, h, containerRequestsRe, - []federatedRequestDelegate{remoteContainerRequestCreate}} - collectionsRequestsHandler := &genericFederatedRequestHandler{next, h, collectionsRe, - []federatedRequestDelegate{fetchRemoteCollectionByUUID, fetchRemoteCollectionByPDH}} linksRequestsHandler := &genericFederatedRequestHandler{next, h, linksRe, nil} mux.Handle("/arvados/v1/workflows", wfHandler) mux.Handle("/arvados/v1/workflows/", wfHandler) mux.Handle("/arvados/v1/containers", containersHandler) mux.Handle("/arvados/v1/containers/", containersHandler) - mux.Handle("/arvados/v1/container_requests", containerRequestsHandler) - mux.Handle("/arvados/v1/container_requests/", containerRequestsHandler) - mux.Handle("/arvados/v1/collections", collectionsRequestsHandler) - mux.Handle("/arvados/v1/collections/", collectionsRequestsHandler) mux.Handle("/arvados/v1/links", linksRequestsHandler) mux.Handle("/arvados/v1/links/", linksRequestsHandler) mux.Handle("/", next) diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go index f4cadd821b..4c0ffec60c 100644 --- a/lib/controller/federation_test.go +++ b/lib/controller/federation_test.go @@ -64,6 +64,8 @@ func (s *FederationSuite) SetUpTest(c *check.C) { cluster.API.MaxItemsPerResponse = 1000 cluster.API.MaxRequestAmplification = 4 cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute) + cluster.Collections.BlobSigningKey = arvadostest.BlobSigningKey + cluster.Collections.BlobSigningTTL = arvados.Duration(time.Hour * 24 * 14) arvadostest.SetServiceURL(&cluster.Services.RailsAPI, "http://localhost:1/") arvadostest.SetServiceURL(&cluster.Services.Controller, "http://localhost:/") s.testHandler = &Handler{Cluster: cluster} @@ -695,7 +697,6 @@ func (s *FederationSuite) TestCreateRemoteContainerRequestCheckRuntimeToken(c *c s.testHandler.Cluster.ClusterID = "zzzzz" s.testHandler.Cluster.SystemRootToken = arvadostest.SystemRootToken s.testHandler.Cluster.API.MaxTokenLifetime = arvados.Duration(time.Hour) - s.testHandler.Cluster.Collections.BlobSigningTTL = arvados.Duration(336 * time.Hour) // For some reason, this was set to 0h resp := s.testRequest(req).Result() c.Check(resp.StatusCode, check.Equals, http.StatusOK) diff --git a/lib/controller/server_test.go b/lib/controller/server_test.go index e3558c3f41..54f8bdbebf 100644 --- a/lib/controller/server_test.go +++ b/lib/controller/server_test.go @@ -9,6 +9,7 @@ import ( "net/http" "os" "path/filepath" + "time" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadostest" @@ -39,6 +40,8 @@ func newServerFromIntegrationTestEnv(c *check.C) *httpserver.Server { PostgreSQL: integrationTestCluster().PostgreSQL, }} handler.Cluster.TLS.Insecure = true + handler.Cluster.Collections.BlobSigningKey = arvadostest.BlobSigningKey + handler.Cluster.Collections.BlobSigningTTL = arvados.Duration(time.Hour * 24 * 14) arvadostest.SetServiceURL(&handler.Cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST")) arvadostest.SetServiceURL(&handler.Cluster.Services.Controller, "http://localhost:/")