19362: Fix unchecked error.
[arvados.git] / services / keep-web / s3.go
index 59ab3cd4389c0cc57f9a923983ee58e12bca5184..f98efd8fdfcdf39febe29e84610aea474ccda02d 100644 (file)
@@ -8,12 +8,15 @@ import (
        "crypto/hmac"
        "crypto/sha256"
        "encoding/base64"
+       "encoding/json"
        "encoding/xml"
        "errors"
        "fmt"
        "hash"
        "io"
+       "mime"
        "net/http"
+       "net/textproto"
        "net/url"
        "os"
        "path/filepath"
@@ -24,9 +27,7 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
-       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
-       "git.arvados.org/arvados.git/sdk/go/keepclient"
        "github.com/AdRoll/goamz/s3"
 )
 
@@ -309,33 +310,18 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                return false
        }
 
-       var err error
-       var fs arvados.CustomFileSystem
-       var arvclient *arvadosclient.ArvadosClient
-       if r.Method == http.MethodGet || r.Method == http.MethodHead {
-               // Use a single session (cached FileSystem) across
-               // multiple read requests.
-               var sess *cachedSession
-               fs, sess, err = h.Cache.GetSession(token)
-               if err != nil {
-                       s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
-                       return true
-               }
-               arvclient = sess.arvadosclient
-       } else {
+       fs, sess, tokenUser, err := h.Cache.GetSession(token)
+       if err != nil {
+               s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
+               return true
+       }
+       readfs := fs
+       if writeMethod[r.Method] {
                // Create a FileSystem for this request, to avoid
                // exposing incomplete write operations to concurrent
                // requests.
-               var kc *keepclient.KeepClient
-               var release func()
-               var client *arvados.Client
-               arvclient, kc, client, release, err = h.getClients(r.Header.Get("X-Request-Id"), token)
-               if err != nil {
-                       s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
-                       return true
-               }
-               defer release()
-               fs = client.SiteFileSystem(kc)
+               client := sess.client.WithRequestID(r.Header.Get("X-Request-Id"))
+               fs = client.SiteFileSystem(sess.keepclient)
                fs.ForwardSlashNameSubstitution(h.Cluster.Collections.ForwardSlashNameSubstitution)
        }
 
@@ -385,6 +371,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                if r.Method == "HEAD" && !objectNameGiven {
                        // HeadBucket
                        if err == nil && fi.IsDir() {
+                               err = setFileInfoHeaders(w.Header(), fs, fspath)
+                               if err != nil {
+                                       s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusBadGateway)
+                                       return true
+                               }
                                w.WriteHeader(http.StatusOK)
                        } else if os.IsNotExist(err) {
                                s3ErrorResponse(w, NoSuchBucket, "The specified bucket does not exist.", r.URL.Path, http.StatusNotFound)
@@ -394,6 +385,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                        return true
                }
                if err == nil && fi.IsDir() && objectNameGiven && strings.HasSuffix(fspath, "/") && h.Cluster.Collections.S3FolderObjects {
+                       err = setFileInfoHeaders(w.Header(), fs, fspath)
+                       if err != nil {
+                               s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusBadGateway)
+                               return true
+                       }
                        w.Header().Set("Content-Type", "application/x-directory")
                        w.WriteHeader(http.StatusOK)
                        return true
@@ -405,16 +401,20 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                        return true
                }
 
-               tokenUser, err := h.Cache.GetTokenUser(token)
                if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
                        http.Error(w, "Not permitted", http.StatusForbidden)
                        return true
                }
-               h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser)
+               h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser)
 
                // shallow copy r, and change URL path
                r := *r
                r.URL.Path = fspath
+               err = setFileInfoHeaders(w.Header(), fs, fspath)
+               if err != nil {
+                       s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusBadGateway)
+                       return true
+               }
                http.FileServer(fs).ServeHTTP(w, &r)
                return true
        case r.Method == http.MethodPut:
@@ -496,12 +496,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                        }
                        defer f.Close()
 
-                       tokenUser, err := h.Cache.GetTokenUser(token)
                        if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
                                http.Error(w, "Not permitted", http.StatusForbidden)
                                return true
                        }
-                       h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser)
+                       h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser)
 
                        _, err = io.Copy(f, r.Body)
                        if err != nil {
@@ -516,14 +515,12 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                                return true
                        }
                }
-               err = fs.Sync()
+               err = h.syncCollection(fs, readfs, fspath)
                if err != nil {
                        err = fmt.Errorf("sync failed: %w", err)
                        s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
                        return true
                }
-               // Ensure a subsequent read operation will see the changes.
-               h.Cache.ResetSession(token)
                w.WriteHeader(http.StatusOK)
                return true
        case r.Method == http.MethodDelete:
@@ -570,14 +567,12 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                        s3ErrorResponse(w, InvalidArgument, err.Error(), r.URL.Path, http.StatusBadRequest)
                        return true
                }
-               err = fs.Sync()
+               err = h.syncCollection(fs, readfs, fspath)
                if err != nil {
                        err = fmt.Errorf("sync failed: %w", err)
                        s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
                        return true
                }
-               // Ensure a subsequent read operation will see the changes.
-               h.Cache.ResetSession(token)
                w.WriteHeader(http.StatusNoContent)
                return true
        default:
@@ -586,6 +581,88 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
        }
 }
 
+// syncCollection saves modifications to the indicated collection in srcfs, then (if
+// successful) ensures they are also reflected in dstfs by splicing a snapshot into it.
+func (h *handler) syncCollection(srcfs, dstfs arvados.CustomFileSystem, path string) error {
+       coll, _ := h.determineCollection(srcfs, path) // second return value is not needed here
+       if coll == nil || coll.UUID == "" { // no identifiable collection: nothing to sync
+               return errors.New("could not determine collection to sync")
+       }
+       d, err := srcfs.OpenFile("by_id/"+coll.UUID, os.O_RDWR, 0777) // the collection's by_id entry in srcfs
+       if err != nil {
+               return err
+       }
+       defer d.Close()
+       err = d.Sync() // save the pending modifications before taking the snapshot
+       if err != nil {
+               return err
+       }
+       snap, err := d.Snapshot() // capture the collection content after the sync
+       if err != nil {
+               return err
+       }
+       dstd, err := dstfs.OpenFile("by_id/"+coll.UUID, os.O_RDWR, 0777) // same collection, looked up in dstfs
+       if err != nil {
+               return err
+       }
+       defer dstd.Close()
+       return dstd.Splice(snap) // make dstfs reflect the snapshot just taken
+}
+
+func setFileInfoHeaders(header http.Header, fs arvados.CustomFileSystem, path string) error { // sets x-amz-meta-* headers from the properties of the collection/group at or above path
+       maybeEncode := func(s string) string { // B-encodes s (UTF-8) if any rune is above U+007F or below space; otherwise returns s as is
+               for _, c := range s {
+                       if c > '\u007f' || c < ' ' {
+                               return mime.BEncoding.Encode("UTF-8", s)
+                       }
+               }
+               return s
+       }
+       path = strings.TrimSuffix(path, "/") // ignore one trailing slash
+       var props map[string]interface{}
+       for { // walk up from path until Stat yields a collection or group
+               fi, err := fs.Stat(path)
+               if err != nil {
+                       return err
+               }
+               switch src := fi.Sys().(type) {
+               case *arvados.Collection:
+                       props = src.Properties
+               case *arvados.Group:
+                       props = src.Properties
+               default:
+                       if err, ok := src.(error); ok { // Sys() carried an error instead of a record: report it
+                               return err
+                       }
+                       // Try parent: cut off the last path component and stat again
+                       cut := strings.LastIndexByte(path, '/')
+                       if cut < 0 { // no parent left: no properties to export, which is not an error
+                               return nil
+                       }
+                       path = path[:cut]
+                       continue
+               }
+               break // reached only from the Collection/Group cases: props is set, leave the for loop
+       }
+       for k, v := range props {
+               if !validMIMEHeaderKey(k) { // skip property names that validMIMEHeaderKey rejects
+                       continue
+               }
+               k = "x-amz-meta-" + k
+               if s, ok := v.(string); ok { // string values are emitted directly (encoded only if needed)
+                       header.Set(k, maybeEncode(s))
+               } else if j, err := json.Marshal(v); err == nil { // other values are emitted as JSON; unmarshalable values are silently skipped
+                       header.Set(k, maybeEncode(string(j)))
+               }
+       }
+       return nil
+}
+
+func validMIMEHeaderKey(k string) bool { // reports whether textproto canonicalization alters "z-"+k
+       check := "z-" + k // lowercase prefix, so canonicalizing a valid key always changes at least the leading "z" to "Z"
+       return check != textproto.CanonicalMIMEHeaderKey(check) // per net/textproto docs, input with a space or invalid header bytes is returned unmodified, i.e. equal => invalid
+}
+
 // Call fn on the given path (directory) and its contents, in
 // lexicographic order.
 //