// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

// Package keepstore implements the keepstore service component and
// back-end storage drivers.
//
// It is an internal module, only intended to be imported by
// /cmd/arvados-server and other server-side components in this
// repository.
package keepstore

import (
	"bytes"
	"context"
	"crypto/md5"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
	"git.arvados.org/arvados.git/sdk/go/auth"
	"git.arvados.org/arvados.git/sdk/go/ctxlog"
	"git.arvados.org/arvados.git/sdk/go/httpserver"
	"git.arvados.org/arvados.git/sdk/go/keepclient"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
)

// Maximum size of a keep block is 64 MiB.
const BlockSize = 1 << 26
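
// BlockWrite (below) relies on this limit when reading request bodies
// from opts.Reader, and the buffers handed out by the keepstore
// bufferPool are assumed to be at least this large.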

var (
	errChecksum          = httpserver.ErrorWithStatus(errors.New("checksum mismatch in stored data"), http.StatusBadGateway)
	errNoTokenProvided   = httpserver.ErrorWithStatus(errors.New("no token provided in Authorization header"), http.StatusUnauthorized)
	errMethodNotAllowed  = httpserver.ErrorWithStatus(errors.New("method not allowed"), http.StatusMethodNotAllowed)
	errVolumeUnavailable = httpserver.ErrorWithStatus(errors.New("volume unavailable"), http.StatusServiceUnavailable)
	errCollision         = httpserver.ErrorWithStatus(errors.New("hash collision"), http.StatusInternalServerError)
	errExpiredSignature  = httpserver.ErrorWithStatus(errors.New("expired signature"), http.StatusUnauthorized)
	errInvalidSignature  = httpserver.ErrorWithStatus(errors.New("invalid signature"), http.StatusBadRequest)
	errInvalidLocator    = httpserver.ErrorWithStatus(errors.New("invalid locator"), http.StatusBadRequest)
	errFull              = httpserver.ErrorWithStatus(errors.New("insufficient storage"), http.StatusInsufficientStorage)
	errTooLarge          = httpserver.ErrorWithStatus(errors.New("request entity too large"), http.StatusRequestEntityTooLarge)

	driver = make(map[string]volumeDriver)
)
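
// Each storage back-end in this package is expected to register its
// constructor in the driver map from an init() func, keyed by the
// Driver name used in the cluster config (e.g., illustratively,
// driver["Directory"] = newDirectoryVolume). setupMounts looks up the
// configured driver for each volume there.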

// indexOptions controls the output of (*keepstore)Index.
type indexOptions struct {
	MountUUID string // report only blocks on the mount with this UUID, or all mounts if empty
	Prefix    string // report only blocks whose hash starts with this prefix
	WriteTo   io.Writer
}

// mount is a single volume attached to this keepstore process.
type mount struct {
	arvados.KeepMount
	volume
	priority int
}

type keepstore struct {
	cluster    *arvados.Cluster
	logger     logrus.FieldLogger
	serviceURL arvados.URL
	mounts     map[string]*mount
	mountsR    []*mount
	mountsW    []*mount
	bufferPool *bufferPool

	iostats map[volume]*ioStats

	remoteClients    map[string]*keepclient.KeepClient
	remoteClientsMtx sync.Mutex
}

func newKeepstore(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry, serviceURL arvados.URL) (*keepstore, error) {
	logger := ctxlog.FromContext(ctx)

	if cluster.API.MaxConcurrentRequests > 0 && cluster.API.MaxConcurrentRequests < cluster.API.MaxKeepBlobBuffers {
		logger.Warnf("Possible configuration mistake: not useful to set API.MaxKeepBlobBuffers (%d) higher than API.MaxConcurrentRequests (%d)", cluster.API.MaxKeepBlobBuffers, cluster.API.MaxConcurrentRequests)
	}

	if cluster.Collections.BlobSigningKey != "" {
	} else if cluster.Collections.BlobSigning {
		return nil, errors.New("cannot enable Collections.BlobSigning with no Collections.BlobSigningKey")
	} else {
		logger.Warn("Running without a blob signing key. Block locators returned by this server will not be signed, and will be rejected by a server that enforces permissions. To fix this, configure Collections.BlobSigning and Collections.BlobSigningKey.")
	}

	if cluster.API.MaxKeepBlobBuffers <= 0 {
		return nil, fmt.Errorf("API.MaxKeepBlobBuffers must be greater than zero")
	}
	bufferPool := newBufferPool(logger, cluster.API.MaxKeepBlobBuffers, reg)

	ks := &keepstore{
		cluster:       cluster,
		logger:        logger,
		serviceURL:    serviceURL,
		bufferPool:    bufferPool,
		remoteClients: make(map[string]*keepclient.KeepClient),
	}

	err := ks.setupMounts(newVolumeMetricsVecs(reg))
	if err != nil {
		return nil, err
	}

	return ks, nil
}

func (ks *keepstore) setupMounts(metrics *volumeMetricsVecs) error {
	ks.mounts = make(map[string]*mount)
	if len(ks.cluster.Volumes) == 0 {
		return errors.New("no volumes configured")
	}
	for uuid, cfgvol := range ks.cluster.Volumes {
		va, ok := cfgvol.AccessViaHosts[ks.serviceURL]
		if !ok && len(cfgvol.AccessViaHosts) > 0 {
			continue
		}
		dri, ok := driver[cfgvol.Driver]
		if !ok {
			return fmt.Errorf("volume %s: invalid driver %q", uuid, cfgvol.Driver)
		}
		vol, err := dri(newVolumeParams{
			UUID:         uuid,
			Cluster:      ks.cluster,
			ConfigVolume: cfgvol,
			Logger:       ks.logger,
			MetricsVecs:  metrics,
			BufferPool:   ks.bufferPool,
		})
		if err != nil {
			return fmt.Errorf("error initializing volume %s: %s", uuid, err)
		}
		sc := cfgvol.StorageClasses
		if len(sc) == 0 {
			sc = map[string]bool{"default": true}
		}
		repl := cfgvol.Replication
		if repl < 1 {
			repl = 1
		}
		pri := 0
		for class, in := range cfgvol.StorageClasses {
			p := ks.cluster.StorageClasses[class].Priority
			if in && p > pri {
				pri = p
			}
		}
		mnt := &mount{
			volume:   vol,
			priority: pri,
			KeepMount: arvados.KeepMount{
				UUID:           uuid,
				DeviceID:       vol.DeviceID(),
				AllowWrite:     !va.ReadOnly && !cfgvol.ReadOnly,
				AllowTrash:     !va.ReadOnly && (!cfgvol.ReadOnly || cfgvol.AllowTrashWhenReadOnly),
				Replication:    repl,
				StorageClasses: sc,
			},
		}
		ks.mounts[uuid] = mnt
		ks.logger.Printf("started volume %s (%s), AllowWrite=%v, AllowTrash=%v", uuid, vol.DeviceID(), mnt.AllowWrite, mnt.AllowTrash)
	}
	if len(ks.mounts) == 0 {
		return fmt.Errorf("no volumes configured for %s", ks.serviceURL)
	}

	ks.mountsR = nil
	ks.mountsW = nil
	for _, mnt := range ks.mounts {
		ks.mountsR = append(ks.mountsR, mnt)
		if mnt.AllowWrite {
			ks.mountsW = append(ks.mountsW, mnt)
		}
	}
	// Sorting mounts by UUID makes behavior more predictable, and
	// is convenient for testing -- for example, "index all
	// volumes" and "trash block on all volumes" will visit
	// volumes in predictable order.
	sort.Slice(ks.mountsR, func(i, j int) bool { return ks.mountsR[i].UUID < ks.mountsR[j].UUID })
	sort.Slice(ks.mountsW, func(i, j int) bool { return ks.mountsW[i].UUID < ks.mountsW[j].UUID })
	return nil
}
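
// For reference, the Volumes config consumed above looks roughly like
// this in the cluster config file (illustrative UUID and values):
//
//	Volumes:
//	  zzzzz-nyw5e-000000000000000:
//	    Driver: Directory
//	    Replication: 1
//	    ReadOnly: false
//	    StorageClasses:
//	      default: true
//	    AccessViaHosts:
//	      "http://keep0.example.com:25107": {}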

// checkLocatorSignature checks that locator has a valid signature.
// If the BlobSigning config is false, it returns nil even if the
// signature is invalid or missing.
func (ks *keepstore) checkLocatorSignature(ctx context.Context, locator string) error {
	if !ks.cluster.Collections.BlobSigning {
		return nil
	}
	token := ctxToken(ctx)
	if token == "" {
		return errNoTokenProvided
	}
	err := arvados.VerifySignature(locator, token, ks.cluster.Collections.BlobSigningTTL.Duration(), []byte(ks.cluster.Collections.BlobSigningKey))
	if err == arvados.ErrSignatureExpired {
		return errExpiredSignature
	} else if err != nil {
		return errInvalidSignature
	}
	return nil
}
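
// For reference, a signed locator as verified above looks like this
// (illustrative placeholder values):
//
//	acbd18db4cc2f85cedef654fccc4a4d8+3+A<40-hex-signature>@<hex-expiry-timestamp>
//
// i.e. the md5 hash, a "+size" part, and a "+A" hint carrying the
// signature and its expiry time.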

// signLocator signs the locator for the given token, if possible.
// Note this signs if the BlobSigningKey config is available, even if
// the BlobSigning config is false.
func (ks *keepstore) signLocator(token, locator string) string {
	if token == "" || len(ks.cluster.Collections.BlobSigningKey) == 0 {
		return locator
	}
	ttl := ks.cluster.Collections.BlobSigningTTL.Duration()
	return arvados.SignLocator(locator, token, time.Now().Add(ttl), ttl, []byte(ks.cluster.Collections.BlobSigningKey))
}
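
// BlockWrite (below) uses signLocator to sign the "hash+size" locator
// it returns after a successful write, and blockReadRemote passes
// that signed locator to opts.LocalLocator after saving a local copy
// of a block fetched from a remote cluster.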

// BlockRead reads the block indicated by opts.Locator from one of the
// volumes where it is stored, and writes it to opts.WriteTo.
func (ks *keepstore) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (n int, err error) {
	li, err := getLocatorInfo(opts.Locator)
	if err != nil {
		return 0, err
	}
	if opts.CheckCacheOnly {
		return 0, arvados.ErrNotCached
	}
	out := opts.WriteTo
	if rw, ok := out.(http.ResponseWriter); ok && li.size > 0 {
		out = &setSizeOnWrite{ResponseWriter: rw, size: li.size}
	}
	if li.remote && !li.signed {
		return ks.blockReadRemote(ctx, opts)
	}
	if err := ks.checkLocatorSignature(ctx, opts.Locator); err != nil {
		return 0, err
	}
	hashcheck := md5.New()
	if li.size > 0 {
		out = newHashCheckWriter(out, hashcheck, int64(li.size), li.hash)
	} else {
		out = io.MultiWriter(out, hashcheck)
	}

	buf, err := ks.bufferPool.GetContext(ctx)
	if err != nil {
		return 0, err
	}
	defer ks.bufferPool.Put(buf)
	streamer := newStreamWriterAt(out, 65536, buf)
	defer streamer.Close()

	var errToCaller error = os.ErrNotExist
	for _, mnt := range ks.rendezvous(li.hash, ks.mountsR) {
		if ctx.Err() != nil {
			return 0, ctx.Err()
		}
		err := mnt.BlockRead(ctx, li.hash, streamer)
		if err != nil {
			if streamer.WroteAt() != 0 {
				// BlockRead encountered an error
				// after writing some data, so it's
				// too late to try another
				// volume. Flush streamer before
				// calling Wrote() to ensure our
				// return value accurately reflects
				// the number of bytes written to
				// opts.WriteTo.
				streamer.Close()
				return streamer.Wrote(), err
			}
			if !os.IsNotExist(err) {
				errToCaller = err
			}
			continue
		}
		if li.size == 0 {
			// hashCheckWriter isn't in use because we
			// don't know the expected size. All we can do
			// is check after writing all the data, and
			// trust the caller is doing a HEAD request so
			// it's not too late to set an error code in
			// the response header.
			err = streamer.Close()
			if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash && err == nil {
				err = errChecksum
			}
			if rw, ok := opts.WriteTo.(http.ResponseWriter); ok {
				// We didn't set the content-length header
				// above because we didn't know the block size
				// until now.
				rw.Header().Set("Content-Length", fmt.Sprintf("%d", streamer.WroteAt()))
			}
			return streamer.WroteAt(), err
		} else if streamer.WroteAt() != li.size {
			// If the backend read fewer bytes than
			// expected but returns no error, we can
			// classify this as a checksum error (even
			// though hashCheckWriter doesn't know that
			// yet, it's just waiting for the next
			// write). If our caller is serving a GET
			// request it's too late to do anything about
			// it anyway, but if it's a HEAD request the
			// caller can still change the response status
			// code.
			return streamer.WroteAt(), errChecksum
		}
		// Ensure streamer flushes all buffered data without
		// errors.
		err = streamer.Close()
		return streamer.Wrote(), err
	}
	return 0, errToCaller
}

// blockReadRemote handles a locator with a "+R" hint: it reads the
// block from the indicated remote cluster, and optionally saves a
// local copy so it can report a local signed locator to the caller.
func (ks *keepstore) blockReadRemote(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
	token := ctxToken(ctx)
	if token == "" {
		return 0, errNoTokenProvided
	}
	var remoteClient *keepclient.KeepClient
	var parts []string
	li, err := getLocatorInfo(opts.Locator)
	if err != nil {
		return 0, err
	}
	for i, part := range strings.Split(opts.Locator, "+") {
		switch {
		case i == 0:
			// don't try to parse hash part as hint
		case strings.HasPrefix(part, "A"):
			// drop local permission hint
			continue
		case len(part) > 7 && part[0] == 'R' && part[6] == '-':
			remoteID := part[1:6]
			remote, ok := ks.cluster.RemoteClusters[remoteID]
			if !ok {
				return 0, httpserver.ErrorWithStatus(errors.New("remote cluster not configured"), http.StatusBadRequest)
			}
			kc, err := ks.remoteClient(remoteID, remote, token)
			if err == auth.ErrObsoleteToken {
				return 0, httpserver.ErrorWithStatus(err, http.StatusBadRequest)
			} else if err != nil {
				return 0, err
			}
			remoteClient = kc
			part = "A" + part[7:]
		}
		parts = append(parts, part)
	}
	if remoteClient == nil {
		return 0, httpserver.ErrorWithStatus(errors.New("invalid remote hint"), http.StatusBadRequest)
	}
	locator := strings.Join(parts, "+")
	if opts.LocalLocator == nil {
		// Read from remote cluster and stream response back
		// to caller
		if rw, ok := opts.WriteTo.(http.ResponseWriter); ok && li.size > 0 {
			rw.Header().Set("Content-Length", fmt.Sprintf("%d", li.size))
		}
		return remoteClient.BlockRead(ctx, arvados.BlockReadOptions{
			Locator: locator,
			WriteTo: opts.WriteTo,
		})
	}
	// We must call LocalLocator before writing any data to
	// opts.WriteTo, otherwise the caller can't put the local
	// locator in a response header. So we copy into memory,
	// generate the local signature, then copy from memory to
	// opts.WriteTo.
	buf, err := ks.bufferPool.GetContext(ctx)
	if err != nil {
		return 0, err
	}
	defer ks.bufferPool.Put(buf)
	writebuf := bytes.NewBuffer(buf[:0])
	ks.logger.Infof("blockReadRemote(%s): remote read(%s)", opts.Locator, locator)
	_, err = remoteClient.BlockRead(ctx, arvados.BlockReadOptions{
		Locator: locator,
		WriteTo: writebuf,
	})
	if err != nil {
		return 0, err
	}
	resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
		Hash: li.hash,
		Data: writebuf.Bytes(),
	})
	if err != nil {
		return 0, err
	}
	opts.LocalLocator(resp.Locator)
	if rw, ok := opts.WriteTo.(http.ResponseWriter); ok {
		rw.Header().Set("Content-Length", fmt.Sprintf("%d", writebuf.Len()))
	}
	n, err := io.Copy(opts.WriteTo, bytes.NewReader(writebuf.Bytes()))
	return int(n), err
}

// remoteClient returns a KeepClient for the given remote cluster,
// with the caller's token salted for that cluster.
func (ks *keepstore) remoteClient(remoteID string, remoteCluster arvados.RemoteCluster, token string) (*keepclient.KeepClient, error) {
	ks.remoteClientsMtx.Lock()
	kc, ok := ks.remoteClients[remoteID]
	ks.remoteClientsMtx.Unlock()
	if !ok {
		c := &arvados.Client{
			APIHost:  remoteCluster.Host,
			Insecure: remoteCluster.Insecure,
		}
		ac, err := arvadosclient.New(c)
		if err != nil {
			return nil, err
		}
		kc, err = keepclient.MakeKeepClient(ac)
		if err != nil {
			return nil, err
		}
		kc.DiskCacheSize = keepclient.DiskCacheDisabled

		ks.remoteClientsMtx.Lock()
		ks.remoteClients[remoteID] = kc
		ks.remoteClientsMtx.Unlock()
	}
	// Return a copy of the cached client, using the caller's
	// token salted for the remote cluster.
	accopy := *kc.Arvados
	accopy.ApiToken = token
	kccopy := kc.Clone()
	kccopy.Arvados = &accopy
	token, err := auth.SaltToken(token, remoteID)
	if err != nil {
		return nil, err
	}
	kccopy.Arvados.ApiToken = token
	return kccopy, nil
}

// BlockWrite writes a block to one or more volumes. If opts.Data is
// nil, the block content is read from opts.Reader instead.
func (ks *keepstore) BlockWrite(ctx context.Context, opts arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
	var resp arvados.BlockWriteResponse
	var hash string
	if opts.Data == nil {
		buf, err := ks.bufferPool.GetContext(ctx)
		if err != nil {
			return resp, err
		}
		defer ks.bufferPool.Put(buf)
		w := bytes.NewBuffer(buf[:0])
		h := md5.New()
		limitedReader := &io.LimitedReader{R: opts.Reader, N: BlockSize}
		n, err := io.Copy(io.MultiWriter(w, h), limitedReader)
		if err != nil {
			return resp, err
		}
		if limitedReader.N == 0 {
			// Data size is either exactly BlockSize, or too big.
			n, err := opts.Reader.Read(make([]byte, 1))
			if n > 0 {
				return resp, httpserver.ErrorWithStatus(err, http.StatusRequestEntityTooLarge)
			}
			if err != io.EOF {
				return resp, err
			}
		}
		opts.Data = buf[:n]
		if opts.DataSize != 0 && int(n) != opts.DataSize {
			return resp, httpserver.ErrorWithStatus(fmt.Errorf("content length %d did not match specified data size %d", n, opts.DataSize), http.StatusBadRequest)
		}
		hash = fmt.Sprintf("%x", h.Sum(nil))
	} else {
		hash = fmt.Sprintf("%x", md5.Sum(opts.Data))
	}
	if opts.Hash != "" && !strings.HasPrefix(opts.Hash, hash) {
		return resp, httpserver.ErrorWithStatus(fmt.Errorf("content hash %s did not match specified locator %s", hash, opts.Hash), http.StatusBadRequest)
	}
	rvzmounts := ks.rendezvous(hash, ks.mountsW)
	result := newPutProgress(opts.StorageClasses)
	// If we already have replicas of this block on some of these
	// mounts, refresh their timestamps and count them toward the
	// result.
	for _, mnt := range rvzmounts {
		if !result.Want(mnt) {
			continue
		}
		cmp := &checkEqual{Expect: opts.Data}
		if err := mnt.BlockRead(ctx, hash, cmp); err == nil {
			if !cmp.Equal() {
				return resp, errCollision
			}
			err := mnt.BlockTouch(hash)
			if err == nil {
				result.Add(mnt)
			}
		}
	}
	var allFull atomic.Bool
	allFull.Store(true)
	// pending tracks what result will be if all outstanding
	// writes succeed.
	pending := result.Copy()
	cond := sync.NewCond(new(sync.Mutex))
	cond.L.Lock()
	var wg sync.WaitGroup
	// Write to the remaining mounts in rendezvous order, stopping
	// when the requested storage classes are satisfied.
nextmnt:
	for _, mnt := range rvzmounts {
		for {
			if result.Done() || ctx.Err() != nil {
				break nextmnt
			}
			if !result.Want(mnt) {
				continue nextmnt
			}
			if pending.Want(mnt) {
				break
			}
			// This mount might not be needed, depending
			// on the outcome of pending writes. Wait for
			// a pending write to finish, then check
			// again.
			cond.Wait()
		}
		mnt := mnt
		logger := ks.logger.WithField("mount", mnt.UUID)
		pending.Add(mnt)
		wg.Add(1)
		go func() {
			defer wg.Done()
			logger.Debug("start write")
			err := mnt.BlockWrite(ctx, hash, opts.Data)
			cond.L.Lock()
			defer cond.L.Unlock()
			defer cond.Broadcast()
			if err != nil {
				logger.Debug("write failed")
				pending.Sub(mnt)
				if err != errFull {
					allFull.Store(false)
				}
			} else {
				result.Add(mnt)
			}
		}()
	}
	cond.L.Unlock()
	wg.Wait()
	if ctx.Err() != nil {
		return resp, ctx.Err()
	}
	if result.Done() || result.totalReplication > 0 {
		resp = arvados.BlockWriteResponse{
			Locator:        ks.signLocator(ctxToken(ctx), fmt.Sprintf("%s+%d", hash, len(opts.Data))),
			Replicas:       result.totalReplication,
			StorageClasses: result.classDone,
		}
		return resp, nil
	}
	if allFull.Load() {
		return resp, errFull
	}
	return resp, errVolumeUnavailable
}

// rendezvous sorts the given mounts by descending priority, then by
// rendezvous order for the given locator.
func (*keepstore) rendezvous(locator string, mnts []*mount) []*mount {
	hash := locator
	if len(hash) > 32 {
		hash = hash[:32]
	}
	// copy the provided []*mount before doing an in-place sort
	mnts = append([]*mount(nil), mnts...)
	weight := make(map[*mount]string)
	for _, mnt := range mnts {
		uuidpart := mnt.UUID
		if len(uuidpart) == 27 {
			// strip zzzzz-yyyyy- prefixes
			uuidpart = uuidpart[12:]
		}
		weight[mnt] = fmt.Sprintf("%x", md5.Sum([]byte(hash+uuidpart)))
	}
	sort.Slice(mnts, func(i, j int) bool {
		if p := mnts[i].priority - mnts[j].priority; p != 0 {
			return p > 0
		}
		return weight[mnts[i]] < weight[mnts[j]]
	})
	return mnts
}
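
// In other words, for a given block each mount gets a stable
// pseudo-random weight (md5 of hash+mountUUID), and every reader and
// writer visits mounts in the same weight order within each priority
// tier. For example (illustrative), mounts A, B, C at equal priority
// might be ordered [B, A, C] for one block and [C, B, A] for another,
// but every keepstore process computes the same order for the same
// block.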

// checkEqual reports whether the data written to it (via io.WriterAt
// interface) is equal to the expected data.
//
// Expect should not be changed after the first Write.
//
// Results are undefined if WriteAt is called with overlapping ranges.
type checkEqual struct {
	Expect   []byte
	equal    atomic.Int64
	notequal atomic.Bool
}

func (ce *checkEqual) Equal() bool {
	return !ce.notequal.Load() && ce.equal.Load() == int64(len(ce.Expect))
}

func (ce *checkEqual) WriteAt(p []byte, offset int64) (int, error) {
	endpos := int(offset) + len(p)
	if offset >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[int(offset):endpos]) {
		ce.equal.Add(int64(len(p)))
	} else {
		ce.notequal.Store(true)
	}
	return len(p), nil
}

// BlockUntrash attempts to un-trash the indicated block on all
// writable volumes.
func (ks *keepstore) BlockUntrash(ctx context.Context, locator string) error {
	li, err := getLocatorInfo(locator)
	if err != nil {
		return err
	}
	var errToCaller error = os.ErrNotExist
	for _, mnt := range ks.mountsW {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		err := mnt.BlockUntrash(li.hash)
		if err == nil {
			errToCaller = nil
		} else if !os.IsNotExist(err) && errToCaller != nil {
			errToCaller = err
		}
	}
	return errToCaller
}

// BlockTouch updates the timestamp of the indicated block on a
// writable volume that has it.
func (ks *keepstore) BlockTouch(ctx context.Context, locator string) error {
	li, err := getLocatorInfo(locator)
	if err != nil {
		return err
	}
	var errToCaller error = os.ErrNotExist
	for _, mnt := range ks.mountsW {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		err := mnt.BlockTouch(li.hash)
		if err == nil {
			return nil
		}
		if !os.IsNotExist(err) {
			errToCaller = err
		}
	}
	return errToCaller
}

// BlockTrash moves the indicated block to trash on all mounts that
// allow trashing, provided its timestamp is older than BlobSigningTTL.
func (ks *keepstore) BlockTrash(ctx context.Context, locator string) error {
	if !ks.cluster.Collections.BlobTrash {
		return errMethodNotAllowed
	}
	li, err := getLocatorInfo(locator)
	if err != nil {
		return err
	}
	var errToCaller error = os.ErrNotExist
	for _, mnt := range ks.mounts {
		if !mnt.AllowTrash {
			continue
		}
		if ctx.Err() != nil {
			return ctx.Err()
		}
		t, err := mnt.Mtime(li.hash)
		if err == nil && time.Now().Sub(t) > ks.cluster.Collections.BlobSigningTTL.Duration() {
			err = mnt.BlockTrash(li.hash)
		}
		if os.IsNotExist(errToCaller) || (errToCaller == nil && !os.IsNotExist(err)) {
			errToCaller = err
		}
	}
	return errToCaller
}

// Mounts returns the keepstore's attached volumes/mounts.
func (ks *keepstore) Mounts() []*mount {
	return ks.mountsR
}

// Index writes an index of blocks on the mount indicated by
// opts.MountUUID (or all mounts, if empty) to opts.WriteTo.
func (ks *keepstore) Index(ctx context.Context, opts indexOptions) error {
	mounts := ks.mountsR
	if opts.MountUUID != "" {
		mnt, ok := ks.mounts[opts.MountUUID]
		if !ok {
			return os.ErrNotExist
		}
		mounts = []*mount{mnt}
	}
	for _, mnt := range mounts {
		err := mnt.Index(ctx, opts.Prefix, opts.WriteTo)
		if err != nil {
			return err
		}
	}
	return nil
}

// ctxToken returns the first API token attached to ctx (see
// auth.FromContext), or "" if there is none.
func ctxToken(ctx context.Context) string {
	if c, ok := auth.FromContext(ctx); ok && len(c.Tokens) > 0 {
		return c.Tokens[0]
	} else {
		return ""
	}
}

// locatorInfo expresses the attributes of a locator that are relevant
// for keepstore decision-making.
type locatorInfo struct {
	hash   string
	size   int
	remote bool // locator has a +R hint
	signed bool // locator has a +A hint
}
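
// For example (illustrative values only):
//
//	"acbd18db4cc2f85cedef654fccc4a4d8+3"                     hash, size
//	"acbd18db4cc2f85cedef654fccc4a4d8+3+A<sig>@<expiry>"     ...and signed
//	"acbd18db4cc2f85cedef654fccc4a4d8+3+Rzzzzz-<sig>"        ...and remote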

// getLocatorInfo parses the given locator string. It does not verify
// any signature the locator may carry.
func getLocatorInfo(loc string) (locatorInfo, error) {
	var li locatorInfo
	plus := 0    // number of '+' chars seen so far
	partlen := 0 // chars since last '+'
	for i, c := range loc + "+" {
		if c == '+' {
			if partlen == 0 {
				// double/leading/trailing '+'
				return li, errInvalidLocator
			}
			if plus == 0 {
				if i != 32 {
					return li, errInvalidLocator
				}
				li.hash = loc[:i]
			}
			if plus == 1 {
				if size, err := strconv.Atoi(loc[i-partlen : i]); err == nil {
					li.size = size
				}
			}
			plus++
			partlen = 0
			continue
		}
		partlen++
		if partlen == 1 {
			if c == 'A' {
				li.signed = true
			}
			if c == 'R' {
				li.remote = true
			}
			if plus > 1 && c >= '0' && c <= '9' {
				// size, if present at all, must come first
				return li, errInvalidLocator
			}
		}
		if plus == 0 && !((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f')) {
			// non-hexadecimal char in hash part
			return li, errInvalidLocator