21697: Don't hold session lock while processing read requests.
[arvados.git] / services / keep-web / handler.go
index a321fbc00a08a95118ddc2e587873ccaccc96a5b..4fae0aee5f1e1fec81186f5206d3cd06ea41a4ea 100644 (file)
@@ -18,23 +18,27 @@ import (
        "strconv"
        "strings"
        "sync"
+       "time"
 
        "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/webdavfs"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/auth"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
-       "git.arvados.org/arvados.git/sdk/go/keepclient"
        "github.com/sirupsen/logrus"
        "golang.org/x/net/webdav"
 )
 
 type handler struct {
-       Cache     cache
-       Cluster   *arvados.Cluster
-       setupOnce sync.Once
-       webdavLS  webdav.LockSystem
+       Cache   cache
+       Cluster *arvados.Cluster
+       metrics *metrics
+
+       lockMtx    sync.Mutex
+       lock       map[string]*sync.RWMutex
+       lockTidied time.Time
 }
 
 var urlPDHDecoder = strings.NewReplacer(" ", "+", "-", "+")
@@ -55,14 +59,6 @@ func parseCollectionIDFromURL(s string) string {
        return ""
 }
 
-func (h *handler) setup() {
-       keepclient.DefaultBlockCache.MaxBlocks = h.Cluster.Collections.WebDAVCache.MaxBlockEntries
-
-       // Even though we don't accept LOCK requests, every webdav
-       // handler must have a non-nil LockSystem.
-       h.webdavLS = &noLockSystem{}
-}
-
 func (h *handler) serveStatus(w http.ResponseWriter, r *http.Request) {
        json.NewEncoder(w).Encode(struct{ Version string }{cmd.Version.String()})
 }
@@ -178,23 +174,18 @@ func (h *handler) Done() <-chan struct{} {
 
 // ServeHTTP implements http.Handler.
 func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
-       h.setupOnce.Do(h.setup)
-
        if xfp := r.Header.Get("X-Forwarded-Proto"); xfp != "" && xfp != "http" {
                r.URL.Scheme = xfp
        }
 
-       w := httpserver.WrapResponseWriter(wOrig)
+       wbuffer := newWriteBuffer(wOrig, int(h.Cluster.Collections.WebDAVOutputBuffer))
+       defer wbuffer.Close()
+       w := httpserver.WrapResponseWriter(responseWriter{
+               Writer:         wbuffer,
+               ResponseWriter: wOrig,
+       })
 
-       if method := r.Header.Get("Access-Control-Request-Method"); method != "" && r.Method == "OPTIONS" {
-               if !browserMethod[method] && !webdavMethod[method] {
-                       w.WriteHeader(http.StatusMethodNotAllowed)
-                       return
-               }
-               w.Header().Set("Access-Control-Allow-Headers", corsAllowHeadersHeader)
-               w.Header().Set("Access-Control-Allow-Methods", "COPY, DELETE, GET, LOCK, MKCOL, MOVE, OPTIONS, POST, PROPFIND, PROPPATCH, PUT, RMCOL, UNLOCK")
-               w.Header().Set("Access-Control-Allow-Origin", "*")
-               w.Header().Set("Access-Control-Max-Age", "86400")
+       if r.Method == "OPTIONS" && ServeCORSPreflight(w, r.Header) {
                return
        }
 
@@ -217,7 +208,26 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                return
        }
 
-       pathParts := strings.Split(r.URL.Path[1:], "/")
+       webdavPrefix := ""
+       arvPath := r.URL.Path
+       if prefix := r.Header.Get("X-Webdav-Prefix"); prefix != "" {
+               // Enable a proxy (e.g., container log handler in
+               // controller) to satisfy a request for path
+               // "/foo/bar/baz.txt" using content from
+               // "//abc123-4.internal/bar/baz.txt", by adding a
+               // request header "X-Webdav-Prefix: /foo"
+               if !strings.HasPrefix(arvPath, prefix) {
+                       http.Error(w, "X-Webdav-Prefix header is not a prefix of the requested path", http.StatusBadRequest)
+                       return
+               }
+               arvPath = r.URL.Path[len(prefix):]
+               if arvPath == "" {
+                       arvPath = "/"
+               }
+               w.Header().Set("Vary", "X-Webdav-Prefix, "+w.Header().Get("Vary"))
+               webdavPrefix = prefix
+       }
+       pathParts := strings.Split(arvPath[1:], "/")
 
        var stripParts int
        var collectionID string
@@ -281,12 +291,18 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                reqTokens = auth.CredentialsFromRequest(r).Tokens
        }
 
-       formToken := r.FormValue("api_token")
+       r.ParseForm()
        origin := r.Header.Get("Origin")
        cors := origin != "" && !strings.HasSuffix(origin, "://"+r.Host)
        safeAjax := cors && (r.Method == http.MethodGet || r.Method == http.MethodHead)
-       safeAttachment := attachment && r.URL.Query().Get("api_token") == ""
-       if formToken == "" {
+       // Important distinction: safeAttachment checks whether api_token exists
+       // as a query parameter. haveFormTokens checks whether api_token exists
+       // as request form data *or* a query parameter. Different checks are
+       // necessary because both the request disposition and the location of
+       // the API token affect whether or not the request needs to be
+       // redirected. The different branch comments below explain further.
+       safeAttachment := attachment && !r.URL.Query().Has("api_token")
+       if formTokens, haveFormTokens := r.Form["api_token"]; !haveFormTokens {
                // No token to use or redact.
        } else if safeAjax || safeAttachment {
                // If this is a cross-origin request, the URL won't
@@ -301,7 +317,9 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                // form?" problem, so provided the token isn't
                // embedded in the URL, there's no reason to do
                // redirect-with-cookie in this case either.
-               reqTokens = append(reqTokens, formToken)
+               for _, tok := range formTokens {
+                       reqTokens = append(reqTokens, tok)
+               }
        } else if browserMethod[r.Method] {
                // If this is a page view, and the client provided a
                // token via query string or POST body, we must put
@@ -329,7 +347,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
        fsprefix := ""
        if useSiteFS {
                if writeMethod[r.Method] {
-                       http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
+                       http.Error(w, webdavfs.ErrReadOnly.Error(), http.StatusMethodNotAllowed)
                        return
                }
                if len(reqTokens) == 0 {
@@ -345,6 +363,10 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                fsprefix = "by_id/" + collectionID + "/"
        }
 
+       if src := r.Header.Get("X-Webdav-Source"); strings.HasPrefix(src, "/") && !strings.Contains(src, "//") && !strings.Contains(src, "/../") {
+               fsprefix += src[1:]
+       }
+
        if tokens == nil {
                tokens = reqTokens
                if h.Cluster.Users.AnonymousUserToken != "" {
@@ -395,13 +417,16 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                        // collection id is outside scope of supplied
                        // token
                        tokenScopeProblem = true
+                       sess.Release()
                        continue
                } else if os.IsNotExist(err) {
                        // collection does not exist or is not
                        // readable using this token
+                       sess.Release()
                        continue
                } else if err != nil {
                        http.Error(w, err.Error(), http.StatusInternalServerError)
+                       sess.Release()
                        return
                }
                defer f.Close()
@@ -409,6 +434,14 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                collectionDir, sessionFS, session, tokenUser = f, fs, sess, user
                break
        }
+
+       releaseSession := func() {}
+       if session != nil {
+               var releaseSessionOnce sync.Once
+               releaseSession = func() { releaseSessionOnce.Do(func() { session.Release() }) }
+       }
+       defer releaseSession()
+
        if forceReload && collectionDir != nil {
                err := collectionDir.Sync()
                if err != nil {
@@ -496,6 +529,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
        if r.Method == http.MethodGet || r.Method == http.MethodHead {
                targetfnm := fsprefix + strings.Join(pathParts[stripParts:], "/")
                if fi, err := sessionFS.Stat(targetfnm); err == nil && fi.IsDir() {
+                       releaseSession() // because we won't be writing anything
                        if !strings.HasSuffix(r.URL.Path, "/") {
                                h.seeOtherWithCookie(w, r, r.URL.Path+"/", credentialsOK)
                        } else {
@@ -510,7 +544,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                basename = targetPath[len(targetPath)-1]
        }
        if arvadosclient.PDHMatch(collectionID) && writeMethod[r.Method] {
-               http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
+               http.Error(w, webdavfs.ErrReadOnly.Error(), http.StatusMethodNotAllowed)
                return
        }
        if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
@@ -519,7 +553,11 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
        }
        h.logUploadOrDownload(r, session.arvadosclient, sessionFS, fsprefix+strings.Join(targetPath, "/"), nil, tokenUser)
 
-       if writeMethod[r.Method] {
+       writing := writeMethod[r.Method]
+       locker := h.collectionLock(collectionID, writing)
+       defer locker.Unlock()
+
+       if writing {
                // Save the collection only if/when all
                // webdav->filesystem operations succeed --
                // and send a 500 error if the modified
@@ -561,26 +599,38 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                                collectionDir.Splice(snap)
                                return nil
                        }}
+       } else {
+               // When writing, we need to block session renewal
+               // until we're finished, in order to guarantee the
+               // effect of the write is visible in future responses.
+               // But if we're not writing, we can release the lcok
+               // early.  This enables us to keep renewing sessions
+               // and processing more requests even if a slow client
+               // takes a long time to download a large file.
+               releaseSession()
        }
        if r.Method == http.MethodGet {
                applyContentDispositionHdr(w, r, basename, attachment)
        }
-       wh := webdav.Handler{
-               Prefix: "/" + strings.Join(pathParts[:stripParts], "/"),
-               FileSystem: &webdavFS{
-                       collfs:        sessionFS,
-                       prefix:        fsprefix,
-                       writing:       writeMethod[r.Method],
-                       alwaysReadEOF: r.Method == "PROPFIND",
+       if webdavPrefix == "" {
+               webdavPrefix = "/" + strings.Join(pathParts[:stripParts], "/")
+       }
+       wh := &webdav.Handler{
+               Prefix: webdavPrefix,
+               FileSystem: &webdavfs.FS{
+                       FileSystem:    sessionFS,
+                       Prefix:        fsprefix,
+                       Writing:       writeMethod[r.Method],
+                       AlwaysReadEOF: r.Method == "PROPFIND",
                },
-               LockSystem: h.webdavLS,
+               LockSystem: webdavfs.NoLockSystem,
                Logger: func(r *http.Request, err error) {
-                       if err != nil {
+                       if err != nil && !os.IsNotExist(err) {
                                ctxlog.FromContext(r.Context()).WithError(err).Error("error reported by webdav handler")
                        }
                },
        }
-       wh.ServeHTTP(w, r)
+       h.metrics.track(wh, w, r)
        if r.Method == http.MethodGet && w.WroteStatus() == http.StatusOK {
                wrote := int64(w.WroteBodyBytes())
                fnm := strings.Join(pathParts[stripParts:], "/")
@@ -749,7 +799,7 @@ func applyContentDispositionHdr(w http.ResponseWriter, r *http.Request, filename
 }
 
 func (h *handler) seeOtherWithCookie(w http.ResponseWriter, r *http.Request, location string, credentialsOK bool) {
-       if formToken := r.FormValue("api_token"); formToken != "" {
+       if formTokens, haveFormTokens := r.Form["api_token"]; haveFormTokens {
                if !credentialsOK {
                        // It is not safe to copy the provided token
                        // into a cookie unless the current vhost
@@ -770,13 +820,19 @@ func (h *handler) seeOtherWithCookie(w http.ResponseWriter, r *http.Request, loc
                // bar, and in the case of a POST request to avoid
                // raising warnings when the user refreshes the
                // resulting page.
-               http.SetCookie(w, &http.Cookie{
-                       Name:     "arvados_api_token",
-                       Value:    auth.EncodeTokenCookie([]byte(formToken)),
-                       Path:     "/",
-                       HttpOnly: true,
-                       SameSite: http.SameSiteLaxMode,
-               })
+               for _, tok := range formTokens {
+                       if tok == "" {
+                               continue
+                       }
+                       http.SetCookie(w, &http.Cookie{
+                               Name:     "arvados_api_token",
+                               Value:    auth.EncodeTokenCookie([]byte(tok)),
+                               Path:     "/",
+                               HttpOnly: true,
+                               SameSite: http.SameSiteLaxMode,
+                       })
+                       break
+               }
        }
 
        // Propagate query parameters (except api_token) from
@@ -927,3 +983,54 @@ func (h *handler) determineCollection(fs arvados.CustomFileSystem, path string)
        }
        return nil, ""
 }
+
+var lockTidyInterval = time.Minute * 10
+
+// Lock the specified collection for reading or writing. Caller must
+// call Unlock() on the returned Locker when the operation is
+// finished.
+func (h *handler) collectionLock(collectionID string, writing bool) sync.Locker {
+       h.lockMtx.Lock()
+       defer h.lockMtx.Unlock()
+       if time.Since(h.lockTidied) > lockTidyInterval {
+               // Periodically delete all locks that aren't in use.
+               h.lockTidied = time.Now()
+               for id, locker := range h.lock {
+                       if locker.TryLock() {
+                               locker.Unlock()
+                               delete(h.lock, id)
+                       }
+               }
+       }
+       locker := h.lock[collectionID]
+       if locker == nil {
+               locker = new(sync.RWMutex)
+               if h.lock == nil {
+                       h.lock = map[string]*sync.RWMutex{}
+               }
+               h.lock[collectionID] = locker
+       }
+       if writing {
+               locker.Lock()
+               return locker
+       } else {
+               locker.RLock()
+               return locker.RLocker()
+       }
+}
+
+func ServeCORSPreflight(w http.ResponseWriter, header http.Header) bool {
+       method := header.Get("Access-Control-Request-Method")
+       if method == "" {
+               return false
+       }
+       if !browserMethod[method] && !webdavMethod[method] {
+               w.WriteHeader(http.StatusMethodNotAllowed)
+               return true
+       }
+       w.Header().Set("Access-Control-Allow-Headers", corsAllowHeadersHeader)
+       w.Header().Set("Access-Control-Allow-Methods", "COPY, DELETE, GET, LOCK, MKCOL, MOVE, OPTIONS, POST, PROPFIND, PROPPATCH, PUT, RMCOL, UNLOCK")
+       w.Header().Set("Access-Control-Allow-Origin", "*")
+       w.Header().Set("Access-Control-Max-Age", "86400")
+       return true
+}