]> git.arvados.org - arvados.git/blob - services/keepstore/keepstore.go
22703: Enable email notifications before testing them.
[arvados.git] / services / keepstore / keepstore.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Package keepstore implements the keepstore service component and
6 // back-end storage drivers.
7 //
8 // It is an internal module, only intended to be imported by
9 // /cmd/arvados-server and other server-side components in this
10 // repository.
11 package keepstore
12
13 import (
14         "bytes"
15         "context"
16         "crypto/md5"
17         "errors"
18         "fmt"
19         "io"
20         "net/http"
21         "os"
22         "sort"
23         "strconv"
24         "strings"
25         "sync"
26         "sync/atomic"
27         "time"
28
29         "git.arvados.org/arvados.git/sdk/go/arvados"
30         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
31         "git.arvados.org/arvados.git/sdk/go/auth"
32         "git.arvados.org/arvados.git/sdk/go/ctxlog"
33         "git.arvados.org/arvados.git/sdk/go/httpserver"
34         "git.arvados.org/arvados.git/sdk/go/keepclient"
35         "github.com/prometheus/client_golang/prometheus"
36         "github.com/sirupsen/logrus"
37 )
38
39 // Maximum size of a keep block is 64 MiB.
40 const BlockSize = 1 << 26
41
42 var (
43         errChecksum          = httpserver.ErrorWithStatus(errors.New("checksum mismatch in stored data"), http.StatusBadGateway)
44         errNoTokenProvided   = httpserver.ErrorWithStatus(errors.New("no token provided in Authorization header"), http.StatusUnauthorized)
45         errMethodNotAllowed  = httpserver.ErrorWithStatus(errors.New("method not allowed"), http.StatusMethodNotAllowed)
46         errVolumeUnavailable = httpserver.ErrorWithStatus(errors.New("volume unavailable"), http.StatusServiceUnavailable)
47         errCollision         = httpserver.ErrorWithStatus(errors.New("hash collision"), http.StatusInternalServerError)
48         errExpiredSignature  = httpserver.ErrorWithStatus(errors.New("expired signature"), http.StatusUnauthorized)
49         errInvalidSignature  = httpserver.ErrorWithStatus(errors.New("invalid signature"), http.StatusBadRequest)
50         errInvalidLocator    = httpserver.ErrorWithStatus(errors.New("invalid locator"), http.StatusBadRequest)
51         errFull              = httpserver.ErrorWithStatus(errors.New("insufficient storage"), http.StatusInsufficientStorage)
52         errTooLarge          = httpserver.ErrorWithStatus(errors.New("request entity too large"), http.StatusRequestEntityTooLarge)
53         driver               = make(map[string]volumeDriver)
54 )
55
56 type indexOptions struct {
57         MountUUID string
58         Prefix    string
59         WriteTo   io.Writer
60 }
61
62 type mount struct {
63         arvados.KeepMount
64         volume
65         priority int
66 }
67
68 type keepstore struct {
69         cluster    *arvados.Cluster
70         logger     logrus.FieldLogger
71         serviceURL arvados.URL
72         mounts     map[string]*mount
73         mountsR    []*mount
74         mountsW    []*mount
75         bufferPool *bufferPool
76
77         iostats map[volume]*ioStats
78
79         remoteClients    map[string]*keepclient.KeepClient
80         remoteClientsMtx sync.Mutex
81 }
82
83 func newKeepstore(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry, serviceURL arvados.URL) (*keepstore, error) {
84         logger := ctxlog.FromContext(ctx)
85
86         if cluster.API.MaxConcurrentRequests > 0 && cluster.API.MaxConcurrentRequests < cluster.API.MaxKeepBlobBuffers {
87                 logger.Warnf("Possible configuration mistake: not useful to set API.MaxKeepBlobBuffers (%d) higher than API.MaxConcurrentRequests (%d)", cluster.API.MaxKeepBlobBuffers, cluster.API.MaxConcurrentRequests)
88         }
89
90         if cluster.Collections.BlobSigningKey != "" {
91         } else if cluster.Collections.BlobSigning {
92                 return nil, errors.New("cannot enable Collections.BlobSigning with no Collections.BlobSigningKey")
93         } else {
94                 logger.Warn("Running without a blob signing key. Block locators returned by this server will not be signed, and will be rejected by a server that enforces permissions. To fix this, configure Collections.BlobSigning and Collections.BlobSigningKey.")
95         }
96
97         if cluster.API.MaxKeepBlobBuffers <= 0 {
98                 return nil, fmt.Errorf("API.MaxKeepBlobBuffers must be greater than zero")
99         }
100         bufferPool := newBufferPool(logger, cluster.API.MaxKeepBlobBuffers, reg)
101
102         ks := &keepstore{
103                 cluster:       cluster,
104                 logger:        logger,
105                 serviceURL:    serviceURL,
106                 bufferPool:    bufferPool,
107                 remoteClients: make(map[string]*keepclient.KeepClient),
108         }
109
110         err := ks.setupMounts(newVolumeMetricsVecs(reg))
111         if err != nil {
112                 return nil, err
113         }
114
115         return ks, nil
116 }
117
118 func (ks *keepstore) setupMounts(metrics *volumeMetricsVecs) error {
119         ks.mounts = make(map[string]*mount)
120         if len(ks.cluster.Volumes) == 0 {
121                 return errors.New("no volumes configured")
122         }
123         for uuid, cfgvol := range ks.cluster.Volumes {
124                 va, ok := cfgvol.AccessViaHosts[ks.serviceURL]
125                 if !ok && len(cfgvol.AccessViaHosts) > 0 {
126                         continue
127                 }
128                 dri, ok := driver[cfgvol.Driver]
129                 if !ok {
130                         return fmt.Errorf("volume %s: invalid driver %q", uuid, cfgvol.Driver)
131                 }
132                 vol, err := dri(newVolumeParams{
133                         UUID:         uuid,
134                         Cluster:      ks.cluster,
135                         ConfigVolume: cfgvol,
136                         Logger:       ks.logger,
137                         MetricsVecs:  metrics,
138                         BufferPool:   ks.bufferPool,
139                 })
140                 if err != nil {
141                         return fmt.Errorf("error initializing volume %s: %s", uuid, err)
142                 }
143                 sc := cfgvol.StorageClasses
144                 if len(sc) == 0 {
145                         sc = map[string]bool{"default": true}
146                 }
147                 repl := cfgvol.Replication
148                 if repl < 1 {
149                         repl = 1
150                 }
151                 pri := 0
152                 for class, in := range cfgvol.StorageClasses {
153                         p := ks.cluster.StorageClasses[class].Priority
154                         if in && p > pri {
155                                 pri = p
156                         }
157                 }
158                 mnt := &mount{
159                         volume:   vol,
160                         priority: pri,
161                         KeepMount: arvados.KeepMount{
162                                 UUID:           uuid,
163                                 DeviceID:       vol.DeviceID(),
164                                 AllowWrite:     !va.ReadOnly && !cfgvol.ReadOnly,
165                                 AllowTrash:     !va.ReadOnly && (!cfgvol.ReadOnly || cfgvol.AllowTrashWhenReadOnly),
166                                 Replication:    repl,
167                                 StorageClasses: sc,
168                         },
169                 }
170                 ks.mounts[uuid] = mnt
171                 ks.logger.Printf("started volume %s (%s), AllowWrite=%v, AllowTrash=%v", uuid, vol.DeviceID(), mnt.AllowWrite, mnt.AllowTrash)
172         }
173         if len(ks.mounts) == 0 {
174                 return fmt.Errorf("no volumes configured for %s", ks.serviceURL)
175         }
176
177         ks.mountsR = nil
178         ks.mountsW = nil
179         for _, mnt := range ks.mounts {
180                 ks.mountsR = append(ks.mountsR, mnt)
181                 if mnt.AllowWrite {
182                         ks.mountsW = append(ks.mountsW, mnt)
183                 }
184         }
185         // Sorting mounts by UUID makes behavior more predictable, and
186         // is convenient for testing -- for example, "index all
187         // volumes" and "trash block on all volumes" will visit
188         // volumes in predictable order.
189         sort.Slice(ks.mountsR, func(i, j int) bool { return ks.mountsR[i].UUID < ks.mountsR[j].UUID })
190         sort.Slice(ks.mountsW, func(i, j int) bool { return ks.mountsW[i].UUID < ks.mountsW[j].UUID })
191         return nil
192 }
193
194 // checkLocatorSignature checks that locator has a valid signature.
195 // If the BlobSigning config is false, it returns nil even if the
196 // signature is invalid or missing.
197 func (ks *keepstore) checkLocatorSignature(ctx context.Context, locator string) error {
198         if !ks.cluster.Collections.BlobSigning {
199                 return nil
200         }
201         token := ctxToken(ctx)
202         if token == "" {
203                 return errNoTokenProvided
204         }
205         err := arvados.VerifySignature(locator, token, ks.cluster.Collections.BlobSigningTTL.Duration(), []byte(ks.cluster.Collections.BlobSigningKey))
206         if err == arvados.ErrSignatureExpired {
207                 return errExpiredSignature
208         } else if err != nil {
209                 return errInvalidSignature
210         }
211         return nil
212 }
213
214 // signLocator signs the locator for the given token, if possible.
215 // Note this signs if the BlobSigningKey config is available, even if
216 // the BlobSigning config is false.
217 func (ks *keepstore) signLocator(token, locator string) string {
218         if token == "" || len(ks.cluster.Collections.BlobSigningKey) == 0 {
219                 return locator
220         }
221         ttl := ks.cluster.Collections.BlobSigningTTL.Duration()
222         return arvados.SignLocator(locator, token, time.Now().Add(ttl), ttl, []byte(ks.cluster.Collections.BlobSigningKey))
223 }
224
225 func (ks *keepstore) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (n int, err error) {
226         li, err := getLocatorInfo(opts.Locator)
227         if err != nil {
228                 return 0, err
229         }
230         if opts.CheckCacheOnly {
231                 return 0, arvados.ErrNotCached
232         }
233         out := opts.WriteTo
234         if rw, ok := out.(http.ResponseWriter); ok && li.size > 0 {
235                 out = &setSizeOnWrite{ResponseWriter: rw, size: li.size}
236         }
237         if li.remote && !li.signed {
238                 return ks.blockReadRemote(ctx, opts)
239         }
240         if err := ks.checkLocatorSignature(ctx, opts.Locator); err != nil {
241                 return 0, err
242         }
243         hashcheck := md5.New()
244         if li.size > 0 {
245                 out = newHashCheckWriter(out, hashcheck, int64(li.size), li.hash)
246         } else {
247                 out = io.MultiWriter(out, hashcheck)
248         }
249
250         buf, err := ks.bufferPool.GetContext(ctx)
251         if err != nil {
252                 return 0, err
253         }
254         defer ks.bufferPool.Put(buf)
255         streamer := newStreamWriterAt(out, 65536, buf)
256         defer streamer.Close()
257
258         var errToCaller error = os.ErrNotExist
259         for _, mnt := range ks.rendezvous(li.hash, ks.mountsR) {
260                 if ctx.Err() != nil {
261                         return 0, ctx.Err()
262                 }
263                 err := mnt.BlockRead(ctx, li.hash, streamer)
264                 if err != nil {
265                         if streamer.WroteAt() != 0 {
266                                 // BlockRead encountered an error
267                                 // after writing some data, so it's
268                                 // too late to try another
269                                 // volume. Flush streamer before
270                                 // calling Wrote() to ensure our
271                                 // return value accurately reflects
272                                 // the number of bytes written to
273                                 // opts.WriteTo.
274                                 streamer.Close()
275                                 return streamer.Wrote(), err
276                         }
277                         if !os.IsNotExist(err) {
278                                 errToCaller = err
279                         }
280                         continue
281                 }
282                 if li.size == 0 {
283                         // hashCheckingWriter isn't in use because we
284                         // don't know the expected size. All we can do
285                         // is check after writing all the data, and
286                         // trust the caller is doing a HEAD request so
287                         // it's not too late to set an error code in
288                         // the response header.
289                         err = streamer.Close()
290                         if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash && err == nil {
291                                 err = errChecksum
292                         }
293                         if rw, ok := opts.WriteTo.(http.ResponseWriter); ok {
294                                 // We didn't set the content-length header
295                                 // above because we didn't know the block size
296                                 // until now.
297                                 rw.Header().Set("Content-Length", fmt.Sprintf("%d", streamer.WroteAt()))
298                         }
299                         return streamer.WroteAt(), err
300                 } else if streamer.WroteAt() != li.size {
301                         // If the backend read fewer bytes than
302                         // expected but returns no error, we can
303                         // classify this as a checksum error (even
304                         // though hashCheckWriter doesn't know that
305                         // yet, it's just waiting for the next
306                         // write). If our caller is serving a GET
307                         // request it's too late to do anything about
308                         // it anyway, but if it's a HEAD request the
309                         // caller can still change the response status
310                         // code.
311                         return streamer.WroteAt(), errChecksum
312                 }
313                 // Ensure streamer flushes all buffered data without
314                 // errors.
315                 err = streamer.Close()
316                 return streamer.Wrote(), err
317         }
318         return 0, errToCaller
319 }
320
321 func (ks *keepstore) blockReadRemote(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
322         token := ctxToken(ctx)
323         if token == "" {
324                 return 0, errNoTokenProvided
325         }
326         var remoteClient *keepclient.KeepClient
327         var parts []string
328         li, err := getLocatorInfo(opts.Locator)
329         if err != nil {
330                 return 0, err
331         }
332         for i, part := range strings.Split(opts.Locator, "+") {
333                 switch {
334                 case i == 0:
335                         // don't try to parse hash part as hint
336                 case strings.HasPrefix(part, "A"):
337                         // drop local permission hint
338                         continue
339                 case len(part) > 7 && part[0] == 'R' && part[6] == '-':
340                         remoteID := part[1:6]
341                         remote, ok := ks.cluster.RemoteClusters[remoteID]
342                         if !ok {
343                                 return 0, httpserver.ErrorWithStatus(errors.New("remote cluster not configured"), http.StatusBadRequest)
344                         }
345                         kc, err := ks.remoteClient(remoteID, remote, token)
346                         if err == auth.ErrObsoleteToken {
347                                 return 0, httpserver.ErrorWithStatus(err, http.StatusBadRequest)
348                         } else if err != nil {
349                                 return 0, err
350                         }
351                         remoteClient = kc
352                         part = "A" + part[7:]
353                 }
354                 parts = append(parts, part)
355         }
356         if remoteClient == nil {
357                 return 0, httpserver.ErrorWithStatus(errors.New("invalid remote hint"), http.StatusBadRequest)
358         }
359         locator := strings.Join(parts, "+")
360         if opts.LocalLocator == nil {
361                 // Read from remote cluster and stream response back
362                 // to caller
363                 if rw, ok := opts.WriteTo.(http.ResponseWriter); ok && li.size > 0 {
364                         rw.Header().Set("Content-Length", fmt.Sprintf("%d", li.size))
365                 }
366                 return remoteClient.BlockRead(ctx, arvados.BlockReadOptions{
367                         Locator: locator,
368                         WriteTo: opts.WriteTo,
369                 })
370         }
371         // We must call LocalLocator before writing any data to
372         // opts.WriteTo, otherwise the caller can't put the local
373         // locator in a response header.  So we copy into memory,
374         // generate the local signature, then copy from memory to
375         // opts.WriteTo.
376         buf, err := ks.bufferPool.GetContext(ctx)
377         if err != nil {
378                 return 0, err
379         }
380         defer ks.bufferPool.Put(buf)
381         writebuf := bytes.NewBuffer(buf[:0])
382         ks.logger.Infof("blockReadRemote(%s): remote read(%s)", opts.Locator, locator)
383         _, err = remoteClient.BlockRead(ctx, arvados.BlockReadOptions{
384                 Locator: locator,
385                 WriteTo: writebuf,
386         })
387         if err != nil {
388                 return 0, err
389         }
390         resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
391                 Hash: locator,
392                 Data: writebuf.Bytes(),
393         })
394         if err != nil {
395                 return 0, err
396         }
397         opts.LocalLocator(resp.Locator)
398         if rw, ok := opts.WriteTo.(http.ResponseWriter); ok {
399                 rw.Header().Set("Content-Length", fmt.Sprintf("%d", writebuf.Len()))
400         }
401         n, err := io.Copy(opts.WriteTo, bytes.NewReader(writebuf.Bytes()))
402         return int(n), err
403 }
404
405 func (ks *keepstore) remoteClient(remoteID string, remoteCluster arvados.RemoteCluster, token string) (*keepclient.KeepClient, error) {
406         ks.remoteClientsMtx.Lock()
407         kc, ok := ks.remoteClients[remoteID]
408         ks.remoteClientsMtx.Unlock()
409         if !ok {
410                 c := &arvados.Client{
411                         APIHost:   remoteCluster.Host,
412                         AuthToken: "xxx",
413                         Insecure:  remoteCluster.Insecure,
414                 }
415                 ac, err := arvadosclient.New(c)
416                 if err != nil {
417                         return nil, err
418                 }
419                 kc, err = keepclient.MakeKeepClient(ac)
420                 if err != nil {
421                         return nil, err
422                 }
423                 kc.DiskCacheSize = keepclient.DiskCacheDisabled
424
425                 ks.remoteClientsMtx.Lock()
426                 ks.remoteClients[remoteID] = kc
427                 ks.remoteClientsMtx.Unlock()
428         }
429         accopy := *kc.Arvados
430         accopy.ApiToken = token
431         kccopy := kc.Clone()
432         kccopy.Arvados = &accopy
433         token, err := auth.SaltToken(token, remoteID)
434         if err != nil {
435                 return nil, err
436         }
437         kccopy.Arvados.ApiToken = token
438         return kccopy, nil
439 }
440
441 // BlockWrite writes a block to one or more volumes.
442 func (ks *keepstore) BlockWrite(ctx context.Context, opts arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
443         var resp arvados.BlockWriteResponse
444         var hash string
445         if opts.Data == nil {
446                 buf, err := ks.bufferPool.GetContext(ctx)
447                 if err != nil {
448                         return resp, err
449                 }
450                 defer ks.bufferPool.Put(buf)
451                 w := bytes.NewBuffer(buf[:0])
452                 h := md5.New()
453                 limitedReader := &io.LimitedReader{R: opts.Reader, N: BlockSize}
454                 n, err := io.Copy(io.MultiWriter(w, h), limitedReader)
455                 if err != nil {
456                         return resp, err
457                 }
458                 if limitedReader.N == 0 {
459                         // Data size is either exactly BlockSize, or too big.
460                         n, err := opts.Reader.Read(make([]byte, 1))
461                         if n > 0 {
462                                 return resp, httpserver.ErrorWithStatus(err, http.StatusRequestEntityTooLarge)
463                         }
464                         if err != io.EOF {
465                                 return resp, err
466                         }
467                 }
468                 opts.Data = buf[:n]
469                 if opts.DataSize != 0 && int(n) != opts.DataSize {
470                         return resp, httpserver.ErrorWithStatus(fmt.Errorf("content length %d did not match specified data size %d", n, opts.DataSize), http.StatusBadRequest)
471                 }
472                 hash = fmt.Sprintf("%x", h.Sum(nil))
473         } else {
474                 hash = fmt.Sprintf("%x", md5.Sum(opts.Data))
475         }
476         if opts.Hash != "" && !strings.HasPrefix(opts.Hash, hash) {
477                 return resp, httpserver.ErrorWithStatus(fmt.Errorf("content hash %s did not match specified locator %s", hash, opts.Hash), http.StatusBadRequest)
478         }
479         rvzmounts := ks.rendezvous(hash, ks.mountsW)
480         result := newPutProgress(opts.StorageClasses)
481         for _, mnt := range rvzmounts {
482                 if !result.Want(mnt) {
483                         continue
484                 }
485                 cmp := &checkEqual{Expect: opts.Data}
486                 if err := mnt.BlockRead(ctx, hash, cmp); err == nil {
487                         if !cmp.Equal() {
488                                 return resp, errCollision
489                         }
490                         err := mnt.BlockTouch(hash)
491                         if err == nil {
492                                 result.Add(mnt)
493                         }
494                 }
495         }
496         var allFull atomic.Bool
497         allFull.Store(true)
498         // pending tracks what result will be if all outstanding
499         // writes succeed.
500         pending := result.Copy()
501         cond := sync.NewCond(new(sync.Mutex))
502         cond.L.Lock()
503         var wg sync.WaitGroup
504 nextmnt:
505         for _, mnt := range rvzmounts {
506                 for {
507                         if result.Done() || ctx.Err() != nil {
508                                 break nextmnt
509                         }
510                         if !result.Want(mnt) {
511                                 continue nextmnt
512                         }
513                         if pending.Want(mnt) {
514                                 break
515                         }
516                         // This mount might not be needed, depending
517                         // on the outcome of pending writes. Wait for
518                         // a pending write to finish, then check
519                         // again.
520                         cond.Wait()
521                 }
522                 mnt := mnt
523                 logger := ks.logger.WithField("mount", mnt.UUID)
524                 pending.Add(mnt)
525                 wg.Add(1)
526                 go func() {
527                         defer wg.Done()
528                         logger.Debug("start write")
529                         err := mnt.BlockWrite(ctx, hash, opts.Data)
530                         cond.L.Lock()
531                         defer cond.L.Unlock()
532                         defer cond.Broadcast()
533                         if err != nil {
534                                 logger.Debug("write failed")
535                                 pending.Sub(mnt)
536                                 if err != errFull {
537                                         allFull.Store(false)
538                                 }
539                         } else {
540                                 result.Add(mnt)
541                                 pending.Sub(mnt)
542                         }
543                 }()
544         }
545         cond.L.Unlock()
546         wg.Wait()
547         if ctx.Err() != nil {
548                 return resp, ctx.Err()
549         }
550         if result.Done() || result.totalReplication > 0 {
551                 resp = arvados.BlockWriteResponse{
552                         Locator:        ks.signLocator(ctxToken(ctx), fmt.Sprintf("%s+%d", hash, len(opts.Data))),
553                         Replicas:       result.totalReplication,
554                         StorageClasses: result.classDone,
555                 }
556                 return resp, nil
557         }
558         if allFull.Load() {
559                 return resp, errFull
560         }
561         return resp, errVolumeUnavailable
562 }
563
564 // rendezvous sorts the given mounts by descending priority, then by
565 // rendezvous order for the given locator.
566 func (*keepstore) rendezvous(locator string, mnts []*mount) []*mount {
567         hash := locator
568         if len(hash) > 32 {
569                 hash = hash[:32]
570         }
571         // copy the provided []*mount before doing an in-place sort
572         mnts = append([]*mount(nil), mnts...)
573         weight := make(map[*mount]string)
574         for _, mnt := range mnts {
575                 uuidpart := mnt.UUID
576                 if len(uuidpart) == 27 {
577                         // strip zzzzz-yyyyy- prefixes
578                         uuidpart = uuidpart[12:]
579                 }
580                 weight[mnt] = fmt.Sprintf("%x", md5.Sum([]byte(hash+uuidpart)))
581         }
582         sort.Slice(mnts, func(i, j int) bool {
583                 if p := mnts[i].priority - mnts[j].priority; p != 0 {
584                         return p > 0
585                 }
586                 return weight[mnts[i]] < weight[mnts[j]]
587         })
588         return mnts
589 }
590
591 // checkEqual reports whether the data written to it (via io.WriterAt
592 // interface) is equal to the expected data.
593 //
594 // Expect should not be changed after the first Write.
595 //
596 // Results are undefined if WriteAt is called with overlapping ranges.
597 type checkEqual struct {
598         Expect   []byte
599         equal    atomic.Int64
600         notequal atomic.Bool
601 }
602
603 func (ce *checkEqual) Equal() bool {
604         return !ce.notequal.Load() && ce.equal.Load() == int64(len(ce.Expect))
605 }
606
607 func (ce *checkEqual) WriteAt(p []byte, offset int64) (int, error) {
608         endpos := int(offset) + len(p)
609         if offset >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[int(offset):endpos]) {
610                 ce.equal.Add(int64(len(p)))
611         } else {
612                 ce.notequal.Store(true)
613         }
614         return len(p), nil
615 }
616
617 func (ks *keepstore) BlockUntrash(ctx context.Context, locator string) error {
618         li, err := getLocatorInfo(locator)
619         if err != nil {
620                 return err
621         }
622         var errToCaller error = os.ErrNotExist
623         for _, mnt := range ks.mountsW {
624                 if ctx.Err() != nil {
625                         return ctx.Err()
626                 }
627                 err := mnt.BlockUntrash(li.hash)
628                 if err == nil {
629                         errToCaller = nil
630                 } else if !os.IsNotExist(err) && errToCaller != nil {
631                         errToCaller = err
632                 }
633         }
634         return errToCaller
635 }
636
637 func (ks *keepstore) BlockTouch(ctx context.Context, locator string) error {
638         li, err := getLocatorInfo(locator)
639         if err != nil {
640                 return err
641         }
642         var errToCaller error = os.ErrNotExist
643         for _, mnt := range ks.mountsW {
644                 if ctx.Err() != nil {
645                         return ctx.Err()
646                 }
647                 err := mnt.BlockTouch(li.hash)
648                 if err == nil {
649                         return nil
650                 }
651                 if !os.IsNotExist(err) {
652                         errToCaller = err
653                 }
654         }
655         return errToCaller
656 }
657
658 func (ks *keepstore) BlockTrash(ctx context.Context, locator string) error {
659         if !ks.cluster.Collections.BlobTrash {
660                 return errMethodNotAllowed
661         }
662         li, err := getLocatorInfo(locator)
663         if err != nil {
664                 return err
665         }
666         var errToCaller error = os.ErrNotExist
667         for _, mnt := range ks.mounts {
668                 if !mnt.AllowTrash {
669                         continue
670                 }
671                 if ctx.Err() != nil {
672                         return ctx.Err()
673                 }
674                 t, err := mnt.Mtime(li.hash)
675                 if err == nil && time.Now().Sub(t) > ks.cluster.Collections.BlobSigningTTL.Duration() {
676                         err = mnt.BlockTrash(li.hash)
677                 }
678                 if os.IsNotExist(errToCaller) || (errToCaller == nil && !os.IsNotExist(err)) {
679                         errToCaller = err
680                 }
681         }
682         return errToCaller
683 }
684
685 func (ks *keepstore) Mounts() []*mount {
686         return ks.mountsR
687 }
688
689 func (ks *keepstore) Index(ctx context.Context, opts indexOptions) error {
690         mounts := ks.mountsR
691         if opts.MountUUID != "" {
692                 mnt, ok := ks.mounts[opts.MountUUID]
693                 if !ok {
694                         return os.ErrNotExist
695                 }
696                 mounts = []*mount{mnt}
697         }
698         for _, mnt := range mounts {
699                 err := mnt.Index(ctx, opts.Prefix, opts.WriteTo)
700                 if err != nil {
701                         return err
702                 }
703         }
704         return nil
705 }
706
707 func ctxToken(ctx context.Context) string {
708         if c, ok := auth.FromContext(ctx); ok && len(c.Tokens) > 0 {
709                 return c.Tokens[0]
710         } else {
711                 return ""
712         }
713 }
714
715 // locatorInfo expresses the attributes of a locator that are relevant
716 // for keepstore decision-making.
717 type locatorInfo struct {
718         hash   string
719         size   int
720         remote bool // locator has a +R hint
721         signed bool // locator has a +A hint
722 }
723
724 func getLocatorInfo(loc string) (locatorInfo, error) {
725         var li locatorInfo
726         plus := 0    // number of '+' chars seen so far
727         partlen := 0 // chars since last '+'
728         for i, c := range loc + "+" {
729                 if c == '+' {
730                         if partlen == 0 {
731                                 // double/leading/trailing '+'
732                                 return li, errInvalidLocator
733                         }
734                         if plus == 0 {
735                                 if i != 32 {
736                                         return li, errInvalidLocator
737                                 }
738                                 li.hash = loc[:i]
739                         }
740                         if plus == 1 {
741                                 if size, err := strconv.Atoi(loc[i-partlen : i]); err == nil {
742                                         li.size = size
743                                 }
744                         }
745                         plus++
746                         partlen = 0
747                         continue
748                 }
749                 partlen++
750                 if partlen == 1 {
751                         if c == 'A' {
752                                 li.signed = true
753                         }
754                         if c == 'R' {
755                                 li.remote = true
756                         }
757                         if plus > 1 && c >= '0' && c <= '9' {
758                                 // size, if present at all, must come first
759                                 return li, errInvalidLocator
760                         }
761                 }
762                 if plus == 0 && !((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f')) {
763                         // non-hexadecimal char in hash part
764                         return li, errInvalidLocator
765                 }
766         }
767         return li, nil
768 }