Merge branch '21217-invalid-auth-header'
[arvados.git] / services / keep-web / s3.go
index 38428cdab1cb0a54775481ee9c45c977acab8af0..3e60f3006db843ee108bb17effe9392e0142c0e5 100644 (file)
@@ -335,6 +335,7 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
                return true
        }
+       defer sess.Release()
        readfs := fs
        if writeMethod[r.Method] {
                // Create a FileSystem for this request, to avoid
@@ -772,6 +773,9 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                        http.Error(w, "invalid continuation token", http.StatusBadRequest)
                        return
                }
+               // marker and start-after perform the same function,
+               // but we keep them separate so we can repeat them
+               // back to the client in the response.
                params.marker = string(marker)
                params.startAfter = r.FormValue("start-after")
                switch r.FormValue("encoding-type") {
@@ -783,9 +787,17 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                        return
                }
        } else {
+               // marker is functionally equivalent to start-after.
                params.marker = r.FormValue("marker")
        }
 
+       // startAfter is params.marker or params.startAfter, whichever
+       // comes last.
+       startAfter := params.startAfter
+       if startAfter < params.marker {
+               startAfter = params.marker
+       }
+
        bucketdir := "by_id/" + bucket
        // walkpath is the directory (relative to bucketdir) we need
        // to walk: the innermost directory that is guaranteed to
@@ -809,9 +821,15 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                ContinuationToken: r.FormValue("continuation-token"),
                StartAfter:        params.startAfter,
        }
+
+       // nextMarker will be the last path we add to either
+       // resp.Contents or commonPrefixes.  It will be included in
+       // the response as NextMarker or NextContinuationToken if
+       // needed.
        nextMarker := ""
 
        commonPrefixes := map[string]bool{}
+       full := false
        err := walkFS(fs, strings.TrimSuffix(bucketdir+"/"+walkpath, "/"), true, func(path string, fi os.FileInfo) error {
                if path == bucketdir {
                        return nil
@@ -822,36 +840,29 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                        path += "/"
                        filesize = 0
                }
-               if len(path) <= len(params.prefix) {
-                       if path > params.prefix[:len(path)] {
-                               // with prefix "foobar", walking "fooz" means we're done
-                               return errDone
-                       }
-                       if path < params.prefix[:len(path)] {
-                               // with prefix "foobar", walking "foobag" is pointless
-                               return filepath.SkipDir
-                       }
-                       if fi.IsDir() && !strings.HasPrefix(params.prefix+"/", path) {
-                               // with prefix "foo/bar", walking "fo"
-                               // is pointless (but walking "foo" or
-                               // "foo/bar" is necessary)
-                               return filepath.SkipDir
-                       }
-                       if len(path) < len(params.prefix) {
-                               // can't skip anything, and this entry
-                               // isn't in the results, so just
-                               // continue descent
-                               return nil
-                       }
-               } else {
-                       if path[:len(params.prefix)] > params.prefix {
-                               // with prefix "foobar", nothing we
-                               // see after "foozzz" is relevant
-                               return errDone
-                       }
-               }
-               if path < params.marker || path < params.prefix || path <= params.startAfter {
+               if strings.HasPrefix(params.prefix, path) && params.prefix != path {
+                       // Descend into subtree until we reach desired prefix
                        return nil
+               } else if path < params.prefix {
+                       // Not an ancestor or descendant of desired
+                       // prefix, therefore none of its descendants
+                       // can be either -- skip
+                       return filepath.SkipDir
+               } else if path > params.prefix && !strings.HasPrefix(path, params.prefix) {
+                       // We must have traversed everything under
+                       // desired prefix
+                       return errDone
+               } else if path == startAfter {
+                       // Skip startAfter itself, just descend into
+                       // subtree
+                       return nil
+               } else if strings.HasPrefix(startAfter, path) {
+                       // Descend into subtree in case it contains
+                       // something after startAfter
+                       return nil
+               } else if path < startAfter {
+                       // Skip ahead until we reach startAfter
+                       return filepath.SkipDir
                }
                if fi.IsDir() && !h.Cluster.Collections.S3FolderObjects {
                        // Note we don't add anything to
@@ -861,13 +872,6 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                        // finding a regular file inside it.
                        return nil
                }
-               if len(resp.Contents)+len(commonPrefixes) >= params.maxKeys {
-                       resp.IsTruncated = true
-                       if params.delimiter != "" || params.v2 {
-                               nextMarker = path
-                       }
-                       return errDone
-               }
                if params.delimiter != "" {
                        idx := strings.Index(path[len(params.prefix):], params.delimiter)
                        if idx >= 0 {
@@ -875,21 +879,42 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request,
                                // "z", when we hit "foobar/baz", we
                                // add "/baz" to commonPrefixes and
                                // stop descending.
-                               commonPrefixes[path[:len(params.prefix)+idx+1]] = true
-                               return filepath.SkipDir
+                               prefix := path[:len(params.prefix)+idx+1]
+                               if prefix == startAfter {
+                                       return nil
+                               } else if prefix < startAfter && !strings.HasPrefix(startAfter, prefix) {
+                                       return nil
+                               } else if full {
+                                       resp.IsTruncated = true
+                                       return errDone
+                               } else {
+                                       commonPrefixes[prefix] = true
+                                       nextMarker = prefix
+                                       full = len(resp.Contents)+len(commonPrefixes) >= params.maxKeys
+                                       return filepath.SkipDir
+                               }
                        }
                }
+               if full {
+                       resp.IsTruncated = true
+                       return errDone
+               }
                resp.Contents = append(resp.Contents, s3Key{
                        Key:          path,
                        LastModified: fi.ModTime().UTC().Format("2006-01-02T15:04:05.999") + "Z",
                        Size:         filesize,
                })
+               nextMarker = path
+               full = len(resp.Contents)+len(commonPrefixes) >= params.maxKeys
                return nil
        })
        if err != nil && err != errDone {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
        }
+       if params.delimiter == "" && !params.v2 || !resp.IsTruncated {
+               nextMarker = ""
+       }
        if params.delimiter != "" {
                resp.CommonPrefixes = make([]commonPrefix, 0, len(commonPrefixes))
                for prefix := range commonPrefixes {