// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

// Package keepstore implements the keepstore service component and
// back-end storage drivers.
//
// It is an internal module, only intended to be imported by
// /cmd/arvados-server and other server-side components in this
// repository.
package keepstore

import (
	"bytes"
	"context"
	"crypto/md5"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
	"git.arvados.org/arvados.git/sdk/go/auth"
	"git.arvados.org/arvados.git/sdk/go/ctxlog"
	"git.arvados.org/arvados.git/sdk/go/httpserver"
	"git.arvados.org/arvados.git/sdk/go/keepclient"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
)

// Maximum size of a keep block is 64 MiB.
const BlockSize = 1 << 26

var (
	errChecksum          = httpserver.ErrorWithStatus(errors.New("checksum mismatch in stored data"), http.StatusBadGateway)
	errNoTokenProvided   = httpserver.ErrorWithStatus(errors.New("no token provided in Authorization header"), http.StatusUnauthorized)
	errMethodNotAllowed  = httpserver.ErrorWithStatus(errors.New("method not allowed"), http.StatusMethodNotAllowed)
	errVolumeUnavailable = httpserver.ErrorWithStatus(errors.New("volume unavailable"), http.StatusServiceUnavailable)
	errCollision         = httpserver.ErrorWithStatus(errors.New("hash collision"), http.StatusInternalServerError)
	errExpiredSignature  = httpserver.ErrorWithStatus(errors.New("expired signature"), http.StatusUnauthorized)
	errInvalidSignature  = httpserver.ErrorWithStatus(errors.New("invalid signature"), http.StatusBadRequest)
	errInvalidLocator    = httpserver.ErrorWithStatus(errors.New("invalid locator"), http.StatusBadRequest)
	errFull              = httpserver.ErrorWithStatus(errors.New("insufficient storage"), http.StatusInsufficientStorage)
	errTooLarge          = httpserver.ErrorWithStatus(errors.New("request entity too large"), http.StatusRequestEntityTooLarge)
	driver               = make(map[string]volumeDriver)
)

type indexOptions struct {
	MountUUID string
	Prefix    string
	WriteTo   io.Writer
}

type mount struct {
	arvados.KeepMount
	volume
	priority int
}

type keepstore struct {
	cluster    *arvados.Cluster
	logger     logrus.FieldLogger
	serviceURL arvados.URL
	mounts     map[string]*mount
	mountsR    []*mount
	mountsW    []*mount
	bufferPool *bufferPool

	iostats map[volume]*ioStats

	remoteClients    map[string]*keepclient.KeepClient
	remoteClientsMtx sync.Mutex
}

func newKeepstore(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry, serviceURL arvados.URL) (*keepstore, error) {
	logger := ctxlog.FromContext(ctx)

	if cluster.API.MaxConcurrentRequests > 0 && cluster.API.MaxConcurrentRequests < cluster.API.MaxKeepBlobBuffers {
		logger.Warnf("Possible configuration mistake: not useful to set API.MaxKeepBlobBuffers (%d) higher than API.MaxConcurrentRequests (%d)", cluster.API.MaxKeepBlobBuffers, cluster.API.MaxConcurrentRequests)
	}

	if cluster.Collections.BlobSigningKey != "" {
	} else if cluster.Collections.BlobSigning {
		return nil, errors.New("cannot enable Collections.BlobSigning with no Collections.BlobSigningKey")
	} else {
		logger.Warn("Running without a blob signing key. Block locators returned by this server will not be signed, and will be rejected by a server that enforces permissions. To fix this, configure Collections.BlobSigning and Collections.BlobSigningKey.")
	}

	if cluster.API.MaxKeepBlobBuffers <= 0 {
		return nil, fmt.Errorf("API.MaxKeepBlobBuffers must be greater than zero")
	}
	bufferPool := newBufferPool(logger, cluster.API.MaxKeepBlobBuffers, reg)

	ks := &keepstore{
		cluster:       cluster,
		logger:        logger,
		serviceURL:    serviceURL,
		bufferPool:    bufferPool,
		remoteClients: make(map[string]*keepclient.KeepClient),
	}

	err := ks.setupMounts(newVolumeMetricsVecs(reg))
	if err != nil {
		return nil, err
	}

	return ks, nil
}

func (ks *keepstore) setupMounts(metrics *volumeMetricsVecs) error {
	ks.mounts = make(map[string]*mount)
	if len(ks.cluster.Volumes) == 0 {
		return errors.New("no volumes configured")
	}
	for uuid, cfgvol := range ks.cluster.Volumes {
		va, ok := cfgvol.AccessViaHosts[ks.serviceURL]
		if !ok && len(cfgvol.AccessViaHosts) > 0 {
			continue
		}
		dri, ok := driver[cfgvol.Driver]
		if !ok {
			return fmt.Errorf("volume %s: invalid driver %q", uuid, cfgvol.Driver)
		}
		vol, err := dri(newVolumeParams{
			UUID:         uuid,
			Cluster:      ks.cluster,
			ConfigVolume: cfgvol,
			Logger:       ks.logger,
			MetricsVecs:  metrics,
			BufferPool:   ks.bufferPool,
		})
		if err != nil {
			return fmt.Errorf("error initializing volume %s: %s", uuid, err)
		}
		sc := cfgvol.StorageClasses
		if len(sc) == 0 {
			sc = map[string]bool{"default": true}
		}
		repl := cfgvol.Replication
		if repl < 1 {
			repl = 1
		}
		pri := 0
		for class, in := range cfgvol.StorageClasses {
			p := ks.cluster.StorageClasses[class].Priority
			if in && p > pri {
				pri = p
			}
		}
		mnt := &mount{
			volume:   vol,
			priority: pri,
			KeepMount: arvados.KeepMount{
				UUID:           uuid,
				DeviceID:       vol.DeviceID(),
				AllowWrite:     !va.ReadOnly && !cfgvol.ReadOnly,
				AllowTrash:     !va.ReadOnly && (!cfgvol.ReadOnly || cfgvol.AllowTrashWhenReadOnly),
				Replication:    repl,
				StorageClasses: sc,
			},
		}
		ks.mounts[uuid] = mnt
		ks.logger.Printf("started volume %s (%s), AllowWrite=%v, AllowTrash=%v", uuid, vol.DeviceID(), mnt.AllowWrite, mnt.AllowTrash)
	}
	if len(ks.mounts) == 0 {
		return fmt.Errorf("no volumes configured for %s", ks.serviceURL)
	}

	ks.mountsR = nil
	ks.mountsW = nil
	for _, mnt := range ks.mounts {
		ks.mountsR = append(ks.mountsR, mnt)
		if mnt.AllowWrite {
			ks.mountsW = append(ks.mountsW, mnt)
		}
	}
	// Sorting mounts by UUID makes behavior more predictable, and
	// is convenient for testing -- for example, "index all
	// volumes" and "trash block on all volumes" will visit
	// volumes in predictable order.
	sort.Slice(ks.mountsR, func(i, j int) bool { return ks.mountsR[i].UUID < ks.mountsR[j].UUID })
	sort.Slice(ks.mountsW, func(i, j int) bool { return ks.mountsW[i].UUID < ks.mountsW[j].UUID })
	return nil
}

// checkLocatorSignature checks that locator has a valid signature.
// If the BlobSigning config is false, it returns nil even if the
// signature is invalid or missing.
func (ks *keepstore) checkLocatorSignature(ctx context.Context, locator string) error {
	if !ks.cluster.Collections.BlobSigning {
		return nil
	}
	token := ctxToken(ctx)
	if token == "" {
		return errNoTokenProvided
	}
	err := arvados.VerifySignature(locator, token, ks.cluster.Collections.BlobSigningTTL.Duration(), []byte(ks.cluster.Collections.BlobSigningKey))
	if err == arvados.ErrSignatureExpired {
		return errExpiredSignature
	} else if err != nil {
		return errInvalidSignature
	}
	return nil
}

// signLocator signs the locator for the given token, if possible.
// Note this signs if the BlobSigningKey config is available, even if
// the BlobSigning config is false.
func (ks *keepstore) signLocator(token, locator string) string {
	if token == "" || len(ks.cluster.Collections.BlobSigningKey) == 0 {
		return locator
	}
	ttl := ks.cluster.Collections.BlobSigningTTL.Duration()
	return arvados.SignLocator(locator, token, time.Now().Add(ttl), ttl, []byte(ks.cluster.Collections.BlobSigningKey))
}

func (ks *keepstore) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (n int, err error) {
	li, err := getLocatorInfo(opts.Locator)
	if err != nil {
		return 0, err
	}
	out := opts.WriteTo
	if rw, ok := out.(http.ResponseWriter); ok && li.size > 0 {
		out = &setSizeOnWrite{ResponseWriter: rw, size: li.size}
	}
	if li.remote && !li.signed {
		return ks.blockReadRemote(ctx, opts)
	}
	if err := ks.checkLocatorSignature(ctx, opts.Locator); err != nil {
		return 0, err
	}
	hashcheck := md5.New()
	if li.size > 0 {
		out = newHashCheckWriter(out, hashcheck, int64(li.size), li.hash)
	} else {
		out = io.MultiWriter(out, hashcheck)
	}

	buf, err := ks.bufferPool.GetContext(ctx)
	if err != nil {
		return 0, err
	}
	defer ks.bufferPool.Put(buf)
	streamer := newStreamWriterAt(out, 65536, buf)
	defer streamer.Close()

	var errToCaller error = os.ErrNotExist
	for _, mnt := range ks.rendezvous(li.hash, ks.mountsR) {
		if ctx.Err() != nil {
			return 0, ctx.Err()
		}
		err := mnt.BlockRead(ctx, li.hash, streamer)
		if err != nil {
			if streamer.WroteAt() != 0 {
				// BlockRead encountered an error
				// after writing some data, so it's
				// too late to try another
				// volume. Flush streamer before
				// calling Wrote() to ensure our
				// return value accurately reflects
				// the number of bytes written to
				// opts.WriteTo.
				streamer.Close()
				return streamer.Wrote(), err
			}
			if !os.IsNotExist(err) {
				errToCaller = err
			}
			continue
		}
		if li.size == 0 {
			// hashCheckingWriter isn't in use because we
			// don't know the expected size. All we can do
			// is check after writing all the data, and
			// trust the caller is doing a HEAD request so
			// it's not too late to set an error code in
			// the response header.
			err = streamer.Close()
			if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash && err == nil {
				err = errChecksum
			}
			if rw, ok := opts.WriteTo.(http.ResponseWriter); ok {
				// We didn't set the content-length header
				// above because we didn't know the block size
				// until now.
				rw.Header().Set("Content-Length", fmt.Sprintf("%d", streamer.WroteAt()))
			}
			return streamer.WroteAt(), err
		} else if streamer.WroteAt() != li.size {
			// If the backend read fewer bytes than
			// expected but returns no error, we can
			// classify this as a checksum error (even
			// though hashCheckWriter doesn't know that
			// yet, it's just waiting for the next
			// write). If our caller is serving a GET
			// request it's too late to do anything about
			// it anyway, but if it's a HEAD request the
			// caller can still change the response status
			// code.
			return streamer.WroteAt(), errChecksum
		}
		// Ensure streamer flushes all buffered data without
		// errors.
		err = streamer.Close()
		return streamer.Wrote(), err
	}
	return 0, errToCaller
}

func (ks *keepstore) blockReadRemote(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
	token := ctxToken(ctx)
	if token == "" {
		return 0, errNoTokenProvided
	}
	var remoteClient *keepclient.KeepClient
	var parts []string
	li, err := getLocatorInfo(opts.Locator)
	if err != nil {
		return 0, err
	}
	for i, part := range strings.Split(opts.Locator, "+") {
		switch {
		case i == 0:
			// don't try to parse hash part as hint
		case strings.HasPrefix(part, "A"):
			// drop local permission hint
			continue
		case len(part) > 7 && part[0] == 'R' && part[6] == '-':
			remoteID := part[1:6]
			remote, ok := ks.cluster.RemoteClusters[remoteID]
			if !ok {
				return 0, httpserver.ErrorWithStatus(errors.New("remote cluster not configured"), http.StatusBadRequest)
			}
			kc, err := ks.remoteClient(remoteID, remote, token)
			if err == auth.ErrObsoleteToken {
				return 0, httpserver.ErrorWithStatus(err, http.StatusBadRequest)
			} else if err != nil {
				return 0, err
			}
			remoteClient = kc
			part = "A" + part[7:]
		}
		parts = append(parts, part)
	}
	if remoteClient == nil {
		return 0, httpserver.ErrorWithStatus(errors.New("invalid remote hint"), http.StatusBadRequest)
	}
	locator := strings.Join(parts, "+")
	if opts.LocalLocator == nil {
		// Read from remote cluster and stream response back
		// to caller
		if rw, ok := opts.WriteTo.(http.ResponseWriter); ok && li.size > 0 {
			rw.Header().Set("Content-Length", fmt.Sprintf("%d", li.size))
		}
		return remoteClient.BlockRead(ctx, arvados.BlockReadOptions{
			Locator: locator,
			WriteTo: opts.WriteTo,
		})
	}
	// We must call LocalLocator before writing any data to
	// opts.WriteTo, otherwise the caller can't put the local
	// locator in a response header.  So we copy into memory,
	// generate the local signature, then copy from memory to
	// opts.WriteTo.
	buf, err := ks.bufferPool.GetContext(ctx)
	if err != nil {
		return 0, err
	}
	defer ks.bufferPool.Put(buf)
	writebuf := bytes.NewBuffer(buf[:0])
	ks.logger.Infof("blockReadRemote(%s): remote read(%s)", opts.Locator, locator)
	_, err = remoteClient.BlockRead(ctx, arvados.BlockReadOptions{
		Locator: locator,
		WriteTo: writebuf,
	})
	if err != nil {
		return 0, err
	}
	resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
		Hash: locator,
		Data: writebuf.Bytes(),
	})
	if err != nil {
		return 0, err
	}
	opts.LocalLocator(resp.Locator)
	if rw, ok := opts.WriteTo.(http.ResponseWriter); ok {
		rw.Header().Set("Content-Length", fmt.Sprintf("%d", writebuf.Len()))
	}
	n, err := io.Copy(opts.WriteTo, bytes.NewReader(writebuf.Bytes()))
	return int(n), err
}

func (ks *keepstore) remoteClient(remoteID string, remoteCluster arvados.RemoteCluster, token string) (*keepclient.KeepClient, error) {
	ks.remoteClientsMtx.Lock()
	kc, ok := ks.remoteClients[remoteID]
	ks.remoteClientsMtx.Unlock()
	if !ok {
		c := &arvados.Client{
			APIHost:   remoteCluster.Host,
			AuthToken: "xxx",
			Insecure:  remoteCluster.Insecure,
		}
		ac, err := arvadosclient.New(c)
		if err != nil {
			return nil, err
		}
		kc, err = keepclient.MakeKeepClient(ac)
		if err != nil {
			return nil, err
		}
		kc.DiskCacheSize = keepclient.DiskCacheDisabled

		ks.remoteClientsMtx.Lock()
		ks.remoteClients[remoteID] = kc
		ks.remoteClientsMtx.Unlock()
	}
	accopy := *kc.Arvados
	accopy.ApiToken = token
	kccopy := kc.Clone()
	kccopy.Arvados = &accopy
	token, err := auth.SaltToken(token, remoteID)
	if err != nil {
		return nil, err
	}
	kccopy.Arvados.ApiToken = token
	return kccopy, nil
}

// BlockWrite writes a block to one or more volumes.
func (ks *keepstore) BlockWrite(ctx context.Context, opts arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
	var resp arvados.BlockWriteResponse
	var hash string
	if opts.Data == nil {
		buf, err := ks.bufferPool.GetContext(ctx)
		if err != nil {
			return resp, err
		}
		defer ks.bufferPool.Put(buf)
		w := bytes.NewBuffer(buf[:0])
		h := md5.New()
		limitedReader := &io.LimitedReader{R: opts.Reader, N: BlockSize}
		n, err := io.Copy(io.MultiWriter(w, h), limitedReader)
		if err != nil {
			return resp, err
		}
		if limitedReader.N == 0 {
			// Data size is either exactly BlockSize, or too big.
			n, err := opts.Reader.Read(make([]byte, 1))
			if n > 0 {
				return resp, httpserver.ErrorWithStatus(err, http.StatusRequestEntityTooLarge)
			}
			if err != io.EOF {
				return resp, err
			}
		}
		opts.Data = buf[:n]
		if opts.DataSize != 0 && int(n) != opts.DataSize {
			return resp, httpserver.ErrorWithStatus(fmt.Errorf("content length %d did not match specified data size %d", n, opts.DataSize), http.StatusBadRequest)
		}
		hash = fmt.Sprintf("%x", h.Sum(nil))
	} else {
		hash = fmt.Sprintf("%x", md5.Sum(opts.Data))
	}
	if opts.Hash != "" && !strings.HasPrefix(opts.Hash, hash) {
		return resp, httpserver.ErrorWithStatus(fmt.Errorf("content hash %s did not match specified locator %s", hash, opts.Hash), http.StatusBadRequest)
	}
	rvzmounts := ks.rendezvous(hash, ks.mountsW)
	result := newPutProgress(opts.StorageClasses)
	for _, mnt := range rvzmounts {
		if !result.Want(mnt) {
			continue
		}
		cmp := &checkEqual{Expect: opts.Data}
		if err := mnt.BlockRead(ctx, hash, cmp); err == nil {
			if !cmp.Equal() {
				return resp, errCollision
			}
			err := mnt.BlockTouch(hash)
			if err == nil {
				result.Add(mnt)
			}
		}
	}
	var allFull atomic.Bool
	allFull.Store(true)
	// pending tracks what result will be if all outstanding
	// writes succeed.
	pending := result.Copy()
	cond := sync.NewCond(new(sync.Mutex))
	cond.L.Lock()
	var wg sync.WaitGroup
nextmnt:
	for _, mnt := range rvzmounts {
		for {
			if result.Done() || ctx.Err() != nil {
				break nextmnt
			}
			if !result.Want(mnt) {
				continue nextmnt
			}
			if pending.Want(mnt) {
				break
			}
			// This mount might not be needed, depending
			// on the outcome of pending writes. Wait for
			// a pending write to finish, then check
			// again.
			cond.Wait()
		}
		mnt := mnt
		logger := ks.logger.WithField("mount", mnt.UUID)
		pending.Add(mnt)
		wg.Add(1)
		go func() {
			defer wg.Done()
			logger.Debug("start write")
			err := mnt.BlockWrite(ctx, hash, opts.Data)
			cond.L.Lock()
			defer cond.L.Unlock()
			defer cond.Broadcast()
			if err != nil {
				logger.Debug("write failed")
				pending.Sub(mnt)
				if err != errFull {
					allFull.Store(false)
				}
			} else {
				result.Add(mnt)
				pending.Sub(mnt)
			}
		}()
	}
	cond.L.Unlock()
	wg.Wait()
	if ctx.Err() != nil {
		return resp, ctx.Err()
	}
	if result.Done() || result.totalReplication > 0 {
		resp = arvados.BlockWriteResponse{
			Locator:        ks.signLocator(ctxToken(ctx), fmt.Sprintf("%s+%d", hash, len(opts.Data))),
			Replicas:       result.totalReplication,
			StorageClasses: result.classDone,
		}
		return resp, nil
	}
	if allFull.Load() {
		return resp, errFull
	}
	return resp, errVolumeUnavailable
}

// rendezvous sorts the given mounts by descending priority, then by
// rendezvous order for the given locator.
func (*keepstore) rendezvous(locator string, mnts []*mount) []*mount {
	hash := locator
	if len(hash) > 32 {
		hash = hash[:32]
	}
	// copy the provided []*mount before doing an in-place sort
	mnts = append([]*mount(nil), mnts...)
	weight := make(map[*mount]string)
	for _, mnt := range mnts {
		uuidpart := mnt.UUID
		if len(uuidpart) == 27 {
			// strip zzzzz-yyyyy- prefixes
			uuidpart = uuidpart[12:]
		}
		weight[mnt] = fmt.Sprintf("%x", md5.Sum([]byte(hash+uuidpart)))
	}
	sort.Slice(mnts, func(i, j int) bool {
		if p := mnts[i].priority - mnts[j].priority; p != 0 {
			return p > 0
		}
		return weight[mnts[i]] < weight[mnts[j]]
	})
	return mnts
}

// checkEqual reports whether the data written to it (via io.WriterAt
// interface) is equal to the expected data.
//
// Expect should not be changed after the first Write.
//
// Results are undefined if WriteAt is called with overlapping ranges.
type checkEqual struct {
	Expect   []byte
	equal    atomic.Int64
	notequal atomic.Bool
}

func (ce *checkEqual) Equal() bool {
	return !ce.notequal.Load() && ce.equal.Load() == int64(len(ce.Expect))
}

func (ce *checkEqual) WriteAt(p []byte, offset int64) (int, error) {
	endpos := int(offset) + len(p)
	if offset >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[int(offset):endpos]) {
		ce.equal.Add(int64(len(p)))
	} else {
		ce.notequal.Store(true)
	}
	return len(p), nil
}

func (ks *keepstore) BlockUntrash(ctx context.Context, locator string) error {
	li, err := getLocatorInfo(locator)
	if err != nil {
		return err
	}
	var errToCaller error = os.ErrNotExist
	for _, mnt := range ks.mountsW {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		err := mnt.BlockUntrash(li.hash)
		if err == nil {
			errToCaller = nil
		} else if !os.IsNotExist(err) && errToCaller != nil {
			errToCaller = err
		}
	}
	return errToCaller
}

func (ks *keepstore) BlockTouch(ctx context.Context, locator string) error {
	li, err := getLocatorInfo(locator)
	if err != nil {
		return err
	}
	var errToCaller error = os.ErrNotExist
	for _, mnt := range ks.mountsW {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		err := mnt.BlockTouch(li.hash)
		if err == nil {
			return nil
		}
		if !os.IsNotExist(err) {
			errToCaller = err
		}
	}
	return errToCaller
}

func (ks *keepstore) BlockTrash(ctx context.Context, locator string) error {
	if !ks.cluster.Collections.BlobTrash {
		return errMethodNotAllowed
	}
	li, err := getLocatorInfo(locator)
	if err != nil {
		return err
	}
	var errToCaller error = os.ErrNotExist
	for _, mnt := range ks.mounts {
		if !mnt.AllowTrash {
			continue
		}
		if ctx.Err() != nil {
			return ctx.Err()
		}
		t, err := mnt.Mtime(li.hash)
		if err == nil && time.Now().Sub(t) > ks.cluster.Collections.BlobSigningTTL.Duration() {
			err = mnt.BlockTrash(li.hash)
		}
		if os.IsNotExist(errToCaller) || (errToCaller == nil && !os.IsNotExist(err)) {
			errToCaller = err
		}
	}
	return errToCaller
}

func (ks *keepstore) Mounts() []*mount {
	return ks.mountsR
}

func (ks *keepstore) Index(ctx context.Context, opts indexOptions) error {
	mounts := ks.mountsR
	if opts.MountUUID != "" {
		mnt, ok := ks.mounts[opts.MountUUID]
		if !ok {
			return os.ErrNotExist
		}
		mounts = []*mount{mnt}
	}
	for _, mnt := range mounts {
		err := mnt.Index(ctx, opts.Prefix, opts.WriteTo)
		if err != nil {
			return err
		}
	}
	return nil
}

func ctxToken(ctx context.Context) string {
	if c, ok := auth.FromContext(ctx); ok && len(c.Tokens) > 0 {
		return c.Tokens[0]
	} else {
		return ""
	}
}

// locatorInfo expresses the attributes of a locator that are relevant
// for keepstore decision-making.
type locatorInfo struct {
	hash   string
	size   int
	remote bool // locator has a +R hint
	signed bool // locator has a +A hint
}

func getLocatorInfo(loc string) (locatorInfo, error) {
	var li locatorInfo
	plus := 0    // number of '+' chars seen so far
	partlen := 0 // chars since last '+'
	for i, c := range loc + "+" {
		if c == '+' {
			if partlen == 0 {
				// double/leading/trailing '+'
				return li, errInvalidLocator
			}
			if plus == 0 {
				if i != 32 {
					return li, errInvalidLocator
				}
				li.hash = loc[:i]
			}
			if plus == 1 {
				if size, err := strconv.Atoi(loc[i-partlen : i]); err == nil {
					li.size = size
				}
			}
			plus++
			partlen = 0
			continue
		}
		partlen++
		if partlen == 1 {
			if c == 'A' {
				li.signed = true
			}
			if c == 'R' {
				li.remote = true
			}
			if plus > 1 && c >= '0' && c <= '9' {
				// size, if present at all, must come first
				return li, errInvalidLocator
			}
		}
		if plus == 0 && !((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f')) {
			// non-hexadecimal char in hash part
			return li, errInvalidLocator
		}
	}
	return li, nil
}