2960: Refactor keepstore into a streaming server.
[arvados.git] / services / keepstore / keepstore.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "sort"
17         "strconv"
18         "strings"
19         "sync"
20         "sync/atomic"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
25         "git.arvados.org/arvados.git/sdk/go/auth"
26         "git.arvados.org/arvados.git/sdk/go/ctxlog"
27         "git.arvados.org/arvados.git/sdk/go/httpserver"
28         "git.arvados.org/arvados.git/sdk/go/keepclient"
29         "github.com/prometheus/client_golang/prometheus"
30         "github.com/sirupsen/logrus"
31 )
32
33 // Maximum size of a keep block is 64 MiB.
34 const BlockSize = 1 << 26
35
36 var (
37         errChecksum          = httpserver.ErrorWithStatus(errors.New("checksum mismatch in stored data"), http.StatusBadGateway)
38         errNoTokenProvided   = httpserver.ErrorWithStatus(errors.New("no token provided in Authorization header"), http.StatusUnauthorized)
39         errMethodNotAllowed  = httpserver.ErrorWithStatus(errors.New("method not allowed"), http.StatusMethodNotAllowed)
40         errVolumeUnavailable = httpserver.ErrorWithStatus(errors.New("volume unavailable"), http.StatusServiceUnavailable)
41         errCollision         = httpserver.ErrorWithStatus(errors.New("hash collision"), http.StatusInternalServerError)
42         errExpiredSignature  = httpserver.ErrorWithStatus(errors.New("expired signature"), http.StatusUnauthorized)
43         errInvalidSignature  = httpserver.ErrorWithStatus(errors.New("invalid signature"), http.StatusBadRequest)
44         errInvalidLocator    = httpserver.ErrorWithStatus(errors.New("invalid locator"), http.StatusBadRequest)
45         errFull              = httpserver.ErrorWithStatus(errors.New("insufficient storage"), http.StatusInsufficientStorage)
46         errTooLarge          = httpserver.ErrorWithStatus(errors.New("request entity too large"), http.StatusRequestEntityTooLarge)
47         driver               = make(map[string]volumeDriver)
48 )
49
50 type IndexOptions struct {
51         MountUUID string
52         Prefix    string
53         WriteTo   io.Writer
54 }
55
56 type mount struct {
57         arvados.KeepMount
58         volume
59         priority int
60 }
61
62 type keepstore struct {
63         cluster    *arvados.Cluster
64         logger     logrus.FieldLogger
65         serviceURL arvados.URL
66         mounts     map[string]*mount
67         mountsR    []*mount
68         mountsW    []*mount
69         bufferPool *bufferPool
70
71         iostats map[volume]*ioStats
72
73         remoteClients    map[string]*keepclient.KeepClient
74         remoteClientsMtx sync.Mutex
75 }
76
77 func newKeepstore(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry, serviceURL arvados.URL) (*keepstore, error) {
78         logger := ctxlog.FromContext(ctx)
79
80         if cluster.API.MaxConcurrentRequests > 0 && cluster.API.MaxConcurrentRequests < cluster.API.MaxKeepBlobBuffers {
81                 logger.Warnf("Possible configuration mistake: not useful to set API.MaxKeepBlobBuffers (%d) higher than API.MaxConcurrentRequests (%d)", cluster.API.MaxKeepBlobBuffers, cluster.API.MaxConcurrentRequests)
82         }
83
84         if cluster.Collections.BlobSigningKey != "" {
85         } else if cluster.Collections.BlobSigning {
86                 return nil, errors.New("cannot enable Collections.BlobSigning with no Collections.BlobSigningKey")
87         } else {
88                 logger.Warn("Running without a blob signing key. Block locators returned by this server will not be signed, and will be rejected by a server that enforces permissions. To fix this, configure Collections.BlobSigning and Collections.BlobSigningKey.")
89         }
90
91         if cluster.API.MaxKeepBlobBuffers <= 0 {
92                 return nil, fmt.Errorf("API.MaxKeepBlobBuffers must be greater than zero")
93         }
94         bufferPool := newBufferPool(logger, cluster.API.MaxKeepBlobBuffers, reg)
95
96         ks := &keepstore{
97                 cluster:       cluster,
98                 logger:        logger,
99                 serviceURL:    serviceURL,
100                 bufferPool:    bufferPool,
101                 remoteClients: make(map[string]*keepclient.KeepClient),
102         }
103
104         err := ks.setupMounts(newVolumeMetricsVecs(reg))
105         if err != nil {
106                 return nil, err
107         }
108
109         return ks, nil
110 }
111
112 func (ks *keepstore) setupMounts(metrics *volumeMetricsVecs) error {
113         ks.mounts = make(map[string]*mount)
114         if len(ks.cluster.Volumes) == 0 {
115                 return errors.New("no volumes configured")
116         }
117         for uuid, cfgvol := range ks.cluster.Volumes {
118                 va, ok := cfgvol.AccessViaHosts[ks.serviceURL]
119                 if !ok && len(cfgvol.AccessViaHosts) > 0 {
120                         continue
121                 }
122                 dri, ok := driver[cfgvol.Driver]
123                 if !ok {
124                         return fmt.Errorf("volume %s: invalid driver %q", uuid, cfgvol.Driver)
125                 }
126                 vol, err := dri(newVolumeParams{
127                         UUID:         uuid,
128                         Cluster:      ks.cluster,
129                         ConfigVolume: cfgvol,
130                         Logger:       ks.logger,
131                         MetricsVecs:  metrics,
132                         BufferPool:   ks.bufferPool,
133                 })
134                 if err != nil {
135                         return fmt.Errorf("error initializing volume %s: %s", uuid, err)
136                 }
137                 sc := cfgvol.StorageClasses
138                 if len(sc) == 0 {
139                         sc = map[string]bool{"default": true}
140                 }
141                 repl := cfgvol.Replication
142                 if repl < 1 {
143                         repl = 1
144                 }
145                 pri := 0
146                 for class, in := range cfgvol.StorageClasses {
147                         p := ks.cluster.StorageClasses[class].Priority
148                         if in && p > pri {
149                                 pri = p
150                         }
151                 }
152                 mnt := &mount{
153                         volume:   vol,
154                         priority: pri,
155                         KeepMount: arvados.KeepMount{
156                                 UUID:           uuid,
157                                 DeviceID:       vol.DeviceID(),
158                                 AllowWrite:     !va.ReadOnly && !cfgvol.ReadOnly,
159                                 AllowTrash:     !va.ReadOnly && (!cfgvol.ReadOnly || cfgvol.AllowTrashWhenReadOnly),
160                                 Replication:    repl,
161                                 StorageClasses: sc,
162                         },
163                 }
164                 ks.mounts[uuid] = mnt
165                 ks.logger.Printf("started volume %s (%s), AllowWrite=%v, AllowTrash=%v", uuid, vol.DeviceID(), mnt.AllowWrite, mnt.AllowTrash)
166         }
167         if len(ks.mounts) == 0 {
168                 return fmt.Errorf("no volumes configured for %s", ks.serviceURL)
169         }
170
171         ks.mountsR = nil
172         ks.mountsW = nil
173         for _, mnt := range ks.mounts {
174                 ks.mountsR = append(ks.mountsR, mnt)
175                 if mnt.AllowWrite {
176                         ks.mountsW = append(ks.mountsW, mnt)
177                 }
178         }
179         // Sorting mounts by UUID makes behavior more predictable, and
180         // is convenient for testing -- for example, "index all
181         // volumes" and "trash block on all volumes" will visit
182         // volumes in predictable order.
183         sort.Slice(ks.mountsR, func(i, j int) bool { return ks.mountsR[i].UUID < ks.mountsR[j].UUID })
184         sort.Slice(ks.mountsW, func(i, j int) bool { return ks.mountsW[i].UUID < ks.mountsW[j].UUID })
185         return nil
186 }
187
188 // checkLocatorSignature checks that locator has a valid signature.
189 // If the BlobSigning config is false, it returns nil even if the
190 // signature is invalid or missing.
191 func (ks *keepstore) checkLocatorSignature(ctx context.Context, locator string) error {
192         if !ks.cluster.Collections.BlobSigning {
193                 return nil
194         }
195         token := ctxToken(ctx)
196         if token == "" {
197                 return errNoTokenProvided
198         }
199         err := arvados.VerifySignature(locator, token, ks.cluster.Collections.BlobSigningTTL.Duration(), []byte(ks.cluster.Collections.BlobSigningKey))
200         if err == arvados.ErrSignatureExpired {
201                 return errExpiredSignature
202         } else if err != nil {
203                 return errInvalidSignature
204         }
205         return nil
206 }
207
208 // signLocator signs the locator for the given token, if possible.
209 // Note this signs if the BlobSigningKey config is available, even if
210 // the BlobSigning config is false.
211 func (ks *keepstore) signLocator(token, locator string) string {
212         if token == "" || len(ks.cluster.Collections.BlobSigningKey) == 0 {
213                 return locator
214         }
215         ttl := ks.cluster.Collections.BlobSigningTTL.Duration()
216         return arvados.SignLocator(locator, token, time.Now().Add(ttl), ttl, []byte(ks.cluster.Collections.BlobSigningKey))
217 }
218
219 func (ks *keepstore) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (n int, err error) {
220         li, err := parseLocator(opts.Locator)
221         if err != nil {
222                 return 0, err
223         }
224         out := opts.WriteTo
225         if rw, ok := out.(http.ResponseWriter); ok && li.size > 0 {
226                 out = &setSizeOnWrite{ResponseWriter: rw, size: li.size}
227         }
228         if li.remote && !li.signed {
229                 return ks.blockReadRemote(ctx, opts)
230         }
231         if err := ks.checkLocatorSignature(ctx, opts.Locator); err != nil {
232                 return 0, err
233         }
234         hashcheck := md5.New()
235         if li.size > 0 {
236                 out = newHashCheckWriter(out, hashcheck, int64(li.size), li.hash)
237         } else {
238                 out = io.MultiWriter(out, hashcheck)
239         }
240         var errToCaller error = os.ErrNotExist
241         for _, mnt := range ks.rendezvous(li.hash, ks.mountsR) {
242                 if ctx.Err() != nil {
243                         return 0, ctx.Err()
244                 }
245                 n, err = mnt.BlockRead(ctx, li.hash, out)
246                 if err == nil && li.size > 0 && n != li.size {
247                         // If the backend read fewer bytes than
248                         // expected but returns no error, we can
249                         // classify this as a checksum error (even
250                         // though hashCheckWriter doesn't know that
251                         // yet, it's just waiting for the next
252                         // write). If our caller is serving a GET
253                         // request it's too late to do anything about
254                         // it anyway, but if it's a HEAD request the
255                         // caller can still change the response status
256                         // code.
257                         return n, errChecksum
258                 }
259                 if err == nil && li.size == 0 {
260                         // hashCheckingWriter isn't in use because we
261                         // don't know the expected size. All we can do
262                         // is check after writing all the data, and
263                         // trust the caller is doing a HEAD request so
264                         // it's not too late to set an error code in
265                         // the response header.
266                         if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash {
267                                 return n, errChecksum
268                         }
269                 }
270                 if rw, ok := opts.WriteTo.(http.ResponseWriter); ok && li.size == 0 && err == nil {
271                         // We didn't set the content-length header
272                         // above because we didn't know the block size
273                         // until now.
274                         rw.Header().Set("Content-Length", fmt.Sprintf("%d", n))
275                 }
276                 if n > 0 || err == nil {
277                         // success, or there's an error but we can't
278                         // retry because we've already sent some data.
279                         return n, err
280                 }
281                 if !os.IsNotExist(err) {
282                         // If some volume returns a transient error,
283                         // return it to the caller instead of "Not
284                         // found" so it can retry.
285                         errToCaller = err
286                 }
287         }
288         return 0, errToCaller
289 }
290
291 func (ks *keepstore) blockReadRemote(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
292         ks.logger.Infof("blockReadRemote(%s)", opts.Locator)
293         token := ctxToken(ctx)
294         if token == "" {
295                 return 0, errNoTokenProvided
296         }
297         var remoteClient *keepclient.KeepClient
298         var parts []string
299         var size int
300         for i, part := range strings.Split(opts.Locator, "+") {
301                 switch {
302                 case i == 0:
303                         // don't try to parse hash part as hint
304                 case strings.HasPrefix(part, "A"):
305                         // drop local permission hint
306                         continue
307                 case len(part) > 7 && part[0] == 'R' && part[6] == '-':
308                         remoteID := part[1:6]
309                         remote, ok := ks.cluster.RemoteClusters[remoteID]
310                         if !ok {
311                                 return 0, httpserver.ErrorWithStatus(errors.New("remote cluster not configured"), http.StatusBadRequest)
312                         }
313                         kc, err := ks.remoteClient(remoteID, remote, token)
314                         if err == auth.ErrObsoleteToken {
315                                 return 0, httpserver.ErrorWithStatus(err, http.StatusBadRequest)
316                         } else if err != nil {
317                                 return 0, err
318                         }
319                         remoteClient = kc
320                         part = "A" + part[7:]
321                 case len(part) > 0 && part[0] >= '0' && part[0] <= '9':
322                         size, _ = strconv.Atoi(part)
323                 }
324                 parts = append(parts, part)
325         }
326         if remoteClient == nil {
327                 return 0, httpserver.ErrorWithStatus(errors.New("invalid remote hint"), http.StatusBadRequest)
328         }
329         locator := strings.Join(parts, "+")
330         if opts.LocalLocator == nil {
331                 // Read from remote cluster and stream response back
332                 // to caller
333                 if rw, ok := opts.WriteTo.(http.ResponseWriter); ok && size > 0 {
334                         rw.Header().Set("Content-Length", fmt.Sprintf("%d", size))
335                 }
336                 return remoteClient.BlockRead(ctx, arvados.BlockReadOptions{
337                         Locator: locator,
338                         WriteTo: opts.WriteTo,
339                 })
340         }
341         // We must call LocalLocator before writing any data to
342         // opts.WriteTo, otherwise the caller can't put the local
343         // locator in a response header.  So we copy into memory,
344         // generate the local signature, then copy from memory to
345         // opts.WriteTo.
346         buf, err := ks.bufferPool.GetContext(ctx)
347         if err != nil {
348                 return 0, err
349         }
350         defer ks.bufferPool.Put(buf)
351         writebuf := bytes.NewBuffer(buf[:0])
352         ks.logger.Infof("blockReadRemote(%s): remote read(%s)", opts.Locator, locator)
353         _, err = remoteClient.BlockRead(ctx, arvados.BlockReadOptions{
354                 Locator: locator,
355                 WriteTo: writebuf,
356         })
357         if err != nil {
358                 return 0, err
359         }
360         resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
361                 Hash: locator,
362                 Data: writebuf.Bytes(),
363         })
364         if err != nil {
365                 return 0, err
366         }
367         opts.LocalLocator(resp.Locator)
368         if rw, ok := opts.WriteTo.(http.ResponseWriter); ok {
369                 rw.Header().Set("Content-Length", fmt.Sprintf("%d", writebuf.Len()))
370         }
371         n, err := io.Copy(opts.WriteTo, bytes.NewReader(writebuf.Bytes()))
372         return int(n), err
373 }
374
375 func (ks *keepstore) remoteClient(remoteID string, remoteCluster arvados.RemoteCluster, token string) (*keepclient.KeepClient, error) {
376         ks.remoteClientsMtx.Lock()
377         kc, ok := ks.remoteClients[remoteID]
378         ks.remoteClientsMtx.Unlock()
379         if !ok {
380                 c := &arvados.Client{
381                         APIHost:   remoteCluster.Host,
382                         AuthToken: "xxx",
383                         Insecure:  remoteCluster.Insecure,
384                 }
385                 ac, err := arvadosclient.New(c)
386                 if err != nil {
387                         return nil, err
388                 }
389                 kc, err = keepclient.MakeKeepClient(ac)
390                 if err != nil {
391                         return nil, err
392                 }
393                 kc.DiskCacheSize = keepclient.DiskCacheDisabled
394
395                 ks.remoteClientsMtx.Lock()
396                 ks.remoteClients[remoteID] = kc
397                 ks.remoteClientsMtx.Unlock()
398         }
399         accopy := *kc.Arvados
400         accopy.ApiToken = token
401         kccopy := kc.Clone()
402         kccopy.Arvados = &accopy
403         token, err := auth.SaltToken(token, remoteID)
404         if err != nil {
405                 return nil, err
406         }
407         kccopy.Arvados.ApiToken = token
408         return kccopy, nil
409 }
410
411 // BlockWrite writes a block to one or more volumes.
412 func (ks *keepstore) BlockWrite(ctx context.Context, opts arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
413         var resp arvados.BlockWriteResponse
414         var hash string
415         if opts.Data == nil {
416                 buf, err := ks.bufferPool.GetContext(ctx)
417                 if err != nil {
418                         return resp, err
419                 }
420                 defer ks.bufferPool.Put(buf)
421                 w := bytes.NewBuffer(buf[:0])
422                 h := md5.New()
423                 limitedReader := &io.LimitedReader{R: opts.Reader, N: BlockSize}
424                 n, err := io.Copy(io.MultiWriter(w, h), limitedReader)
425                 if err != nil {
426                         return resp, err
427                 }
428                 if limitedReader.N == 0 {
429                         // Data size is either exactly BlockSize, or too big.
430                         n, err := opts.Reader.Read(make([]byte, 1))
431                         if n > 0 {
432                                 return resp, httpserver.ErrorWithStatus(err, http.StatusRequestEntityTooLarge)
433                         }
434                         if err != io.EOF {
435                                 return resp, err
436                         }
437                 }
438                 opts.Data = buf[:n]
439                 if opts.DataSize != 0 && int(n) != opts.DataSize {
440                         return resp, httpserver.ErrorWithStatus(fmt.Errorf("content length %d did not match specified data size %d", n, opts.DataSize), http.StatusBadRequest)
441                 }
442                 hash = fmt.Sprintf("%x", h.Sum(nil))
443         } else {
444                 hash = fmt.Sprintf("%x", md5.Sum(opts.Data))
445         }
446         if opts.Hash != "" && !strings.HasPrefix(opts.Hash, hash) {
447                 return resp, httpserver.ErrorWithStatus(fmt.Errorf("content hash %s did not match specified locator %s", hash, opts.Hash), http.StatusBadRequest)
448         }
449         rvzmounts := ks.rendezvous(hash, ks.mountsW)
450         result := newPutProgress(opts.StorageClasses)
451         for _, mnt := range rvzmounts {
452                 if !result.Want(mnt) {
453                         continue
454                 }
455                 cmp := &checkEqual{Expect: opts.Data}
456                 if _, err := mnt.BlockRead(ctx, hash, cmp); err == nil {
457                         if !cmp.Equal() {
458                                 return resp, errCollision
459                         }
460                         err := mnt.BlockTouch(hash)
461                         if err == nil {
462                                 result.Add(mnt)
463                         }
464                 }
465         }
466         var allFull atomic.Bool
467         allFull.Store(true)
468         // pending tracks what result will be if all outstanding
469         // writes succeed.
470         pending := result.Copy()
471         cond := sync.NewCond(new(sync.Mutex))
472         cond.L.Lock()
473         var wg sync.WaitGroup
474 nextmnt:
475         for _, mnt := range rvzmounts {
476                 for {
477                         if result.Done() || ctx.Err() != nil {
478                                 break nextmnt
479                         }
480                         if !result.Want(mnt) {
481                                 continue nextmnt
482                         }
483                         if pending.Want(mnt) {
484                                 break
485                         }
486                         // This mount might not be needed, depending
487                         // on the outcome of pending writes. Wait for
488                         // a pending write to finish, then check
489                         // again.
490                         cond.Wait()
491                 }
492                 mnt := mnt
493                 logger := ks.logger.WithField("mount", mnt.UUID)
494                 pending.Add(mnt)
495                 wg.Add(1)
496                 go func() {
497                         defer wg.Done()
498                         logger.Debug("start write")
499                         err := mnt.BlockWrite(ctx, hash, opts.Data)
500                         cond.L.Lock()
501                         defer cond.L.Unlock()
502                         defer cond.Broadcast()
503                         if err != nil {
504                                 logger.Debug("write failed")
505                                 pending.Sub(mnt)
506                                 if err != errFull {
507                                         allFull.Store(false)
508                                 }
509                         } else {
510                                 result.Add(mnt)
511                                 pending.Sub(mnt)
512                         }
513                 }()
514         }
515         cond.L.Unlock()
516         wg.Wait()
517         if ctx.Err() != nil {
518                 return resp, ctx.Err()
519         }
520         if result.Done() || result.totalReplication > 0 {
521                 resp = arvados.BlockWriteResponse{
522                         Locator:        ks.signLocator(ctxToken(ctx), fmt.Sprintf("%s+%d", hash, len(opts.Data))),
523                         Replicas:       result.totalReplication,
524                         StorageClasses: result.classDone,
525                 }
526                 return resp, nil
527         }
528         if allFull.Load() {
529                 return resp, errFull
530         }
531         return resp, errVolumeUnavailable
532 }
533
534 // rendezvous sorts the given mounts by descending priority, then by
535 // rendezvous order for the given locator.
536 func (*keepstore) rendezvous(locator string, mnts []*mount) []*mount {
537         hash := locator
538         if len(hash) > 32 {
539                 hash = hash[:32]
540         }
541         // copy the provided []*mount before doing an in-place sort
542         mnts = append([]*mount(nil), mnts...)
543         weight := make(map[*mount]string)
544         for _, mnt := range mnts {
545                 uuidpart := mnt.UUID
546                 if len(uuidpart) == 27 {
547                         // strip zzzzz-yyyyy- prefixes
548                         uuidpart = uuidpart[12:]
549                 }
550                 weight[mnt] = fmt.Sprintf("%x", md5.Sum([]byte(hash+uuidpart)))
551         }
552         sort.Slice(mnts, func(i, j int) bool {
553                 if p := mnts[i].priority - mnts[j].priority; p != 0 {
554                         return p > 0
555                 }
556                 return weight[mnts[i]] < weight[mnts[j]]
557         })
558         return mnts
559 }
560
561 // checkEqual reports whether the data written to it (via io.Writer
562 // interface) is equal to the expected data.
563 //
564 // Expect should not be changed after the first Write.
565 type checkEqual struct {
566         Expect     []byte
567         equalUntil int
568 }
569
570 func (ce *checkEqual) Equal() bool {
571         return ce.equalUntil == len(ce.Expect)
572 }
573
574 func (ce *checkEqual) Write(p []byte) (int, error) {
575         endpos := ce.equalUntil + len(p)
576         if ce.equalUntil >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[ce.equalUntil:endpos]) {
577                 ce.equalUntil = endpos
578         } else {
579                 ce.equalUntil = -1
580         }
581         return len(p), nil
582 }
583
584 func (ks *keepstore) BlockUntrash(ctx context.Context, locator string) error {
585         li, err := parseLocator(locator)
586         if err != nil {
587                 return err
588         }
589         var errToCaller error = os.ErrNotExist
590         for _, mnt := range ks.mountsW {
591                 if ctx.Err() != nil {
592                         return ctx.Err()
593                 }
594                 err := mnt.BlockUntrash(li.hash)
595                 if err == nil {
596                         errToCaller = nil
597                 } else if !os.IsNotExist(err) && errToCaller != nil {
598                         errToCaller = err
599                 }
600         }
601         return errToCaller
602 }
603
604 func (ks *keepstore) BlockTouch(ctx context.Context, locator string) error {
605         li, err := parseLocator(locator)
606         if err != nil {
607                 return err
608         }
609         var errToCaller error = os.ErrNotExist
610         for _, mnt := range ks.mountsW {
611                 if ctx.Err() != nil {
612                         return ctx.Err()
613                 }
614                 err := mnt.BlockTouch(li.hash)
615                 if err == nil {
616                         return nil
617                 }
618                 if !os.IsNotExist(err) {
619                         errToCaller = err
620                 }
621         }
622         return errToCaller
623 }
624
625 func (ks *keepstore) BlockTrash(ctx context.Context, locator string) error {
626         if !ks.cluster.Collections.BlobTrash {
627                 return errMethodNotAllowed
628         }
629         li, err := parseLocator(locator)
630         if err != nil {
631                 return err
632         }
633         var errToCaller error = os.ErrNotExist
634         for _, mnt := range ks.mounts {
635                 if !mnt.AllowTrash {
636                         continue
637                 }
638                 if ctx.Err() != nil {
639                         return ctx.Err()
640                 }
641                 t, err := mnt.Mtime(li.hash)
642                 if err == nil && time.Now().Sub(t) > ks.cluster.Collections.BlobSigningTTL.Duration() {
643                         err = mnt.BlockTrash(li.hash)
644                 }
645                 if os.IsNotExist(errToCaller) || (errToCaller == nil && !os.IsNotExist(err)) {
646                         errToCaller = err
647                 }
648         }
649         return errToCaller
650 }
651
652 func (ks *keepstore) Mounts() []*mount {
653         return ks.mountsR
654 }
655
656 func (ks *keepstore) Index(ctx context.Context, opts IndexOptions) error {
657         mounts := ks.mountsR
658         if opts.MountUUID != "" {
659                 mnt, ok := ks.mounts[opts.MountUUID]
660                 if !ok {
661                         return os.ErrNotExist
662                 }
663                 mounts = []*mount{mnt}
664         }
665         for _, mnt := range mounts {
666                 err := mnt.Index(ctx, opts.Prefix, opts.WriteTo)
667                 if err != nil {
668                         return err
669                 }
670         }
671         return nil
672 }
673
674 func ctxToken(ctx context.Context) string {
675         if c, ok := auth.FromContext(ctx); ok && len(c.Tokens) > 0 {
676                 return c.Tokens[0]
677         } else {
678                 return ""
679         }
680 }
681
682 type locatorInfo struct {
683         hash   string
684         size   int
685         remote bool
686         signed bool
687 }
688
689 func parseLocator(loc string) (locatorInfo, error) {
690         var li locatorInfo
691         for i, part := range strings.Split(loc, "+") {
692                 if i == 0 {
693                         if len(part) != 32 {
694                                 return li, errInvalidLocator
695                         }
696                         li.hash = part
697                         continue
698                 }
699                 if i == 1 {
700                         if size, err := strconv.Atoi(part); err == nil {
701                                 li.size = size
702                                 continue
703                         }
704                 }
705                 if len(part) == 0 {
706                         return li, errInvalidLocator
707                 }
708                 if part[0] == 'A' {
709                         li.signed = true
710                 }
711                 if part[0] == 'R' {
712                         li.remote = true
713                 }
714                 if part[0] >= '0' && part[0] <= '9' {
715                         // size, if present at all, must come first
716                         return li, errInvalidLocator
717                 }
718         }
719         return li, nil
720 }