X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/572e375e1c950f35d7e159bac031de5205354590..2f83fcd45b4b23db2bb5bb4afbe1e863ebd77ec6:/services/keep-web/s3.go diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go index e6262374d6..3e60f3006d 100644 --- a/services/keep-web/s3.go +++ b/services/keep-web/s3.go @@ -2,18 +2,21 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepweb import ( "crypto/hmac" "crypto/sha256" "encoding/base64" + "encoding/json" "encoding/xml" "errors" "fmt" "hash" "io" + "mime" "net/http" + "net/textproto" "net/url" "os" "path/filepath" @@ -24,10 +27,7 @@ import ( "time" "git.arvados.org/arvados.git/sdk/go/arvados" - "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/ctxlog" - "git.arvados.org/arvados.git/sdk/go/keepclient" - "github.com/AdRoll/goamz/s3" ) const ( @@ -41,11 +41,17 @@ type commonPrefix struct { } type listV1Resp struct { - XMLName string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"` - s3.ListResp - // s3.ListResp marshals an empty tag when - // CommonPrefixes is nil, which confuses some clients. - // Fix by using this nested struct instead. + XMLName string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"` + Name string + Prefix string + Delimiter string + Marker string + MaxKeys int + IsTruncated bool + Contents []s3Key + // If we use a []string here, xml marshals an empty tag when + // CommonPrefixes is nil, which confuses some clients. Fix by + // using this nested struct instead. CommonPrefixes []commonPrefix // Similarly, we need omitempty here, because an empty // tag confuses some clients (e.g., @@ -59,7 +65,7 @@ type listV1Resp struct { type listV2Resp struct { XMLName string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"` IsTruncated bool - Contents []s3.Key + Contents []s3Key Name string Prefix string Delimiter string @@ -72,6 +78,21 @@ type listV2Resp struct { StartAfter string `xml:",omitempty"` } +type s3Key struct { + Key string + LastModified string + Size int64 + // The following fields are not populated, but are here in + // case clients rely on the keys being present in xml + // responses. + ETag string + StorageClass string + Owner struct { + ID string + DisplayName string + } +} + func hmacstring(msg string, key []byte) []byte { h := hmac.New(sha256.New, key) io.WriteString(h, msg) @@ -222,8 +243,8 @@ func (h *handler) checks3signature(r *http.Request) (string, error) { } client := (&arvados.Client{ - APIHost: h.Config.cluster.Services.Controller.ExternalURL.Host, - Insecure: h.Config.cluster.TLS.Insecure, + APIHost: h.Cluster.Services.Controller.ExternalURL.Host, + Insecure: h.Cluster.TLS.Insecure, }).WithRequestID(r.Header.Get("X-Request-Id")) var aca arvados.APIClientAuthorization var secret string @@ -231,7 +252,7 @@ func (h *handler) checks3signature(r *http.Request) (string, error) { if len(key) == 27 && key[5:12] == "-gj3su-" { // Access key is the UUID of an Arvados token, secret // key is the secret part. 
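		//
		// (Aside: hmacstring, defined earlier in this file, is the
		// HMAC-SHA256 primitive for the standard AWS signature V4
		// key-derivation chain. A sketch of that chain -- this is
		// AWS's published algorithm with illustrative variable
		// names, not keep-web's literal code:
		//
		//	kDate := hmacstring(date, []byte("AWS4"+secretKey))
		//	kRegion := hmacstring(region, kDate)
		//	kService := hmacstring(service, kRegion)
		//	kSigning := hmacstring("aws4_request", kService)
		//	// signature = hex(HMAC-SHA256(kSigning, stringToSign))
		//
		// The secretKey fed into the first step is what this block
		// resolves: the secret part of an Arvados token when the
		// access key is a token UUID, otherwise the token itself.)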
- ctx := arvados.ContextWithAuthorization(r.Context(), "Bearer "+h.Config.cluster.SystemRootToken) + ctx := arvados.ContextWithAuthorization(r.Context(), "Bearer "+h.Cluster.SystemRootToken) err = client.RequestAndDecodeContext(ctx, &aca, "GET", "arvados/v1/api_client_authorizations/"+key, nil, nil) secret = aca.APIToken } else { @@ -309,40 +330,26 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { return false } - var err error - var fs arvados.CustomFileSystem - var arvclient *arvadosclient.ArvadosClient - if r.Method == http.MethodGet || r.Method == http.MethodHead { - // Use a single session (cached FileSystem) across - // multiple read requests. - var sess *cachedSession - fs, sess, err = h.Config.Cache.GetSession(token) - if err != nil { - s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError) - return true - } - arvclient = sess.arvadosclient - } else { + fs, sess, tokenUser, err := h.Cache.GetSession(token) + if err != nil { + s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError) + return true + } + defer sess.Release() + readfs := fs + if writeMethod[r.Method] { // Create a FileSystem for this request, to avoid // exposing incomplete write operations to concurrent // requests. - var kc *keepclient.KeepClient - var release func() - var client *arvados.Client - arvclient, kc, client, release, err = h.getClients(r.Header.Get("X-Request-Id"), token) - if err != nil { - s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError) - return true - } - defer release() - fs = client.SiteFileSystem(kc) - fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution) + client := sess.client.WithRequestID(r.Header.Get("X-Request-Id")) + fs = client.SiteFileSystem(sess.keepclient) + fs.ForwardSlashNameSubstitution(h.Cluster.Collections.ForwardSlashNameSubstitution) } var objectNameGiven bool var bucketName string fspath := "/by_id" - if id := parseCollectionIDFromDNSName(r.Host); id != "" { + if id := arvados.CollectionIDFromDNSName(r.Host); id != "" { fspath += "/" + id bucketName = id objectNameGiven = strings.Count(strings.TrimSuffix(r.URL.Path, "/"), "/") > 0 @@ -365,7 +372,7 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { w.Header().Set("Content-Type", "application/xml") io.WriteString(w, xml.Header) fmt.Fprintln(w, ``+ - h.Config.cluster.ClusterID+ + h.Cluster.ClusterID+ ``) } else if reRawQueryIndicatesAPI.MatchString(r.URL.RawQuery) { // GetBucketWebsite ("GET /bucketid/?website"), GetBucketTagging, etc. 
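For reference, the GetBucketLocation branch above answers "GET /bucketid/?location" with a standard S3 LocationConstraint document, reporting the cluster ID where AWS would report a region. A minimal stand-alone sketch of that response shape (the element name and namespace are the S3 standard; getBucketLocation is a hypothetical helper, and keep-web's exact string literal is not reproduced here):

	package keepweb // sketch only, not part of this diff

	import (
		"encoding/xml"
		"fmt"
		"io"
		"net/http"
	)

	// getBucketLocation answers GetBucketLocation with the cluster ID
	// in place of an AWS region name.
	func getBucketLocation(w http.ResponseWriter, clusterID string) {
		w.Header().Set("Content-Type", "application/xml")
		io.WriteString(w, xml.Header)
		fmt.Fprintf(w, "<LocationConstraint xmlns=%q>%s</LocationConstraint>\n",
			"http://s3.amazonaws.com/doc/2006-03-01/", clusterID)
	}

A client therefore sees something like <LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">zzzzz</LocationConstraint> for a cluster with ID "zzzzz".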
@@ -385,6 +392,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { if r.Method == "HEAD" && !objectNameGiven { // HeadBucket if err == nil && fi.IsDir() { + err = setFileInfoHeaders(w.Header(), fs, fspath) + if err != nil { + s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusBadGateway) + return true + } w.WriteHeader(http.StatusOK) } else if os.IsNotExist(err) { s3ErrorResponse(w, NoSuchBucket, "The specified bucket does not exist.", r.URL.Path, http.StatusNotFound) @@ -393,7 +405,12 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { } return true } - if err == nil && fi.IsDir() && objectNameGiven && strings.HasSuffix(fspath, "/") && h.Config.cluster.Collections.S3FolderObjects { + if err == nil && fi.IsDir() && objectNameGiven && strings.HasSuffix(fspath, "/") && h.Cluster.Collections.S3FolderObjects { + err = setFileInfoHeaders(w.Header(), fs, fspath) + if err != nil { + s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusBadGateway) + return true + } w.Header().Set("Content-Type", "application/x-directory") w.WriteHeader(http.StatusOK) return true @@ -405,16 +422,20 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { return true } - tokenUser, err := h.Config.Cache.GetTokenUser(token) if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) { http.Error(w, "Not permitted", http.StatusForbidden) return true } - h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser) + h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser) // shallow copy r, and change URL path r := *r r.URL.Path = fspath + err = setFileInfoHeaders(w.Header(), fs, fspath) + if err != nil { + s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusBadGateway) + return true + } http.FileServer(fs).ServeHTTP(w, &r) return true case r.Method == http.MethodPut: @@ -429,7 +450,7 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { } var objectIsDir bool if strings.HasSuffix(fspath, "/") { - if !h.Config.cluster.Collections.S3FolderObjects { + if !h.Cluster.Collections.S3FolderObjects { s3ErrorResponse(w, InvalidArgument, "invalid object name: trailing slash", r.URL.Path, http.StatusBadRequest) return true } @@ -471,7 +492,7 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { return true } err = fs.Mkdir(dir, 0755) - if err == arvados.ErrInvalidArgument { + if errors.Is(err, arvados.ErrInvalidArgument) || errors.Is(err, arvados.ErrInvalidOperation) { // Cannot create a directory // here. err = fmt.Errorf("mkdir %q failed: %w", dir, err) @@ -496,12 +517,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { } defer f.Close() - tokenUser, err := h.Config.Cache.GetTokenUser(token) if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) { http.Error(w, "Not permitted", http.StatusForbidden) return true } - h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser) + h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser) _, err = io.Copy(f, r.Body) if err != nil { @@ -516,14 +536,12 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { return true } } - err = fs.Sync() + err = h.syncCollection(fs, readfs, fspath) if err != nil { err = fmt.Errorf("sync failed: %w", err) s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError) return true } - // Ensure a subsequent read operation will see the changes. 
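	// (The syncCollection helper introduced below replaces this
	// sync-then-reset approach: instead of invalidating the whole
	// cached session after a write, it saves the modified
	// collection from the request's private filesystem and splices
	// the resulting snapshot into the session's shared filesystem,
	// so concurrent readers keep their cache and still see the
	// update. The shape of the calls involved, per the definition
	// later in this diff, with error handling elided:
	//
	//	src, _ := srcfs.OpenFile("by_id/"+uuid, os.O_RDWR, 0777)
	//	_ = src.Sync()            // save modifications
	//	snap, _ := src.Snapshot() // capture the saved state
	//	dst, _ := dstfs.OpenFile("by_id/"+uuid, os.O_RDWR, 0777)
	//	_ = dst.Splice(snap)      // graft that state into the other fs
	// )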
- h.Config.Cache.ResetSession(token) w.WriteHeader(http.StatusOK) return true case r.Method == http.MethodDelete: @@ -570,14 +588,12 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { s3ErrorResponse(w, InvalidArgument, err.Error(), r.URL.Path, http.StatusBadRequest) return true } - err = fs.Sync() + err = h.syncCollection(fs, readfs, fspath) if err != nil { err = fmt.Errorf("sync failed: %w", err) s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError) return true } - // Ensure a subsequent read operation will see the changes. - h.Config.Cache.ResetSession(token) w.WriteHeader(http.StatusNoContent) return true default: @@ -586,6 +602,88 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { } } +// Save modifications to the indicated collection in srcfs, then (if +// successful) ensure they are also reflected in dstfs. +func (h *handler) syncCollection(srcfs, dstfs arvados.CustomFileSystem, path string) error { + coll, _ := h.determineCollection(srcfs, path) + if coll == nil || coll.UUID == "" { + return errors.New("could not determine collection to sync") + } + d, err := srcfs.OpenFile("by_id/"+coll.UUID, os.O_RDWR, 0777) + if err != nil { + return err + } + defer d.Close() + err = d.Sync() + if err != nil { + return err + } + snap, err := d.Snapshot() + if err != nil { + return err + } + dstd, err := dstfs.OpenFile("by_id/"+coll.UUID, os.O_RDWR, 0777) + if err != nil { + return err + } + defer dstd.Close() + return dstd.Splice(snap) +} + +func setFileInfoHeaders(header http.Header, fs arvados.CustomFileSystem, path string) error { + maybeEncode := func(s string) string { + for _, c := range s { + if c > '\u007f' || c < ' ' { + return mime.BEncoding.Encode("UTF-8", s) + } + } + return s + } + path = strings.TrimSuffix(path, "/") + var props map[string]interface{} + for { + fi, err := fs.Stat(path) + if err != nil { + return err + } + switch src := fi.Sys().(type) { + case *arvados.Collection: + props = src.Properties + case *arvados.Group: + props = src.Properties + default: + if err, ok := src.(error); ok { + return err + } + // Try parent + cut := strings.LastIndexByte(path, '/') + if cut < 0 { + return nil + } + path = path[:cut] + continue + } + break + } + for k, v := range props { + if !validMIMEHeaderKey(k) { + continue + } + k = "x-amz-meta-" + k + if s, ok := v.(string); ok { + header.Set(k, maybeEncode(s)) + } else if j, err := json.Marshal(v); err == nil { + header.Set(k, maybeEncode(string(j))) + } + } + return nil +} + +func validMIMEHeaderKey(k string) bool { + check := "z-" + k + return check != textproto.CanonicalMIMEHeaderKey(check) +} + // Call fn on the given path (directory) and its contents, in // lexicographic order. // @@ -675,6 +773,9 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request, http.Error(w, "invalid continuation token", http.StatusBadRequest) return } + // marker and start-after perform the same function, + // but we keep them separate so we can repeat them + // back to the client in the response. params.marker = string(marker) params.startAfter = r.FormValue("start-after") switch r.FormValue("encoding-type") { @@ -686,9 +787,17 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request, return } } else { + // marker is functionally equivalent to start-after. params.marker = r.FormValue("marker") } + // startAfter is params.marker or params.startAfter, whichever + // comes last. 
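	// ("Comes last" means lexicographically greater. Both
	// parameters are exclusive lower bounds on the keys returned
	// (marker in v1 requests; start-after, or a continuation-token
	// decoded into params.marker above, in v2 requests), so taking
	// the max honors whichever bound is stricter. Illustration,
	// with invented requests against a bucket holding keys
	// "a", "b", "c":
	//
	//	?marker=a                                          -> "b", "c"
	//	?list-type=2&start-after=b                         -> "c"
	//	?list-type=2&continuation-token=<base64 "b">&start-after=a -> "c"
	// )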
+ startAfter := params.startAfter + if startAfter < params.marker { + startAfter = params.marker + } + bucketdir := "by_id/" + bucket // walkpath is the directory (relative to bucketdir) we need // to walk: the innermost directory that is guaranteed to @@ -712,9 +821,15 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request, ContinuationToken: r.FormValue("continuation-token"), StartAfter: params.startAfter, } + + // nextMarker will be the last path we add to either + // resp.Contents or commonPrefixes. It will be included in + // the response as NextMarker or NextContinuationToken if + // needed. nextMarker := "" commonPrefixes := map[string]bool{} + full := false err := walkFS(fs, strings.TrimSuffix(bucketdir+"/"+walkpath, "/"), true, func(path string, fi os.FileInfo) error { if path == bucketdir { return nil @@ -725,38 +840,31 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request, path += "/" filesize = 0 } - if len(path) <= len(params.prefix) { - if path > params.prefix[:len(path)] { - // with prefix "foobar", walking "fooz" means we're done - return errDone - } - if path < params.prefix[:len(path)] { - // with prefix "foobar", walking "foobag" is pointless - return filepath.SkipDir - } - if fi.IsDir() && !strings.HasPrefix(params.prefix+"/", path) { - // with prefix "foo/bar", walking "fo" - // is pointless (but walking "foo" or - // "foo/bar" is necessary) - return filepath.SkipDir - } - if len(path) < len(params.prefix) { - // can't skip anything, and this entry - // isn't in the results, so just - // continue descent - return nil - } - } else { - if path[:len(params.prefix)] > params.prefix { - // with prefix "foobar", nothing we - // see after "foozzz" is relevant - return errDone - } - } - if path < params.marker || path < params.prefix || path <= params.startAfter { + if strings.HasPrefix(params.prefix, path) && params.prefix != path { + // Descend into subtree until we reach desired prefix + return nil + } else if path < params.prefix { + // Not an ancestor or descendant of desired + // prefix, therefore none of its descendants + // can be either -- skip + return filepath.SkipDir + } else if path > params.prefix && !strings.HasPrefix(path, params.prefix) { + // We must have traversed everything under + // desired prefix + return errDone + } else if path == startAfter { + // Skip startAfter itself, just descend into + // subtree + return nil + } else if strings.HasPrefix(startAfter, path) { + // Descend into subtree in case it contains + // something after startAfter return nil + } else if path < startAfter { + // Skip ahead until we reach startAfter + return filepath.SkipDir } - if fi.IsDir() && !h.Config.cluster.Collections.S3FolderObjects { + if fi.IsDir() && !h.Cluster.Collections.S3FolderObjects { // Note we don't add anything to // commonPrefixes here even if delimiter is // "/". We descend into the directory, and @@ -764,13 +872,6 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request, // finding a regular file inside it. 
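			// (Worked example of the ordering rules above, with
			// illustrative values prefix="foo/b" and
			// startAfter="foo/bar/x"; paths arrive in lexicographic
			// order, directories with a trailing "/":
			//
			//	"fon/"       -> filepath.SkipDir (sorts before prefix, not an ancestor)
			//	"foo/"       -> nil              (ancestor of prefix: descend)
			//	"foo/bar/"   -> nil              (startAfter is inside: descend)
			//	"foo/bar/x"  -> nil              (equals startAfter: excluded)
			//	"foo/bar/y"  -> appended to resp.Contents
			//	"fop/"       -> errDone          (past everything under prefix)
			// )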
return nil } - if len(resp.Contents)+len(commonPrefixes) >= params.maxKeys { - resp.IsTruncated = true - if params.delimiter != "" || params.v2 { - nextMarker = path - } - return errDone - } if params.delimiter != "" { idx := strings.Index(path[len(params.prefix):], params.delimiter) if idx >= 0 { @@ -778,21 +879,42 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request, // "z", when we hit "foobar/baz", we // add "/baz" to commonPrefixes and // stop descending. - commonPrefixes[path[:len(params.prefix)+idx+1]] = true - return filepath.SkipDir + prefix := path[:len(params.prefix)+idx+1] + if prefix == startAfter { + return nil + } else if prefix < startAfter && !strings.HasPrefix(startAfter, prefix) { + return nil + } else if full { + resp.IsTruncated = true + return errDone + } else { + commonPrefixes[prefix] = true + nextMarker = prefix + full = len(resp.Contents)+len(commonPrefixes) >= params.maxKeys + return filepath.SkipDir + } } } - resp.Contents = append(resp.Contents, s3.Key{ + if full { + resp.IsTruncated = true + return errDone + } + resp.Contents = append(resp.Contents, s3Key{ Key: path, LastModified: fi.ModTime().UTC().Format("2006-01-02T15:04:05.999") + "Z", Size: filesize, }) + nextMarker = path + full = len(resp.Contents)+len(commonPrefixes) >= params.maxKeys return nil }) if err != nil && err != errDone { http.Error(w, err.Error(), http.StatusInternalServerError) return } + if params.delimiter == "" && !params.v2 || !resp.IsTruncated { + nextMarker = "" + } if params.delimiter != "" { resp.CommonPrefixes = make([]commonPrefix, 0, len(commonPrefixes)) for prefix := range commonPrefixes { @@ -846,15 +968,13 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request, CommonPrefixes: resp.CommonPrefixes, NextMarker: nextMarker, KeyCount: resp.KeyCount, - ListResp: s3.ListResp{ - IsTruncated: resp.IsTruncated, - Name: bucket, - Prefix: params.prefix, - Delimiter: params.delimiter, - Marker: params.marker, - MaxKeys: params.maxKeys, - Contents: resp.Contents, - }, + IsTruncated: resp.IsTruncated, + Name: bucket, + Prefix: params.prefix, + Delimiter: params.delimiter, + Marker: params.marker, + MaxKeys: params.maxKeys, + Contents: resp.Contents, } }
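For reference, the listV1Resp assembled above serializes to a standard ListBucketResult document. A self-contained sketch of that marshaling -- the structs are trimmed to a subset of the fields in this diff, and the bucket name and keys are invented for illustration:

	// sketch.go -- stand-alone illustration, not part of this diff
	package main

	import (
		"encoding/xml"
		"fmt"
	)

	type commonPrefix struct {
		Prefix string
	}

	type s3Key struct {
		Key          string
		LastModified string
		Size         int64
	}

	type listV1Resp struct {
		XMLName        string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"`
		Name           string
		Prefix         string
		Delimiter      string
		Marker         string
		MaxKeys        int
		IsTruncated    bool
		Contents       []s3Key
		CommonPrefixes []commonPrefix
		NextMarker     string `xml:",omitempty"`
	}

	func main() {
		resp := listV1Resp{
			Name:      "zzzzz-4zz18-0123456789abcde", // invented collection UUID
			Prefix:    "foo/",
			Delimiter: "/",
			MaxKeys:   1000,
			Contents: []s3Key{{
				Key:          "foo/file.txt",
				LastModified: "2023-01-01T00:00:00.000Z",
				Size:         42,
			}},
			CommonPrefixes: []commonPrefix{{Prefix: "foo/bar/"}},
		}
		buf, err := xml.MarshalIndent(resp, "", "  ")
		if err != nil {
			panic(err)
		}
		fmt.Println(xml.Header + string(buf))
	}

Each commonPrefix element marshals as <CommonPrefixes><Prefix>foo/bar/</Prefix></CommonPrefixes>, and a nil slice produces no CommonPrefixes element at all, avoiding the empty-tag behavior that the comment at the top of this diff says confuses some clients.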