X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c502c5a50aae825683ee4cff629c6839a4209501..25ded0f5c0b44dfcc97f945331487abf07a91feb:/services/keep-web/s3.go diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go index 4117dafbc6..f98efd8fdf 100644 --- a/services/keep-web/s3.go +++ b/services/keep-web/s3.go @@ -14,6 +14,7 @@ import ( "fmt" "hash" "io" + "mime" "net/http" "net/textproto" "net/url" @@ -26,9 +27,7 @@ import ( "time" "git.arvados.org/arvados.git/sdk/go/arvados" - "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/ctxlog" - "git.arvados.org/arvados.git/sdk/go/keepclient" "github.com/AdRoll/goamz/s3" ) @@ -311,33 +310,18 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { return false } - var err error - var fs arvados.CustomFileSystem - var arvclient *arvadosclient.ArvadosClient - if r.Method == http.MethodGet || r.Method == http.MethodHead { - // Use a single session (cached FileSystem) across - // multiple read requests. - var sess *cachedSession - fs, sess, err = h.Cache.GetSession(token) - if err != nil { - s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError) - return true - } - arvclient = sess.arvadosclient - } else { + fs, sess, tokenUser, err := h.Cache.GetSession(token) + if err != nil { + s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError) + return true + } + readfs := fs + if writeMethod[r.Method] { // Create a FileSystem for this request, to avoid // exposing incomplete write operations to concurrent // requests. - var kc *keepclient.KeepClient - var release func() - var client *arvados.Client - arvclient, kc, client, release, err = h.getClients(r.Header.Get("X-Request-Id"), token) - if err != nil { - s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError) - return true - } - defer release() - fs = client.SiteFileSystem(kc) + client := sess.client.WithRequestID(r.Header.Get("X-Request-Id")) + fs = client.SiteFileSystem(sess.keepclient) fs.ForwardSlashNameSubstitution(h.Cluster.Collections.ForwardSlashNameSubstitution) } @@ -387,7 +371,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { if r.Method == "HEAD" && !objectNameGiven { // HeadBucket if err == nil && fi.IsDir() { - setFileInfoHeaders(w.Header(), fs, fspath) + err = setFileInfoHeaders(w.Header(), fs, fspath) + if err != nil { + s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusBadGateway) + return true + } w.WriteHeader(http.StatusOK) } else if os.IsNotExist(err) { s3ErrorResponse(w, NoSuchBucket, "The specified bucket does not exist.", r.URL.Path, http.StatusNotFound) @@ -397,7 +385,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { return true } if err == nil && fi.IsDir() && objectNameGiven && strings.HasSuffix(fspath, "/") && h.Cluster.Collections.S3FolderObjects { - setFileInfoHeaders(w.Header(), fs, fspath) + err = setFileInfoHeaders(w.Header(), fs, fspath) + if err != nil { + s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusBadGateway) + return true + } w.Header().Set("Content-Type", "application/x-directory") w.WriteHeader(http.StatusOK) return true @@ -409,17 +401,20 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { return true } - tokenUser, err := h.Cache.GetTokenUser(token) if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) { http.Error(w, "Not permitted", http.StatusForbidden) return true } - h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser) + h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser) // shallow copy r, and change URL path r := *r r.URL.Path = fspath - setFileInfoHeaders(w.Header(), fs, fspath) + err = setFileInfoHeaders(w.Header(), fs, fspath) + if err != nil { + s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusBadGateway) + return true + } http.FileServer(fs).ServeHTTP(w, &r) return true case r.Method == http.MethodPut: @@ -501,12 +496,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { } defer f.Close() - tokenUser, err := h.Cache.GetTokenUser(token) if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) { http.Error(w, "Not permitted", http.StatusForbidden) return true } - h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser) + h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser) _, err = io.Copy(f, r.Body) if err != nil { @@ -521,14 +515,12 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { return true } } - err = fs.Sync() + err = h.syncCollection(fs, readfs, fspath) if err != nil { err = fmt.Errorf("sync failed: %w", err) s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError) return true } - // Ensure a subsequent read operation will see the changes. - h.Cache.ResetSession(token) w.WriteHeader(http.StatusOK) return true case r.Method == http.MethodDelete: @@ -575,14 +567,12 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { s3ErrorResponse(w, InvalidArgument, err.Error(), r.URL.Path, http.StatusBadRequest) return true } - err = fs.Sync() + err = h.syncCollection(fs, readfs, fspath) if err != nil { err = fmt.Errorf("sync failed: %w", err) s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError) return true } - // Ensure a subsequent read operation will see the changes. - h.Cache.ResetSession(token) w.WriteHeader(http.StatusNoContent) return true default: @@ -591,13 +581,49 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool { } } -func setFileInfoHeaders(header http.Header, fs arvados.CustomFileSystem, path string) { +// Save modifications to the indicated collection in srcfs, then (if +// successful) ensure they are also reflected in dstfs. +func (h *handler) syncCollection(srcfs, dstfs arvados.CustomFileSystem, path string) error { + coll, _ := h.determineCollection(srcfs, path) + if coll == nil || coll.UUID == "" { + return errors.New("could not determine collection to sync") + } + d, err := srcfs.OpenFile("by_id/"+coll.UUID, os.O_RDWR, 0777) + if err != nil { + return err + } + defer d.Close() + err = d.Sync() + if err != nil { + return err + } + snap, err := d.Snapshot() + if err != nil { + return err + } + dstd, err := dstfs.OpenFile("by_id/"+coll.UUID, os.O_RDWR, 0777) + if err != nil { + return err + } + defer dstd.Close() + return dstd.Splice(snap) +} + +func setFileInfoHeaders(header http.Header, fs arvados.CustomFileSystem, path string) error { + maybeEncode := func(s string) string { + for _, c := range s { + if c > '\u007f' || c < ' ' { + return mime.BEncoding.Encode("UTF-8", s) + } + } + return s + } path = strings.TrimSuffix(path, "/") var props map[string]interface{} for { fi, err := fs.Stat(path) if err != nil { - return + return err } switch src := fi.Sys().(type) { case *arvados.Collection: @@ -605,10 +631,13 @@ func setFileInfoHeaders(header http.Header, fs arvados.CustomFileSystem, path st case *arvados.Group: props = src.Properties default: + if err, ok := src.(error); ok { + return err + } // Try parent cut := strings.LastIndexByte(path, '/') if cut < 0 { - return + return nil } path = path[:cut] continue @@ -621,11 +650,12 @@ func setFileInfoHeaders(header http.Header, fs arvados.CustomFileSystem, path st } k = "x-amz-meta-" + k if s, ok := v.(string); ok { - header.Set(k, s) + header.Set(k, maybeEncode(s)) } else if j, err := json.Marshal(v); err == nil { - header.Set(k, string(j)) + header.Set(k, maybeEncode(string(j))) } } + return nil } func validMIMEHeaderKey(k string) bool {