Merge branch '21654-express-upgrade'. Refs #21654
[arvados.git] / services / keepstore / keepstore.go
index c9a80230597c5eacc5bac7e66d48d82439420610..60d062e1e3467b113a137126296f983f969a9f01 100644 (file)
@@ -223,7 +223,7 @@ func (ks *keepstore) signLocator(token, locator string) string {
 }
 
 func (ks *keepstore) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (n int, err error) {
-       li, err := parseLocator(opts.Locator)
+       li, err := getLocatorInfo(opts.Locator)
        if err != nil {
                return 0, err
        }
@@ -322,7 +322,10 @@ func (ks *keepstore) blockReadRemote(ctx context.Context, opts arvados.BlockRead
        }
        var remoteClient *keepclient.KeepClient
        var parts []string
-       var size int
+       li, err := getLocatorInfo(opts.Locator)
+       if err != nil {
+               return 0, err
+       }
        for i, part := range strings.Split(opts.Locator, "+") {
                switch {
                case i == 0:
@@ -344,8 +347,6 @@ func (ks *keepstore) blockReadRemote(ctx context.Context, opts arvados.BlockRead
                        }
                        remoteClient = kc
                        part = "A" + part[7:]
-               case len(part) > 0 && part[0] >= '0' && part[0] <= '9':
-                       size, _ = strconv.Atoi(part)
                }
                parts = append(parts, part)
        }
@@ -356,8 +357,8 @@ func (ks *keepstore) blockReadRemote(ctx context.Context, opts arvados.BlockRead
        if opts.LocalLocator == nil {
                // Read from remote cluster and stream response back
                // to caller
-               if rw, ok := opts.WriteTo.(http.ResponseWriter); ok && size > 0 {
-                       rw.Header().Set("Content-Length", fmt.Sprintf("%d", size))
+               if rw, ok := opts.WriteTo.(http.ResponseWriter); ok && li.size > 0 {
+                       rw.Header().Set("Content-Length", fmt.Sprintf("%d", li.size))
                }
                return remoteClient.BlockRead(ctx, arvados.BlockReadOptions{
                        Locator: locator,
@@ -611,7 +612,7 @@ func (ce *checkEqual) WriteAt(p []byte, offset int64) (int, error) {
 }
 
 func (ks *keepstore) BlockUntrash(ctx context.Context, locator string) error {
-       li, err := parseLocator(locator)
+       li, err := getLocatorInfo(locator)
        if err != nil {
                return err
        }
@@ -631,7 +632,7 @@ func (ks *keepstore) BlockUntrash(ctx context.Context, locator string) error {
 }
 
 func (ks *keepstore) BlockTouch(ctx context.Context, locator string) error {
-       li, err := parseLocator(locator)
+       li, err := getLocatorInfo(locator)
        if err != nil {
                return err
        }
@@ -655,7 +656,7 @@ func (ks *keepstore) BlockTrash(ctx context.Context, locator string) error {
        if !ks.cluster.Collections.BlobTrash {
                return errMethodNotAllowed
        }
-       li, err := parseLocator(locator)
+       li, err := getLocatorInfo(locator)
        if err != nil {
                return err
        }
@@ -708,40 +709,55 @@ func ctxToken(ctx context.Context) string {
        }
 }
 
+// locatorInfo expresses the attributes of a locator that are relevant
+// for keepstore decision-making.
 type locatorInfo struct {
        hash   string
        size   int
-       remote bool
-       signed bool
+       remote bool // locator has a +R hint
+       signed bool // locator has a +A hint
 }
 
-func parseLocator(loc string) (locatorInfo, error) {
+func getLocatorInfo(loc string) (locatorInfo, error) {
        var li locatorInfo
-       for i, part := range strings.Split(loc, "+") {
-               if i == 0 {
-                       if len(part) != 32 {
+       plus := 0    // number of '+' chars seen so far
+       partlen := 0 // chars since last '+'
+       for i, c := range loc + "+" {
+               if c == '+' {
+                       if partlen == 0 {
+                               // double/leading/trailing '+'
                                return li, errInvalidLocator
                        }
-                       li.hash = part
+                       if plus == 0 {
+                               if i != 32 {
+                                       return li, errInvalidLocator
+                               }
+                               li.hash = loc[:i]
+                       }
+                       if plus == 1 {
+                               if size, err := strconv.Atoi(loc[i-partlen : i]); err == nil {
+                                       li.size = size
+                               }
+                       }
+                       plus++
+                       partlen = 0
                        continue
                }
-               if i == 1 {
-                       if size, err := strconv.Atoi(part); err == nil {
-                               li.size = size
-                               continue
+               partlen++
+               if partlen == 1 {
+                       if c == 'A' {
+                               li.signed = true
+                       }
+                       if c == 'R' {
+                               li.remote = true
+                       }
+                       if plus > 1 && c >= '0' && c <= '9' {
+                               // size, if present at all, must come first
+                               return li, errInvalidLocator
                        }
                }
-               if len(part) == 0 {
-                       return li, errInvalidLocator
-               }
-               if part[0] == 'A' {
-                       li.signed = true
-               }
-               if part[0] == 'R' {
-                       li.remote = true
-               }
-               if part[0] >= '0' && part[0] <= '9' {
-                       // size, if present at all, must come first
+               if plus == 0 && !((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f')) {
+                       // non-hexadecimal char in hash part
                        return li, errInvalidLocator
                }
        }