"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
- "git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
- "git.arvados.org/arvados.git/sdk/go/keepclient"
- "github.com/AdRoll/goamz/s3"
)
const (
}
type listV1Resp struct {
- XMLName string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"`
- s3.ListResp
- // s3.ListResp marshals an empty tag when
- // CommonPrefixes is nil, which confuses some clients.
- // Fix by using this nested struct instead.
+ XMLName string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"`
+ Name string
+ Prefix string
+ Delimiter string
+ Marker string
+ MaxKeys int
+ IsTruncated bool
+ Contents []s3Key
+ // If we use a []string here, xml marshals an empty tag when
+ // CommonPrefixes is nil, which confuses some clients. Fix by
+ // using this nested struct instead.
CommonPrefixes []commonPrefix
// Similarly, we need omitempty here, because an empty
// tag confuses some clients (e.g.,
type listV2Resp struct {
XMLName string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"`
IsTruncated bool
- Contents []s3.Key
+ Contents []s3Key
Name string
Prefix string
Delimiter string
StartAfter string `xml:",omitempty"`
}
+// s3Key is the Contents element of an S3 ListBucketResult
+// response. It stands in for goamz's s3.Key, allowing the
+// github.com/AdRoll/goamz/s3 dependency to be dropped.
+type s3Key struct {
+ Key string
+ LastModified string
+ Size int64
+ // The following fields are not populated, but are here in
+ // case clients rely on the keys being present in xml
+ // responses.
+ ETag string
+ StorageClass string
+ Owner struct {
+ ID string
+ DisplayName string
+ }
+}
+
func hmacstring(msg string, key []byte) []byte {
h := hmac.New(sha256.New, key)
io.WriteString(h, msg)
return h.Sum(nil)
}
- var err error
- var fs arvados.CustomFileSystem
- var arvclient *arvadosclient.ArvadosClient
- if r.Method == http.MethodGet || r.Method == http.MethodHead {
- // Use a single session (cached FileSystem) across
- // multiple read requests.
- var sess *cachedSession
- fs, sess, err = h.Cache.GetSession(token)
- if err != nil {
- s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
- return true
- }
- arvclient = sess.arvadosclient
- } else {
+ fs, sess, tokenUser, err := h.Cache.GetSession(token)
+ if err != nil {
+ s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
+ return true
+ }
+ defer sess.Release()
+ readfs := fs
+ if writeMethod[r.Method] {
// Create a FileSystem for this request, to avoid
// exposing incomplete write operations to concurrent
// requests.
- var kc *keepclient.KeepClient
- var release func()
- var client *arvados.Client
- arvclient, kc, client, release, err = h.getClients(r.Header.Get("X-Request-Id"), token)
- if err != nil {
- s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
- return true
- }
- defer release()
- fs = client.SiteFileSystem(kc)
+ client := sess.client.WithRequestID(r.Header.Get("X-Request-Id"))
+ fs = client.SiteFileSystem(sess.keepclient)
fs.ForwardSlashNameSubstitution(h.Cluster.Collections.ForwardSlashNameSubstitution)
}
return true
}
- tokenUser, err := h.Cache.GetTokenUser(token)
if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
http.Error(w, "Not permitted", http.StatusForbidden)
return true
}
- h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser)
+ h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser)
// shallow copy r, and change URL path
r := *r
}
defer f.Close()
- tokenUser, err := h.Cache.GetTokenUser(token)
if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
http.Error(w, "Not permitted", http.StatusForbidden)
return true
}
- h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser)
+ h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser)
_, err = io.Copy(f, r.Body)
if err != nil {
return true
}
}
- err = fs.Sync()
+ err = h.syncCollection(fs, readfs, fspath)
if err != nil {
err = fmt.Errorf("sync failed: %w", err)
s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
return true
}
- // Ensure a subsequent read operation will see the changes.
- h.Cache.ResetSession(token)
w.WriteHeader(http.StatusOK)
return true
case r.Method == http.MethodDelete:
s3ErrorResponse(w, InvalidArgument, err.Error(), r.URL.Path, http.StatusBadRequest)
return true
}
- err = fs.Sync()
+ err = h.syncCollection(fs, readfs, fspath)
if err != nil {
err = fmt.Errorf("sync failed: %w", err)
s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
return true
}
- // Ensure a subsequent read operation will see the changes.
- h.Cache.ResetSession(token)
w.WriteHeader(http.StatusNoContent)
return true
default:
}
}
+// Save modifications to the indicated collection in srcfs, then (if
+// successful) ensure they are also reflected in dstfs.
+//
+// dstfs is the session's shared read filesystem: splicing the
+// committed snapshot into it makes a write request's changes visible
+// to subsequent reads without resetting the whole cached session.
+func (h *handler) syncCollection(srcfs, dstfs arvados.CustomFileSystem, path string) error {
+ coll, _ := h.determineCollection(srcfs, path)
+ if coll == nil || coll.UUID == "" {
+ // Syncing requires a collection UUID (e.g., a collection
+ // addressed only by portable data hash cannot be synced).
+ return errors.New("could not determine collection to sync")
+ }
+ // Open the collection's root directory so the collection can be
+ // synced and snapshotted as a whole.
+ d, err := srcfs.OpenFile("by_id/"+coll.UUID, os.O_RDWR, 0777)
+ if err != nil {
+ return err
+ }
+ defer d.Close()
+ // Commit the in-memory modifications.
+ err = d.Sync()
+ if err != nil {
+ return err
+ }
+ // Copy the committed state into dstfs via Snapshot/Splice.
+ snap, err := d.Snapshot()
+ if err != nil {
+ return err
+ }
+ dstd, err := dstfs.OpenFile("by_id/"+coll.UUID, os.O_RDWR, 0777)
+ if err != nil {
+ return err
+ }
+ defer dstd.Close()
+ return dstd.Splice(snap)
+}
+
func setFileInfoHeaders(header http.Header, fs arvados.CustomFileSystem, path string) error {
maybeEncode := func(s string) string {
for _, c := range s {
- if c > '\u007f' {
+ if c > '\u007f' || c < ' ' {
return mime.BEncoding.Encode("UTF-8", s)
}
}
http.Error(w, "invalid continuation token", http.StatusBadRequest)
return
}
+ // marker and start-after perform the same function,
+ // but we keep them separate so we can repeat them
+ // back to the client in the response.
params.marker = string(marker)
params.startAfter = r.FormValue("start-after")
switch r.FormValue("encoding-type") {
return
}
} else {
+ // marker is functionally equivalent to start-after.
params.marker = r.FormValue("marker")
}
+ // startAfter is params.marker or params.startAfter, whichever
+ // comes last.
+ startAfter := params.startAfter
+ if startAfter < params.marker {
+ startAfter = params.marker
+ }
+
bucketdir := "by_id/" + bucket
// walkpath is the directory (relative to bucketdir) we need
// to walk: the innermost directory that is guaranteed to
ContinuationToken: r.FormValue("continuation-token"),
StartAfter: params.startAfter,
}
+
+ // nextMarker will be the last path we add to either
+ // resp.Contents or commonPrefixes. It will be included in
+ // the response as NextMarker or NextContinuationToken if
+ // needed.
nextMarker := ""
commonPrefixes := map[string]bool{}
+ full := false
err := walkFS(fs, strings.TrimSuffix(bucketdir+"/"+walkpath, "/"), true, func(path string, fi os.FileInfo) error {
if path == bucketdir {
return nil
path += "/"
filesize = 0
}
- if len(path) <= len(params.prefix) {
- if path > params.prefix[:len(path)] {
- // with prefix "foobar", walking "fooz" means we're done
- return errDone
- }
- if path < params.prefix[:len(path)] {
- // with prefix "foobar", walking "foobag" is pointless
- return filepath.SkipDir
- }
- if fi.IsDir() && !strings.HasPrefix(params.prefix+"/", path) {
- // with prefix "foo/bar", walking "fo"
- // is pointless (but walking "foo" or
- // "foo/bar" is necessary)
- return filepath.SkipDir
- }
- if len(path) < len(params.prefix) {
- // can't skip anything, and this entry
- // isn't in the results, so just
- // continue descent
- return nil
- }
- } else {
- if path[:len(params.prefix)] > params.prefix {
- // with prefix "foobar", nothing we
- // see after "foozzz" is relevant
- return errDone
- }
- }
- if path < params.marker || path < params.prefix || path <= params.startAfter {
+ if strings.HasPrefix(params.prefix, path) && params.prefix != path {
+ // Descend into subtree until we reach desired prefix
+ return nil
+ } else if path < params.prefix {
+ // Not an ancestor or descendant of desired
+ // prefix, therefore none of its descendants
+ // can be either -- skip
+ return filepath.SkipDir
+ } else if path > params.prefix && !strings.HasPrefix(path, params.prefix) {
+ // We must have traversed everything under
+ // desired prefix
+ return errDone
+ } else if path == startAfter {
+ // Skip startAfter itself, just descend into
+ // subtree
+ return nil
+ } else if strings.HasPrefix(startAfter, path) {
+ // Descend into subtree in case it contains
+ // something after startAfter
return nil
+ } else if path < startAfter {
+ // Skip ahead until we reach startAfter
+ return filepath.SkipDir
}
if fi.IsDir() && !h.Cluster.Collections.S3FolderObjects {
// Note we don't add anything to
// finding a regular file inside it.
return nil
}
- if len(resp.Contents)+len(commonPrefixes) >= params.maxKeys {
- resp.IsTruncated = true
- if params.delimiter != "" || params.v2 {
- nextMarker = path
- }
- return errDone
- }
if params.delimiter != "" {
idx := strings.Index(path[len(params.prefix):], params.delimiter)
if idx >= 0 {
// "z", when we hit "foobar/baz", we
// add "/baz" to commonPrefixes and
// stop descending.
- commonPrefixes[path[:len(params.prefix)+idx+1]] = true
- return filepath.SkipDir
+ prefix := path[:len(params.prefix)+idx+1]
+ if prefix == startAfter {
+ return nil
+ } else if prefix < startAfter && !strings.HasPrefix(startAfter, prefix) {
+ return nil
+ } else if full {
+ resp.IsTruncated = true
+ return errDone
+ } else {
+ commonPrefixes[prefix] = true
+ nextMarker = prefix
+ full = len(resp.Contents)+len(commonPrefixes) >= params.maxKeys
+ return filepath.SkipDir
+ }
}
}
- resp.Contents = append(resp.Contents, s3.Key{
+ if full {
+ resp.IsTruncated = true
+ return errDone
+ }
+ resp.Contents = append(resp.Contents, s3Key{
Key: path,
LastModified: fi.ModTime().UTC().Format("2006-01-02T15:04:05.999") + "Z",
Size: filesize,
})
+ nextMarker = path
+ full = len(resp.Contents)+len(commonPrefixes) >= params.maxKeys
return nil
})
if err != nil && err != errDone {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
+ if params.delimiter == "" && !params.v2 || !resp.IsTruncated {
+ nextMarker = ""
+ }
if params.delimiter != "" {
resp.CommonPrefixes = make([]commonPrefix, 0, len(commonPrefixes))
for prefix := range commonPrefixes {
CommonPrefixes: resp.CommonPrefixes,
NextMarker: nextMarker,
KeyCount: resp.KeyCount,
- ListResp: s3.ListResp{
- IsTruncated: resp.IsTruncated,
- Name: bucket,
- Prefix: params.prefix,
- Delimiter: params.delimiter,
- Marker: params.marker,
- MaxKeys: params.maxKeys,
- Contents: resp.Contents,
- },
+ IsTruncated: resp.IsTruncated,
+ Name: bucket,
+ Prefix: params.prefix,
+ Delimiter: params.delimiter,
+ Marker: params.marker,
+ MaxKeys: params.maxKeys,
+ Contents: resp.Contents,
}
}