Merge branch '21901-file-log-throttling'
[arvados.git] / services / keep-web / s3.go
index 38428cdab1cb0a54775481ee9c45c977acab8af0..75dc8f98e57b2af53e6a483e06c13d833aed4d60 100644 (file)
@@ -31,9 +31,10 @@ import (
 )
 
// Parameters of the S3-compatible API served by keep-web.
const (
	// s3MaxKeys is presumably the cap on keys returned by a single
	// bucket-listing request (1000 is also the AWS S3 default) --
	// usage not visible here.
	s3MaxKeys = 1000

	// s3SignAlgorithm is the AWS Signature Version 4 algorithm
	// identifier; checks3signature passes it to s3stringToSign.
	s3SignAlgorithm = "AWS4-HMAC-SHA256"

	// s3MaxClockSkew is presumably the largest tolerated difference
	// between a signed request's timestamp and the server clock --
	// usage not visible here.
	s3MaxClockSkew = 5 * time.Minute

	// s3SecretCacheTidyInterval is the minimum time between sweeps
	// that purge stale entries from the S3 secret cache.
	s3SecretCacheTidyInterval = time.Minute
)
 
 type commonPrefix struct {
@@ -93,6 +94,31 @@ type s3Key struct {
        }
 }
 
+type cachedS3Secret struct {
+       auth   *arvados.APIClientAuthorization
+       expiry time.Time
+}
+
+func newCachedS3Secret(auth *arvados.APIClientAuthorization, maxExpiry time.Time) *cachedS3Secret {
+       var expiry time.Time
+       if auth.ExpiresAt.IsZero() || maxExpiry.Before(auth.ExpiresAt) {
+               expiry = maxExpiry
+       } else {
+               expiry = auth.ExpiresAt
+       }
+       return &cachedS3Secret{
+               auth:   auth,
+               expiry: expiry,
+       }
+}
+
+func (cs *cachedS3Secret) isValidAt(t time.Time) bool {
+       return cs.auth != nil &&
+               !cs.expiry.IsZero() &&
+               !t.IsZero() &&
+               t.Before(cs.expiry)
+}
+
 func hmacstring(msg string, key []byte) []byte {
        h := hmac.New(sha256.New, key)
        io.WriteString(h, msg)
@@ -217,6 +243,33 @@ func unescapeKey(key string) string {
        }
 }
 
+func (h *handler) updateS3SecretCache(aca *arvados.APIClientAuthorization, key string) {
+       now := time.Now()
+       ttlExpiry := now.Add(h.Cluster.Collections.WebDAVCache.TTL.Duration())
+       cachedSecret := newCachedS3Secret(aca, ttlExpiry)
+
+       h.s3SecretCacheMtx.Lock()
+       defer h.s3SecretCacheMtx.Unlock()
+
+       if h.s3SecretCache == nil {
+               h.s3SecretCache = make(map[string]*cachedS3Secret)
+       }
+       h.s3SecretCache[key] = cachedSecret
+       h.s3SecretCache[cachedSecret.auth.UUID] = cachedSecret
+       h.s3SecretCache[cachedSecret.auth.APIToken] = cachedSecret
+       h.s3SecretCache[cachedSecret.auth.TokenV2()] = cachedSecret
+
+       if h.s3SecretCacheNextTidy.After(now) {
+               return
+       }
+       for key, entry := range h.s3SecretCache {
+               if entry.expiry.Before(now) {
+                       delete(h.s3SecretCache, key)
+               }
+       }
+       h.s3SecretCacheNextTidy = now.Add(s3SecretCacheTidyInterval)
+}
+
 // checks3signature verifies the given S3 V4 signature and returns the
 // Arvados token that corresponds to the given accessKey. An error is
 // returned if accessKey is not a valid token UUID or the signature
@@ -241,31 +294,43 @@ func (h *handler) checks3signature(r *http.Request) (string, error) {
                        signature = split[1]
                }
        }
+       keyIsUUID := len(key) == 27 && key[5:12] == "-gj3su-"
+       unescapedKey := unescapeKey(key)
 
-       client := (&arvados.Client{
-               APIHost:  h.Cluster.Services.Controller.ExternalURL.Host,
-               Insecure: h.Cluster.TLS.Insecure,
-       }).WithRequestID(r.Header.Get("X-Request-Id"))
-       var aca arvados.APIClientAuthorization
+       h.s3SecretCacheMtx.Lock()
+       cached := h.s3SecretCache[unescapedKey]
+       h.s3SecretCacheMtx.Unlock()
+       usedCache := cached != nil && cached.isValidAt(time.Now())
+       var aca *arvados.APIClientAuthorization
+       if usedCache {
+               aca = cached.auth
+       } else {
+               var acaAuth, acaPath string
+               if keyIsUUID {
+                       acaAuth = h.Cluster.SystemRootToken
+                       acaPath = key
+               } else {
+                       acaAuth = unescapedKey
+                       acaPath = "current"
+               }
+               client := (&arvados.Client{
+                       APIHost:  h.Cluster.Services.Controller.ExternalURL.Host,
+                       Insecure: h.Cluster.TLS.Insecure,
+               }).WithRequestID(r.Header.Get("X-Request-Id"))
+               ctx := arvados.ContextWithAuthorization(r.Context(), "Bearer "+acaAuth)
+               aca = new(arvados.APIClientAuthorization)
+               err := client.RequestAndDecodeContext(ctx, aca, "GET", "arvados/v1/api_client_authorizations/"+acaPath, nil, nil)
+               if err != nil {
+                       ctxlog.FromContext(r.Context()).WithError(err).WithField("UUID", key).Info("token lookup failed")
+                       return "", errors.New("invalid access key")
+               }
+       }
        var secret string
-       var err error
-       if len(key) == 27 && key[5:12] == "-gj3su-" {
-               // Access key is the UUID of an Arvados token, secret
-               // key is the secret part.
-               ctx := arvados.ContextWithAuthorization(r.Context(), "Bearer "+h.Cluster.SystemRootToken)
-               err = client.RequestAndDecodeContext(ctx, &aca, "GET", "arvados/v1/api_client_authorizations/"+key, nil, nil)
+       if keyIsUUID {
                secret = aca.APIToken
        } else {
-               // Access key and secret key are both an entire
-               // Arvados token or OIDC access token.
-               ctx := arvados.ContextWithAuthorization(r.Context(), "Bearer "+unescapeKey(key))
-               err = client.RequestAndDecodeContext(ctx, &aca, "GET", "arvados/v1/api_client_authorizations/current", nil, nil)
                secret = key
        }
-       if err != nil {
-               ctxlog.FromContext(r.Context()).WithError(err).WithField("UUID", key).Info("token lookup failed")
-               return "", errors.New("invalid access key")
-       }
        stringToSign, err := s3stringToSign(s3SignAlgorithm, scope, signedHeaders, r)
        if err != nil {
                return "", err
@@ -276,6 +341,9 @@ func (h *handler) checks3signature(r *http.Request) (string, error) {
        } else if expect != signature {
                return "", fmt.Errorf("signature does not match (scope %q signedHeaders %q stringToSign %q)", scope, signedHeaders, stringToSign)
        }
+       if !usedCache {
+               h.updateS3SecretCache(aca, unescapedKey)
+       }
        return aca.TokenV2(), nil
 }
 
@@ -335,6 +403,7 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
                return true
        }
+       defer sess.Release()
        readfs := fs
        if writeMethod[r.Method] {
                // Create a FileSystem for this request, to avoid
@@ -648,6 +717,9 @@ func setFileInfoHeaders(header http.Header, fs arvados.CustomFileSystem, path st
                switch src := fi.Sys().(type) {
                case *arvados.Collection:
                        props = src.Properties
+                       if src.PortableDataHash != "" {
+                               header.Set("Etag", fmt.Sprintf(`"%s"`, src.PortableDataHash))
+                       }
                case *arvados.Group:
                        props = src.Properties
                default:
@@ -772,6 +844,9 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                        http.Error(w, "invalid continuation token", http.StatusBadRequest)
                        return
                }
+               // marker and start-after perform the same function,
+               // but we keep them separate so we can repeat them
+               // back to the client in the response.
                params.marker = string(marker)
                params.startAfter = r.FormValue("start-after")
                switch r.FormValue("encoding-type") {
@@ -783,9 +858,17 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                        return
                }
        } else {
+               // marker is functionally equivalent to start-after.
                params.marker = r.FormValue("marker")
        }
 
+       // startAfter is params.marker or params.startAfter, whichever
+       // comes last.
+       startAfter := params.startAfter
+       if startAfter < params.marker {
+               startAfter = params.marker
+       }
+
        bucketdir := "by_id/" + bucket
        // walkpath is the directory (relative to bucketdir) we need
        // to walk: the innermost directory that is guaranteed to
@@ -809,9 +892,15 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                ContinuationToken: r.FormValue("continuation-token"),
                StartAfter:        params.startAfter,
        }
+
+       // nextMarker will be the last path we add to either
+       // resp.Contents or commonPrefixes.  It will be included in
+       // the response as NextMarker or NextContinuationToken if
+       // needed.
        nextMarker := ""
 
        commonPrefixes := map[string]bool{}
+       full := false
        err := walkFS(fs, strings.TrimSuffix(bucketdir+"/"+walkpath, "/"), true, func(path string, fi os.FileInfo) error {
                if path == bucketdir {
                        return nil
@@ -822,36 +911,29 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                        path += "/"
                        filesize = 0
                }
-               if len(path) <= len(params.prefix) {
-                       if path > params.prefix[:len(path)] {
-                               // with prefix "foobar", walking "fooz" means we're done
-                               return errDone
-                       }
-                       if path < params.prefix[:len(path)] {
-                               // with prefix "foobar", walking "foobag" is pointless
-                               return filepath.SkipDir
-                       }
-                       if fi.IsDir() && !strings.HasPrefix(params.prefix+"/", path) {
-                               // with prefix "foo/bar", walking "fo"
-                               // is pointless (but walking "foo" or
-                               // "foo/bar" is necessary)
-                               return filepath.SkipDir
-                       }
-                       if len(path) < len(params.prefix) {
-                               // can't skip anything, and this entry
-                               // isn't in the results, so just
-                               // continue descent
-                               return nil
-                       }
-               } else {
-                       if path[:len(params.prefix)] > params.prefix {
-                               // with prefix "foobar", nothing we
-                               // see after "foozzz" is relevant
-                               return errDone
-                       }
-               }
-               if path < params.marker || path < params.prefix || path <= params.startAfter {
+               if strings.HasPrefix(params.prefix, path) && params.prefix != path {
+                       // Descend into subtree until we reach desired prefix
+                       return nil
+               } else if path < params.prefix {
+                       // Not an ancestor or descendant of desired
+                       // prefix, therefore none of its descendants
+                       // can be either -- skip
+                       return filepath.SkipDir
+               } else if path > params.prefix && !strings.HasPrefix(path, params.prefix) {
+                       // We must have traversed everything under
+                       // desired prefix
+                       return errDone
+               } else if path == startAfter {
+                       // Skip startAfter itself, just descend into
+                       // subtree
+                       return nil
+               } else if strings.HasPrefix(startAfter, path) {
+                       // Descend into subtree in case it contains
+                       // something after startAfter
                        return nil
+               } else if path < startAfter {
+                       // Skip ahead until we reach startAfter
+                       return filepath.SkipDir
                }
                if fi.IsDir() && !h.Cluster.Collections.S3FolderObjects {
                        // Note we don't add anything to
@@ -861,13 +943,6 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                        // finding a regular file inside it.
                        return nil
                }
-               if len(resp.Contents)+len(commonPrefixes) >= params.maxKeys {
-                       resp.IsTruncated = true
-                       if params.delimiter != "" || params.v2 {
-                               nextMarker = path
-                       }
-                       return errDone
-               }
                if params.delimiter != "" {
                        idx := strings.Index(path[len(params.prefix):], params.delimiter)
                        if idx >= 0 {
@@ -875,21 +950,42 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                                // "z", when we hit "foobar/baz", we
                                // add "/baz" to commonPrefixes and
                                // stop descending.
-                               commonPrefixes[path[:len(params.prefix)+idx+1]] = true
-                               return filepath.SkipDir
+                               prefix := path[:len(params.prefix)+idx+1]
+                               if prefix == startAfter {
+                                       return nil
+                               } else if prefix < startAfter && !strings.HasPrefix(startAfter, prefix) {
+                                       return nil
+                               } else if full {
+                                       resp.IsTruncated = true
+                                       return errDone
+                               } else {
+                                       commonPrefixes[prefix] = true
+                                       nextMarker = prefix
+                                       full = len(resp.Contents)+len(commonPrefixes) >= params.maxKeys
+                                       return filepath.SkipDir
+                               }
                        }
                }
+               if full {
+                       resp.IsTruncated = true
+                       return errDone
+               }
                resp.Contents = append(resp.Contents, s3Key{
                        Key:          path,
                        LastModified: fi.ModTime().UTC().Format("2006-01-02T15:04:05.999") + "Z",
                        Size:         filesize,
                })
+               nextMarker = path
+               full = len(resp.Contents)+len(commonPrefixes) >= params.maxKeys
                return nil
        })
        if err != nil && err != errDone {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
        }
+       if params.delimiter == "" && !params.v2 || !resp.IsTruncated {
+               nextMarker = ""
+       }
        if params.delimiter != "" {
                resp.CommonPrefixes = make([]commonPrefix, 0, len(commonPrefixes))
                for prefix := range commonPrefixes {