// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0
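
// Package keepstore implements the keepstore service, which stores
// and retrieves content-addressed data blocks ("Keep blocks") on the
// volumes configured for this server.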
23 "git.arvados.org/arvados.git/sdk/go/arvados"
24 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
25 "git.arvados.org/arvados.git/sdk/go/auth"
26 "git.arvados.org/arvados.git/sdk/go/ctxlog"
27 "git.arvados.org/arvados.git/sdk/go/httpserver"
28 "git.arvados.org/arvados.git/sdk/go/keepclient"
29 "github.com/prometheus/client_golang/prometheus"
30 "github.com/sirupsen/logrus"
// BlockSize is the maximum size of a Keep block: 64 MiB.
const BlockSize = 1 << 26

var (
	errChecksum          = httpserver.ErrorWithStatus(errors.New("checksum mismatch in stored data"), http.StatusBadGateway)
	errNoTokenProvided   = httpserver.ErrorWithStatus(errors.New("no token provided in Authorization header"), http.StatusUnauthorized)
	errMethodNotAllowed  = httpserver.ErrorWithStatus(errors.New("method not allowed"), http.StatusMethodNotAllowed)
	errVolumeUnavailable = httpserver.ErrorWithStatus(errors.New("volume unavailable"), http.StatusServiceUnavailable)
	errCollision         = httpserver.ErrorWithStatus(errors.New("hash collision"), http.StatusInternalServerError)
	errExpiredSignature  = httpserver.ErrorWithStatus(errors.New("expired signature"), http.StatusUnauthorized)
	errInvalidSignature  = httpserver.ErrorWithStatus(errors.New("invalid signature"), http.StatusBadRequest)
	errInvalidLocator    = httpserver.ErrorWithStatus(errors.New("invalid locator"), http.StatusBadRequest)
	errFull              = httpserver.ErrorWithStatus(errors.New("insufficient storage"), http.StatusInsufficientStorage)
	errTooLarge          = httpserver.ErrorWithStatus(errors.New("request entity too large"), http.StatusRequestEntityTooLarge)
	driver               = make(map[string]volumeDriver)
)

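// IndexOptions specifies the scope of an Index listing: restrict it
// to a single mount (MountUUID) and/or a block hash prefix, and say
// where the listing should be written.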
type IndexOptions struct {
	MountUUID string
	Prefix    string
	WriteTo   io.Writer
}

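// keepstore holds the state of a keepstore server: the cluster
// configuration, the set of mounted volumes, the shared buffer pool,
// and cached clients for remote clusters.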
type keepstore struct {
	cluster    *arvados.Cluster
	logger     logrus.FieldLogger
	serviceURL arvados.URL
	mounts     map[string]*mount
	mountsR    []*mount
	mountsW    []*mount
	bufferPool *bufferPool

	iostats map[volume]*ioStats

	remoteClients    map[string]*keepclient.KeepClient
	remoteClientsMtx sync.Mutex
}

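// newKeepstore returns a keepstore server for the volumes configured
// for serviceURL, after sanity-checking the buffer and blob-signing
// configuration.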
func newKeepstore(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry, serviceURL arvados.URL) (*keepstore, error) {
	logger := ctxlog.FromContext(ctx)

	if cluster.API.MaxConcurrentRequests > 0 && cluster.API.MaxConcurrentRequests < cluster.API.MaxKeepBlobBuffers {
		logger.Warnf("Possible configuration mistake: not useful to set API.MaxKeepBlobBuffers (%d) higher than API.MaxConcurrentRequests (%d)", cluster.API.MaxKeepBlobBuffers, cluster.API.MaxConcurrentRequests)
	}

	if cluster.Collections.BlobSigningKey != "" {
	} else if cluster.Collections.BlobSigning {
		return nil, errors.New("cannot enable Collections.BlobSigning with no Collections.BlobSigningKey")
	} else {
		logger.Warn("Running without a blob signing key. Block locators returned by this server will not be signed, and will be rejected by a server that enforces permissions. To fix this, configure Collections.BlobSigning and Collections.BlobSigningKey.")
	}

	if cluster.API.MaxKeepBlobBuffers <= 0 {
		return nil, fmt.Errorf("API.MaxKeepBlobBuffers must be greater than zero")
	}
	bufferPool := newBufferPool(logger, cluster.API.MaxKeepBlobBuffers, reg)

	ks := &keepstore{
		cluster:       cluster,
		logger:        logger,
		serviceURL:    serviceURL,
		bufferPool:    bufferPool,
		remoteClients: make(map[string]*keepclient.KeepClient),
	}

	err := ks.setupMounts(newVolumeMetricsVecs(reg))
	if err != nil {
		return nil, err
	}
	return ks, nil
}

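// setupMounts initializes ks.mounts (all mounts accessible via this
// server), ks.mountsR (readable mounts), and ks.mountsW (writable
// mounts) from the cluster volume configuration.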
func (ks *keepstore) setupMounts(metrics *volumeMetricsVecs) error {
	ks.mounts = make(map[string]*mount)
	if len(ks.cluster.Volumes) == 0 {
		return errors.New("no volumes configured")
	}
	for uuid, cfgvol := range ks.cluster.Volumes {
		va, ok := cfgvol.AccessViaHosts[ks.serviceURL]
		if !ok && len(cfgvol.AccessViaHosts) > 0 {
			continue
		}
		dri, ok := driver[cfgvol.Driver]
		if !ok {
			return fmt.Errorf("volume %s: invalid driver %q", uuid, cfgvol.Driver)
		}
		vol, err := dri(newVolumeParams{
			UUID:         uuid,
			Cluster:      ks.cluster,
			ConfigVolume: cfgvol,
			Logger:       ks.logger,
			MetricsVecs:  metrics,
			BufferPool:   ks.bufferPool,
		})
		if err != nil {
			return fmt.Errorf("error initializing volume %s: %s", uuid, err)
		}
		sc := cfgvol.StorageClasses
		if len(sc) == 0 {
			sc = map[string]bool{"default": true}
		}
		repl := cfgvol.Replication
		if repl < 1 {
			repl = 1
		}
		pri := 0
		for class, in := range cfgvol.StorageClasses {
			p := ks.cluster.StorageClasses[class].Priority
			if in && p > pri {
				pri = p
			}
		}
		mnt := &mount{
			volume:   vol,
			priority: pri,
			KeepMount: arvados.KeepMount{
				UUID:           uuid,
				DeviceID:       vol.DeviceID(),
				AllowWrite:     !va.ReadOnly && !cfgvol.ReadOnly,
				AllowTrash:     !va.ReadOnly && (!cfgvol.ReadOnly || cfgvol.AllowTrashWhenReadOnly),
				Replication:    repl,
				StorageClasses: sc,
			},
		}
		ks.mounts[uuid] = mnt
		ks.logger.Printf("started volume %s (%s), AllowWrite=%v, AllowTrash=%v", uuid, vol.DeviceID(), mnt.AllowWrite, mnt.AllowTrash)
	}
	if len(ks.mounts) == 0 {
		return fmt.Errorf("no volumes configured for %s", ks.serviceURL)
	}

	ks.mountsR = nil
	ks.mountsW = nil
	for _, mnt := range ks.mounts {
		ks.mountsR = append(ks.mountsR, mnt)
		if mnt.AllowWrite {
			ks.mountsW = append(ks.mountsW, mnt)
		}
	}
	// Sorting mounts by UUID makes behavior more predictable, and
	// is convenient for testing -- for example, "index all
	// volumes" and "trash block on all volumes" will visit
	// volumes in predictable order.
	sort.Slice(ks.mountsR, func(i, j int) bool { return ks.mountsR[i].UUID < ks.mountsR[j].UUID })
	sort.Slice(ks.mountsW, func(i, j int) bool { return ks.mountsW[i].UUID < ks.mountsW[j].UUID })
	return nil
}

// checkLocatorSignature checks that locator has a valid signature.
// If the BlobSigning config is false, it returns nil even if the
// signature is invalid or missing.
func (ks *keepstore) checkLocatorSignature(ctx context.Context, locator string) error {
	if !ks.cluster.Collections.BlobSigning {
		return nil
	}
	token := ctxToken(ctx)
	if token == "" {
		return errNoTokenProvided
	}
	err := arvados.VerifySignature(locator, token, ks.cluster.Collections.BlobSigningTTL.Duration(), []byte(ks.cluster.Collections.BlobSigningKey))
	if err == arvados.ErrSignatureExpired {
		return errExpiredSignature
	} else if err != nil {
		return errInvalidSignature
	}
	return nil
}

// signLocator signs the locator for the given token, if possible.
// Note this signs if the BlobSigningKey config is available, even if
// the BlobSigning config is false.
func (ks *keepstore) signLocator(token, locator string) string {
	if token == "" || len(ks.cluster.Collections.BlobSigningKey) == 0 {
		return locator
	}
	ttl := ks.cluster.Collections.BlobSigningTTL.Duration()
	return arvados.SignLocator(locator, token, time.Now().Add(ttl), ttl, []byte(ks.cluster.Collections.BlobSigningKey))
}

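// BlockRead reads the block identified by opts.Locator and writes
// the data to opts.WriteTo, checking the locator's signature (if
// required) and verifying the MD5 content hash. Readable volumes are
// tried in rendezvous order until one succeeds.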
func (ks *keepstore) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (n int, err error) {
	li, err := parseLocator(opts.Locator)
	if err != nil {
		return 0, err
	}
	out := opts.WriteTo
	if rw, ok := out.(http.ResponseWriter); ok && li.size > 0 {
		out = &setSizeOnWrite{ResponseWriter: rw, size: li.size}
	}
	if li.remote && !li.signed {
		return ks.blockReadRemote(ctx, opts)
	}
	if err := ks.checkLocatorSignature(ctx, opts.Locator); err != nil {
		return 0, err
	}
	hashcheck := md5.New()
	if li.size > 0 {
		out = newHashCheckWriter(out, hashcheck, int64(li.size), li.hash)
	} else {
		out = io.MultiWriter(out, hashcheck)
	}

	var errToCaller error = os.ErrNotExist
	for _, mnt := range ks.rendezvous(li.hash, ks.mountsR) {
		if ctx.Err() != nil {
			return 0, ctx.Err()
		}
		n, err = mnt.BlockRead(ctx, li.hash, out)
		if err == nil && li.size > 0 && n != li.size {
			// If the backend read fewer bytes than
			// expected but returns no error, we can
			// classify this as a checksum error (even
			// though hashCheckWriter doesn't know that
			// yet, it's just waiting for the next
			// write). If our caller is serving a GET
			// request it's too late to do anything about
			// it anyway, but if it's a HEAD request the
			// caller can still change the response status
			// code.
			return n, errChecksum
		}
		if err == nil && li.size == 0 {
			// hashCheckWriter isn't in use because we
			// don't know the expected size. All we can do
			// is check after writing all the data, and
			// trust the caller is doing a HEAD request so
			// it's not too late to set an error code in
			// the response header.
			if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash {
				return n, errChecksum
			}
		}
		if rw, ok := opts.WriteTo.(http.ResponseWriter); ok && li.size == 0 && err == nil {
			// We didn't set the content-length header
			// above because we didn't know the block size
			// until now.
			rw.Header().Set("Content-Length", fmt.Sprintf("%d", n))
		}
		if n > 0 || err == nil {
			// success, or there's an error but we can't
			// retry because we've already sent some data.
			return n, err
		}
		if !os.IsNotExist(err) {
			// If some volume returns a transient error,
			// return it to the caller instead of "Not
			// found" so it can retry.
			errToCaller = err
		}
	}
	return 0, errToCaller
}

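// blockReadRemote handles a locator with a "+R" remote cluster hint:
// it reads the block from the indicated remote cluster, and, if the
// caller supplied an opts.LocalLocator callback, stores the block
// locally first so a locally signed locator can be reported back.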
func (ks *keepstore) blockReadRemote(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
	ks.logger.Infof("blockReadRemote(%s)", opts.Locator)
	token := ctxToken(ctx)
	if token == "" {
		return 0, errNoTokenProvided
	}
	var remoteClient *keepclient.KeepClient
	var parts []string
	var size int
	for i, part := range strings.Split(opts.Locator, "+") {
		switch {
		case i == 0:
			// don't try to parse hash part as hint
		case strings.HasPrefix(part, "A"):
			// drop local permission hint
			continue
		case len(part) > 7 && part[0] == 'R' && part[6] == '-':
			remoteID := part[1:6]
			remote, ok := ks.cluster.RemoteClusters[remoteID]
			if !ok {
				return 0, httpserver.ErrorWithStatus(errors.New("remote cluster not configured"), http.StatusBadRequest)
			}
			kc, err := ks.remoteClient(remoteID, remote, token)
			if err == auth.ErrObsoleteToken {
				return 0, httpserver.ErrorWithStatus(err, http.StatusBadRequest)
			} else if err != nil {
				return 0, err
			}
			remoteClient = kc
			part = "A" + part[7:]
		case len(part) > 0 && part[0] >= '0' && part[0] <= '9':
			size, _ = strconv.Atoi(part)
		}
		parts = append(parts, part)
	}
	if remoteClient == nil {
		return 0, httpserver.ErrorWithStatus(errors.New("invalid remote hint"), http.StatusBadRequest)
	}
	locator := strings.Join(parts, "+")
	if opts.LocalLocator == nil {
		// Read from remote cluster and stream response back
		// to caller
		if rw, ok := opts.WriteTo.(http.ResponseWriter); ok && size > 0 {
			rw.Header().Set("Content-Length", fmt.Sprintf("%d", size))
		}
		return remoteClient.BlockRead(ctx, arvados.BlockReadOptions{
			Locator: locator,
			WriteTo: opts.WriteTo,
		})
	}
	// We must call LocalLocator before writing any data to
	// opts.WriteTo, otherwise the caller can't put the local
	// locator in a response header. So we copy into memory,
	// generate the local signature, then copy from memory to
	// opts.WriteTo.
	buf, err := ks.bufferPool.GetContext(ctx)
	if err != nil {
		return 0, err
	}
	defer ks.bufferPool.Put(buf)
	writebuf := bytes.NewBuffer(buf[:0])
	ks.logger.Infof("blockReadRemote(%s): remote read(%s)", opts.Locator, locator)
	_, err = remoteClient.BlockRead(ctx, arvados.BlockReadOptions{
		Locator: locator,
		WriteTo: writebuf,
	})
	if err != nil {
		return 0, err
	}
	resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
		Hash: locator,
		Data: writebuf.Bytes(),
	})
	if err != nil {
		return 0, err
	}
	opts.LocalLocator(resp.Locator)
	if rw, ok := opts.WriteTo.(http.ResponseWriter); ok {
		rw.Header().Set("Content-Length", fmt.Sprintf("%d", writebuf.Len()))
	}
	n, err := io.Copy(opts.WriteTo, bytes.NewReader(writebuf.Bytes()))
	return int(n), err
}

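// remoteClient returns a KeepClient for the given remote cluster,
// cached per cluster, with a copy of its Arvados client that uses
// the caller's token salted for that cluster.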
func (ks *keepstore) remoteClient(remoteID string, remoteCluster arvados.RemoteCluster, token string) (*keepclient.KeepClient, error) {
	ks.remoteClientsMtx.Lock()
	kc, ok := ks.remoteClients[remoteID]
	ks.remoteClientsMtx.Unlock()
	if !ok {
		c := &arvados.Client{
			APIHost:   remoteCluster.Host,
			AuthToken: "xxx", // placeholder; replaced per-request below
			Insecure:  remoteCluster.Insecure,
		}
		ac, err := arvadosclient.New(c)
		if err != nil {
			return nil, err
		}
		kc, err = keepclient.MakeKeepClient(ac)
		if err != nil {
			return nil, err
		}
		kc.DiskCacheSize = keepclient.DiskCacheDisabled

		ks.remoteClientsMtx.Lock()
		ks.remoteClients[remoteID] = kc
		ks.remoteClientsMtx.Unlock()
	}
	accopy := *kc.Arvados
	accopy.ApiToken = token
	kccopy := kc.Clone()
	kccopy.Arvados = &accopy
	token, err := auth.SaltToken(token, remoteID)
	if err != nil {
		return nil, err
	}
	kccopy.Arvados.ApiToken = token
	return kccopy, nil
}

// BlockWrite writes a block to one or more volumes.
func (ks *keepstore) BlockWrite(ctx context.Context, opts arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
	var resp arvados.BlockWriteResponse
	var hash string
	if opts.Data == nil {
		buf, err := ks.bufferPool.GetContext(ctx)
		if err != nil {
			return resp, err
		}
		defer ks.bufferPool.Put(buf)
		w := bytes.NewBuffer(buf[:0])
		h := md5.New()
		limitedReader := &io.LimitedReader{R: opts.Reader, N: BlockSize}
		n, err := io.Copy(io.MultiWriter(w, h), limitedReader)
		if err != nil {
			return resp, err
		}
		if limitedReader.N == 0 {
			// Data size is either exactly BlockSize, or too big.
			n, err := opts.Reader.Read(make([]byte, 1))
			if n > 0 {
				// More data follows: the block is too big.
				return resp, errTooLarge
			}
			if err != io.EOF {
				return resp, err
			}
		}
		opts.Data = buf[:n]
		if opts.DataSize != 0 && int(n) != opts.DataSize {
			return resp, httpserver.ErrorWithStatus(fmt.Errorf("content length %d did not match specified data size %d", n, opts.DataSize), http.StatusBadRequest)
		}
		hash = fmt.Sprintf("%x", h.Sum(nil))
	} else {
		hash = fmt.Sprintf("%x", md5.Sum(opts.Data))
	}
	if opts.Hash != "" && !strings.HasPrefix(opts.Hash, hash) {
		return resp, httpserver.ErrorWithStatus(fmt.Errorf("content hash %s did not match specified locator %s", hash, opts.Hash), http.StatusBadRequest)
	}
	rvzmounts := ks.rendezvous(hash, ks.mountsW)
	result := newPutProgress(opts.StorageClasses)
	for _, mnt := range rvzmounts {
		if !result.Want(mnt) {
			continue
		}
		cmp := &checkEqual{Expect: opts.Data}
		if _, err := mnt.BlockRead(ctx, hash, cmp); err == nil {
			if !cmp.Equal() {
				return resp, errCollision
			}
			err := mnt.BlockTouch(hash)
			if err == nil {
				result.Add(mnt)
			}
		}
	}
	var allFull atomic.Bool
	allFull.Store(true)
	// pending tracks what result will be if all outstanding
	// writes succeed.
	pending := result.Copy()
	cond := sync.NewCond(new(sync.Mutex))
	cond.L.Lock()
	var wg sync.WaitGroup
nextmnt:
	for _, mnt := range rvzmounts {
		for {
			if result.Done() || ctx.Err() != nil {
				break nextmnt
			}
			if !result.Want(mnt) {
				continue nextmnt
			}
			if pending.Want(mnt) {
				break
			}
			// This mount might not be needed, depending
			// on the outcome of pending writes. Wait for
			// a pending write to finish, then check
			// again.
			cond.Wait()
		}
		mnt := mnt
		logger := ks.logger.WithField("mount", mnt.UUID)
		pending.Add(mnt)
		wg.Add(1)
		go func() {
			defer wg.Done()
			logger.Debug("start write")
			err := mnt.BlockWrite(ctx, hash, opts.Data)
			cond.L.Lock()
			defer cond.L.Unlock()
			defer cond.Broadcast()
			if err != nil {
				logger.Debug("write failed")
				pending.Sub(mnt)
				if err != errFull {
					allFull.Store(false)
				}
			} else {
				result.Add(mnt)
				pending.Sub(mnt)
			}
		}()
	}
	cond.L.Unlock()
	wg.Wait()
	if ctx.Err() != nil {
		return resp, ctx.Err()
	}
	if result.Done() || result.totalReplication > 0 {
		resp = arvados.BlockWriteResponse{
			Locator:        ks.signLocator(ctxToken(ctx), fmt.Sprintf("%s+%d", hash, len(opts.Data))),
			Replicas:       result.totalReplication,
			StorageClasses: result.classDone,
		}
		return resp, nil
	}
	if allFull.Load() {
		return resp, errFull
	}
	return resp, errVolumeUnavailable
}

// rendezvous sorts the given mounts by descending priority, then by
// rendezvous order for the given locator.
func (*keepstore) rendezvous(locator string, mnts []*mount) []*mount {
	hash := locator
	if len(hash) > 32 {
		hash = hash[:32]
	}
	// copy the provided []*mount before doing an in-place sort
	mnts = append([]*mount(nil), mnts...)
	weight := make(map[*mount]string)
	for _, mnt := range mnts {
		uuidpart := mnt.UUID
		if len(uuidpart) == 27 {
			// strip zzzzz-yyyyy- prefixes
			uuidpart = uuidpart[12:]
		}
		weight[mnt] = fmt.Sprintf("%x", md5.Sum([]byte(hash+uuidpart)))
	}
	sort.Slice(mnts, func(i, j int) bool {
		if p := mnts[i].priority - mnts[j].priority; p != 0 {
			return p > 0
		}
		return weight[mnts[i]] < weight[mnts[j]]
	})
	return mnts
}

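// Rendezvous hashing example: among mounts with equal priority,
// order for a given locator is determined by comparing the
// md5(hash+uuidpart) digests, so each block gets its own stable
// shuffle of volumes -- reads and writes for one block always probe
// volumes in the same order, while different blocks spread their
// load across all volumes.
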
// checkEqual reports whether the data written to it (via io.Writer
// interface) is equal to the expected data.
//
// Expect should not be changed after the first Write.
type checkEqual struct {
	Expect     []byte
	equalUntil int
}

func (ce *checkEqual) Equal() bool {
	return ce.equalUntil == len(ce.Expect)
}

func (ce *checkEqual) Write(p []byte) (int, error) {
	endpos := ce.equalUntil + len(p)
	if ce.equalUntil >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[ce.equalUntil:endpos]) {
		ce.equalUntil = endpos
	} else {
		ce.equalUntil = -1
	}
	return len(p), nil
}

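// BlockUntrash attempts to restore the given block from trash on all
// writable volumes, returning nil if any volume succeeds.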
func (ks *keepstore) BlockUntrash(ctx context.Context, locator string) error {
	li, err := parseLocator(locator)
	if err != nil {
		return err
	}
	var errToCaller error = os.ErrNotExist
	for _, mnt := range ks.mountsW {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		err := mnt.BlockUntrash(li.hash)
		if err == nil {
			errToCaller = nil
		} else if !os.IsNotExist(err) && errToCaller != nil {
			errToCaller = err
		}
	}
	return errToCaller
}

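// BlockTouch updates the timestamp of the given block on the first
// writable volume where it exists, protecting it from garbage
// collection for another BlobSigningTTL period.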
func (ks *keepstore) BlockTouch(ctx context.Context, locator string) error {
	li, err := parseLocator(locator)
	if err != nil {
		return err
	}
	var errToCaller error = os.ErrNotExist
	for _, mnt := range ks.mountsW {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		err := mnt.BlockTouch(li.hash)
		if err == nil {
			return nil
		}
		if !os.IsNotExist(err) {
			errToCaller = err
		}
	}
	return errToCaller
}

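// BlockTrash marks the given block as trash on all volumes that
// allow trashing it, skipping copies whose timestamps are newer than
// BlobSigningTTL. It is disabled when Collections.BlobTrash is off.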
func (ks *keepstore) BlockTrash(ctx context.Context, locator string) error {
	if !ks.cluster.Collections.BlobTrash {
		return errMethodNotAllowed
	}
	li, err := parseLocator(locator)
	if err != nil {
		return err
	}
	var errToCaller error = os.ErrNotExist
	for _, mnt := range ks.mounts {
		if !mnt.AllowTrash {
			continue
		}
		if ctx.Err() != nil {
			return ctx.Err()
		}
		t, err := mnt.Mtime(li.hash)
		if err == nil && time.Since(t) > ks.cluster.Collections.BlobSigningTTL.Duration() {
			err = mnt.BlockTrash(li.hash)
		}
		if os.IsNotExist(errToCaller) || (errToCaller == nil && !os.IsNotExist(err)) {
			errToCaller = err
		}
	}
	return errToCaller
}

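// Mounts returns the readable mounts, sorted by UUID.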
func (ks *keepstore) Mounts() []*mount {
	return ks.mountsR
}

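// Index writes a listing of blocks (optionally restricted to a
// single mount and/or a hash prefix, per opts) to opts.WriteTo.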
func (ks *keepstore) Index(ctx context.Context, opts IndexOptions) error {
	mounts := ks.mountsR
	if opts.MountUUID != "" {
		mnt, ok := ks.mounts[opts.MountUUID]
		if !ok {
			return os.ErrNotExist
		}
		mounts = []*mount{mnt}
	}
	for _, mnt := range mounts {
		err := mnt.Index(ctx, opts.Prefix, opts.WriteTo)
		if err != nil {
			return err
		}
	}
	return nil
}

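// ctxToken returns the first auth token attached to the given
// context, or "" if there is none.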
func ctxToken(ctx context.Context) string {
	if c, ok := auth.FromContext(ctx); ok && len(c.Tokens) > 0 {
		return c.Tokens[0]
	}
	return ""
}

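// locatorInfo captures the parts of a block locator that keepstore
// cares about: the MD5 hash, the size (0 if not specified), and
// whether +R (remote) and +A (signature) hints are present.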
type locatorInfo struct {
	hash   string
	size   int
	remote bool
	signed bool
}

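// parseLocator parses a locator of the form "hash[+size][+hints...]"
// -- e.g., "d41d8cd98f00b204e9800998ecf8427e+0+A..." -- returning
// errInvalidLocator if the hash is not 32 hex digits long or a
// numeric part appears anywhere other than the size position.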
func parseLocator(loc string) (locatorInfo, error) {
	li := locatorInfo{}
	for i, part := range strings.Split(loc, "+") {
		if i == 0 {
			if len(part) != 32 {
				return li, errInvalidLocator
			}
			li.hash = part
			continue
		}
		if i == 1 {
			if size, err := strconv.Atoi(part); err == nil {
				li.size = size
				continue
			}
		}
		if len(part) == 0 {
			return li, errInvalidLocator
		}
		if part[0] == 'A' {
			li.signed = true
		}
		if part[0] == 'R' {
			li.remote = true
		}
		if part[0] >= '0' && part[0] <= '9' {
			// size, if present at all, must come first
			return li, errInvalidLocator
		}
	}
	return li, nil
}