X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8b03f0b0e66190b35c55ce1f0917dd2104b8a0b9..04704ea80b294655fe14d0c8cddf4ec1a6b21b4d:/services/keepstore/keepstore.go diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index a04bc0b6fd..60d062e1e3 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -1,390 +1,765 @@ -package main +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +// Package keepstore implements the keepstore service component and +// back-end storage drivers. +// +// It is an internal module, only intended to be imported by +// /cmd/arvados-server and other server-side components in this +// repository. +package keepstore import ( "bytes" - "flag" + "context" + "crypto/md5" + "errors" "fmt" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/httpserver" - "git.curoverse.com/arvados.git/sdk/go/keepclient" - "io/ioutil" - "log" - "net" + "io" "net/http" "os" - "os/signal" + "sort" + "strconv" "strings" - "syscall" + "sync" + "sync/atomic" "time" + + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" + "git.arvados.org/arvados.git/sdk/go/auth" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "git.arvados.org/arvados.git/sdk/go/httpserver" + "git.arvados.org/arvados.git/sdk/go/keepclient" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" ) -// ====================== -// Configuration settings -// -// TODO(twp): make all of these configurable via command line flags -// and/or configuration file settings. +// Maximum size of a keep block is 64 MiB. +const BlockSize = 1 << 26 -// Default TCP address on which to listen for requests. -// Initialized by the --listen flag. -const DefaultAddr = ":25107" +var ( + errChecksum = httpserver.ErrorWithStatus(errors.New("checksum mismatch in stored data"), http.StatusBadGateway) + errNoTokenProvided = httpserver.ErrorWithStatus(errors.New("no token provided in Authorization header"), http.StatusUnauthorized) + errMethodNotAllowed = httpserver.ErrorWithStatus(errors.New("method not allowed"), http.StatusMethodNotAllowed) + errVolumeUnavailable = httpserver.ErrorWithStatus(errors.New("volume unavailable"), http.StatusServiceUnavailable) + errCollision = httpserver.ErrorWithStatus(errors.New("hash collision"), http.StatusInternalServerError) + errExpiredSignature = httpserver.ErrorWithStatus(errors.New("expired signature"), http.StatusUnauthorized) + errInvalidSignature = httpserver.ErrorWithStatus(errors.New("invalid signature"), http.StatusBadRequest) + errInvalidLocator = httpserver.ErrorWithStatus(errors.New("invalid locator"), http.StatusBadRequest) + errFull = httpserver.ErrorWithStatus(errors.New("insufficient storage"), http.StatusInsufficientStorage) + errTooLarge = httpserver.ErrorWithStatus(errors.New("request entity too large"), http.StatusRequestEntityTooLarge) + driver = make(map[string]volumeDriver) +) -// A Keep "block" is 64MB. -const BlockSize = 64 * 1024 * 1024 +type indexOptions struct { + MountUUID string + Prefix string + WriteTo io.Writer +} -// A Keep volume must have at least MinFreeKilobytes available -// in order to permit writes. 
-const MinFreeKilobytes = BlockSize / 1024 +type mount struct { + arvados.KeepMount + volume + priority int +} -// ProcMounts /proc/mounts -var ProcMounts = "/proc/mounts" +type keepstore struct { + cluster *arvados.Cluster + logger logrus.FieldLogger + serviceURL arvados.URL + mounts map[string]*mount + mountsR []*mount + mountsW []*mount + bufferPool *bufferPool -// enforcePermissions controls whether permission signatures -// should be enforced (affecting GET and DELETE requests). -// Initialized by the -enforce-permissions flag. -var enforcePermissions bool + iostats map[volume]*ioStats -// blobSignatureTTL is the time duration for which new permission -// signatures (returned by PUT requests) will be valid. -// Initialized by the -permission-ttl flag. -var blobSignatureTTL time.Duration + remoteClients map[string]*keepclient.KeepClient + remoteClientsMtx sync.Mutex +} -// dataManagerToken represents the API token used by the -// Data Manager, and is required on certain privileged operations. -// Initialized by the -data-manager-token-file flag. -var dataManagerToken string +func newKeepstore(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry, serviceURL arvados.URL) (*keepstore, error) { + logger := ctxlog.FromContext(ctx) -// neverDelete can be used to prevent the DELETE handler from -// actually deleting anything. -var neverDelete = true + if cluster.API.MaxConcurrentRequests > 0 && cluster.API.MaxConcurrentRequests < cluster.API.MaxKeepBlobBuffers { + logger.Warnf("Possible configuration mistake: not useful to set API.MaxKeepBlobBuffers (%d) higher than API.MaxConcurrentRequests (%d)", cluster.API.MaxKeepBlobBuffers, cluster.API.MaxConcurrentRequests) + } -// trashLifetime is the time duration after a block is trashed -// during which it can be recovered using an /untrash request -// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively. -var trashLifetime time.Duration + if cluster.Collections.BlobSigningKey != "" { + } else if cluster.Collections.BlobSigning { + return nil, errors.New("cannot enable Collections.BlobSigning with no Collections.BlobSigningKey") + } else { + logger.Warn("Running without a blob signing key. Block locators returned by this server will not be signed, and will be rejected by a server that enforces permissions. To fix this, configure Collections.BlobSigning and Collections.BlobSigningKey.") + } -// trashCheckInterval is the time duration at which the emptyTrash goroutine -// will check and delete expired trashed blocks. Default is one day. -// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively. -var trashCheckInterval time.Duration + if cluster.API.MaxKeepBlobBuffers <= 0 { + return nil, fmt.Errorf("API.MaxKeepBlobBuffers must be greater than zero") + } + bufferPool := newBufferPool(logger, cluster.API.MaxKeepBlobBuffers, reg) + + ks := &keepstore{ + cluster: cluster, + logger: logger, + serviceURL: serviceURL, + bufferPool: bufferPool, + remoteClients: make(map[string]*keepclient.KeepClient), + } -var maxBuffers = 128 -var bufs *bufferPool + err := ks.setupMounts(newVolumeMetricsVecs(reg)) + if err != nil { + return nil, err + } -// KeepError types. 
-// -type KeepError struct { - HTTPCode int - ErrMsg string + return ks, nil } -var ( - BadRequestError = &KeepError{400, "Bad Request"} - UnauthorizedError = &KeepError{401, "Unauthorized"} - CollisionError = &KeepError{500, "Collision"} - RequestHashError = &KeepError{422, "Hash mismatch in request"} - PermissionError = &KeepError{403, "Forbidden"} - DiskHashError = &KeepError{500, "Hash mismatch in stored data"} - ExpiredError = &KeepError{401, "Expired permission signature"} - NotFoundError = &KeepError{404, "Not Found"} - GenericError = &KeepError{500, "Fail"} - FullError = &KeepError{503, "Full"} - SizeRequiredError = &KeepError{411, "Missing Content-Length"} - TooLongError = &KeepError{413, "Block is too large"} - MethodDisabledError = &KeepError{405, "Method disabled"} - ErrNotImplemented = &KeepError{500, "Unsupported configuration"} - ErrClientDisconnect = &KeepError{503, "Client disconnected"} -) +func (ks *keepstore) setupMounts(metrics *volumeMetricsVecs) error { + ks.mounts = make(map[string]*mount) + if len(ks.cluster.Volumes) == 0 { + return errors.New("no volumes configured") + } + for uuid, cfgvol := range ks.cluster.Volumes { + va, ok := cfgvol.AccessViaHosts[ks.serviceURL] + if !ok && len(cfgvol.AccessViaHosts) > 0 { + continue + } + dri, ok := driver[cfgvol.Driver] + if !ok { + return fmt.Errorf("volume %s: invalid driver %q", uuid, cfgvol.Driver) + } + vol, err := dri(newVolumeParams{ + UUID: uuid, + Cluster: ks.cluster, + ConfigVolume: cfgvol, + Logger: ks.logger, + MetricsVecs: metrics, + BufferPool: ks.bufferPool, + }) + if err != nil { + return fmt.Errorf("error initializing volume %s: %s", uuid, err) + } + sc := cfgvol.StorageClasses + if len(sc) == 0 { + sc = map[string]bool{"default": true} + } + repl := cfgvol.Replication + if repl < 1 { + repl = 1 + } + pri := 0 + for class, in := range cfgvol.StorageClasses { + p := ks.cluster.StorageClasses[class].Priority + if in && p > pri { + pri = p + } + } + mnt := &mount{ + volume: vol, + priority: pri, + KeepMount: arvados.KeepMount{ + UUID: uuid, + DeviceID: vol.DeviceID(), + AllowWrite: !va.ReadOnly && !cfgvol.ReadOnly, + AllowTrash: !va.ReadOnly && (!cfgvol.ReadOnly || cfgvol.AllowTrashWhenReadOnly), + Replication: repl, + StorageClasses: sc, + }, + } + ks.mounts[uuid] = mnt + ks.logger.Printf("started volume %s (%s), AllowWrite=%v, AllowTrash=%v", uuid, vol.DeviceID(), mnt.AllowWrite, mnt.AllowTrash) + } + if len(ks.mounts) == 0 { + return fmt.Errorf("no volumes configured for %s", ks.serviceURL) + } -func (e *KeepError) Error() string { - return e.ErrMsg + ks.mountsR = nil + ks.mountsW = nil + for _, mnt := range ks.mounts { + ks.mountsR = append(ks.mountsR, mnt) + if mnt.AllowWrite { + ks.mountsW = append(ks.mountsW, mnt) + } + } + // Sorting mounts by UUID makes behavior more predictable, and + // is convenient for testing -- for example, "index all + // volumes" and "trash block on all volumes" will visit + // volumes in predictable order. + sort.Slice(ks.mountsR, func(i, j int) bool { return ks.mountsR[i].UUID < ks.mountsR[j].UUID }) + sort.Slice(ks.mountsW, func(i, j int) bool { return ks.mountsW[i].UUID < ks.mountsW[j].UUID }) + return nil } -// ======================== -// Internal data structures -// -// These global variables are used by multiple parts of the -// program. They are good candidates for moving into their own -// packages. - -// The Keep VolumeManager maintains a list of available volumes. -// Initialized by the --volumes flag (or by FindKeepVolumes). 
-var KeepVM VolumeManager +// checkLocatorSignature checks that locator has a valid signature. +// If the BlobSigning config is false, it returns nil even if the +// signature is invalid or missing. +func (ks *keepstore) checkLocatorSignature(ctx context.Context, locator string) error { + if !ks.cluster.Collections.BlobSigning { + return nil + } + token := ctxToken(ctx) + if token == "" { + return errNoTokenProvided + } + err := arvados.VerifySignature(locator, token, ks.cluster.Collections.BlobSigningTTL.Duration(), []byte(ks.cluster.Collections.BlobSigningKey)) + if err == arvados.ErrSignatureExpired { + return errExpiredSignature + } else if err != nil { + return errInvalidSignature + } + return nil +} -// The pull list manager and trash queue are threadsafe queues which -// support atomic update operations. The PullHandler and TrashHandler -// store results from Data Manager /pull and /trash requests here. -// -// See the Keep and Data Manager design documents for more details: -// https://arvados.org/projects/arvados/wiki/Keep_Design_Doc -// https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc -// -var pullq *WorkQueue -var trashq *WorkQueue +// signLocator signs the locator for the given token, if possible. +// Note this signs if the BlobSigningKey config is available, even if +// the BlobSigning config is false. +func (ks *keepstore) signLocator(token, locator string) string { + if token == "" || len(ks.cluster.Collections.BlobSigningKey) == 0 { + return locator + } + ttl := ks.cluster.Collections.BlobSigningTTL.Duration() + return arvados.SignLocator(locator, token, time.Now().Add(ttl), ttl, []byte(ks.cluster.Collections.BlobSigningKey)) +} -type volumeSet []Volume +func (ks *keepstore) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (n int, err error) { + li, err := getLocatorInfo(opts.Locator) + if err != nil { + return 0, err + } + out := opts.WriteTo + if rw, ok := out.(http.ResponseWriter); ok && li.size > 0 { + out = &setSizeOnWrite{ResponseWriter: rw, size: li.size} + } + if li.remote && !li.signed { + return ks.blockReadRemote(ctx, opts) + } + if err := ks.checkLocatorSignature(ctx, opts.Locator); err != nil { + return 0, err + } + hashcheck := md5.New() + if li.size > 0 { + out = newHashCheckWriter(out, hashcheck, int64(li.size), li.hash) + } else { + out = io.MultiWriter(out, hashcheck) + } -var ( - flagSerializeIO bool - flagReadonly bool - volumes volumeSet -) + buf, err := ks.bufferPool.GetContext(ctx) + if err != nil { + return 0, err + } + defer ks.bufferPool.Put(buf) + streamer := newStreamWriterAt(out, 65536, buf) + defer streamer.Close() + + var errToCaller error = os.ErrNotExist + for _, mnt := range ks.rendezvous(li.hash, ks.mountsR) { + if ctx.Err() != nil { + return 0, ctx.Err() + } + err := mnt.BlockRead(ctx, li.hash, streamer) + if err != nil { + if streamer.WroteAt() != 0 { + // BlockRead encountered an error + // after writing some data, so it's + // too late to try another + // volume. Flush streamer before + // calling Wrote() to ensure our + // return value accurately reflects + // the number of bytes written to + // opts.WriteTo. + streamer.Close() + return streamer.Wrote(), err + } + if !os.IsNotExist(err) { + errToCaller = err + } + continue + } + if li.size == 0 { + // hashCheckingWriter isn't in use because we + // don't know the expected size. 
All we can do + // is check after writing all the data, and + // trust the caller is doing a HEAD request so + // it's not too late to set an error code in + // the response header. + err = streamer.Close() + if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash && err == nil { + err = errChecksum + } + if rw, ok := opts.WriteTo.(http.ResponseWriter); ok { + // We didn't set the content-length header + // above because we didn't know the block size + // until now. + rw.Header().Set("Content-Length", fmt.Sprintf("%d", streamer.WroteAt())) + } + return streamer.WroteAt(), err + } else if streamer.WroteAt() != li.size { + // If the backend read fewer bytes than + // expected but returns no error, we can + // classify this as a checksum error (even + // though hashCheckWriter doesn't know that + // yet, it's just waiting for the next + // write). If our caller is serving a GET + // request it's too late to do anything about + // it anyway, but if it's a HEAD request the + // caller can still change the response status + // code. + return streamer.WroteAt(), errChecksum + } + // Ensure streamer flushes all buffered data without + // errors. + err = streamer.Close() + return streamer.Wrote(), err + } + return 0, errToCaller +} -func (vs *volumeSet) String() string { - if vs == nil { - return "[]" +func (ks *keepstore) blockReadRemote(ctx context.Context, opts arvados.BlockReadOptions) (int, error) { + token := ctxToken(ctx) + if token == "" { + return 0, errNoTokenProvided + } + var remoteClient *keepclient.KeepClient + var parts []string + li, err := getLocatorInfo(opts.Locator) + if err != nil { + return 0, err + } + for i, part := range strings.Split(opts.Locator, "+") { + switch { + case i == 0: + // don't try to parse hash part as hint + case strings.HasPrefix(part, "A"): + // drop local permission hint + continue + case len(part) > 7 && part[0] == 'R' && part[6] == '-': + remoteID := part[1:6] + remote, ok := ks.cluster.RemoteClusters[remoteID] + if !ok { + return 0, httpserver.ErrorWithStatus(errors.New("remote cluster not configured"), http.StatusBadRequest) + } + kc, err := ks.remoteClient(remoteID, remote, token) + if err == auth.ErrObsoleteToken { + return 0, httpserver.ErrorWithStatus(err, http.StatusBadRequest) + } else if err != nil { + return 0, err + } + remoteClient = kc + part = "A" + part[7:] + } + parts = append(parts, part) + } + if remoteClient == nil { + return 0, httpserver.ErrorWithStatus(errors.New("invalid remote hint"), http.StatusBadRequest) } - return fmt.Sprintf("%+v", (*vs)[:]) + locator := strings.Join(parts, "+") + if opts.LocalLocator == nil { + // Read from remote cluster and stream response back + // to caller + if rw, ok := opts.WriteTo.(http.ResponseWriter); ok && li.size > 0 { + rw.Header().Set("Content-Length", fmt.Sprintf("%d", li.size)) + } + return remoteClient.BlockRead(ctx, arvados.BlockReadOptions{ + Locator: locator, + WriteTo: opts.WriteTo, + }) + } + // We must call LocalLocator before writing any data to + // opts.WriteTo, otherwise the caller can't put the local + // locator in a response header. So we copy into memory, + // generate the local signature, then copy from memory to + // opts.WriteTo. 
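+	// Note this ties up a full block buffer from the buffer
+	// pool until the copy to opts.WriteTo completes.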
+ buf, err := ks.bufferPool.GetContext(ctx) + if err != nil { + return 0, err + } + defer ks.bufferPool.Put(buf) + writebuf := bytes.NewBuffer(buf[:0]) + ks.logger.Infof("blockReadRemote(%s): remote read(%s)", opts.Locator, locator) + _, err = remoteClient.BlockRead(ctx, arvados.BlockReadOptions{ + Locator: locator, + WriteTo: writebuf, + }) + if err != nil { + return 0, err + } + resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{ + Hash: locator, + Data: writebuf.Bytes(), + }) + if err != nil { + return 0, err + } + opts.LocalLocator(resp.Locator) + if rw, ok := opts.WriteTo.(http.ResponseWriter); ok { + rw.Header().Set("Content-Length", fmt.Sprintf("%d", writebuf.Len())) + } + n, err := io.Copy(opts.WriteTo, bytes.NewReader(writebuf.Bytes())) + return int(n), err } -// TODO(twp): continue moving as much code as possible out of main -// so it can be effectively tested. Esp. handling and postprocessing -// of command line flags (identifying Keep volumes and initializing -// permission arguments). - -func main() { - log.Println("keepstore starting, pid", os.Getpid()) - defer log.Println("keepstore exiting, pid", os.Getpid()) - - var ( - dataManagerTokenFile string - listen string - blobSigningKeyFile string - permissionTTLSec int - pidfile string - maxRequests int - ) - flag.StringVar( - &dataManagerTokenFile, - "data-manager-token-file", - "", - "File with the API token used by the Data Manager. All DELETE "+ - "requests or GET /index requests must carry this token.") - flag.BoolVar( - &enforcePermissions, - "enforce-permissions", - false, - "Enforce permission signatures on requests.") - flag.StringVar( - &listen, - "listen", - DefaultAddr, - "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.") - flag.IntVar( - &maxRequests, - "max-requests", - 0, - "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)") - flag.BoolVar( - &neverDelete, - "never-delete", - true, - "If true, nothing will be deleted. "+ - "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+ - "You should leave this option alone unless you can afford to lose data.") - flag.StringVar( - &blobSigningKeyFile, - "permission-key-file", - "", - "Synonym for -blob-signing-key-file.") - flag.StringVar( - &blobSigningKeyFile, - "blob-signing-key-file", - "", - "File containing the secret key for generating and verifying "+ - "blob permission signatures.") - flag.IntVar( - &permissionTTLSec, - "permission-ttl", - 0, - "Synonym for -blob-signature-ttl.") - flag.IntVar( - &permissionTTLSec, - "blob-signature-ttl", - 2*7*24*3600, - "Lifetime of blob permission signatures in seconds. Modifying the ttl will invalidate all existing signatures. "+ - "See services/api/config/application.default.yml.") - flag.BoolVar( - &flagSerializeIO, - "serialize", - false, - "Serialize read and write operations on the following volumes.") - flag.BoolVar( - &flagReadonly, - "readonly", - false, - "Do not write, delete, or touch anything on the following volumes.") - flag.StringVar( - &pidfile, - "pid", - "", - "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. 
Exit immediately if there is an error opening, locking, or writing the pid file.") - flag.IntVar( - &maxBuffers, - "max-buffers", - maxBuffers, - fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20)) - flag.DurationVar( - &trashLifetime, - "trash-lifetime", - 0, - "Time duration after a block is trashed during which it can be recovered using an /untrash request") - flag.DurationVar( - &trashCheckInterval, - "trash-check-interval", - 24*time.Hour, - "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.") - - flag.Parse() - - if maxBuffers < 0 { - log.Fatal("-max-buffers must be greater than zero.") - } - bufs = newBufferPool(maxBuffers, BlockSize) - - if pidfile != "" { - f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777) - if err != nil { - log.Fatalf("open pidfile (%s): %s", pidfile, err) +func (ks *keepstore) remoteClient(remoteID string, remoteCluster arvados.RemoteCluster, token string) (*keepclient.KeepClient, error) { + ks.remoteClientsMtx.Lock() + kc, ok := ks.remoteClients[remoteID] + ks.remoteClientsMtx.Unlock() + if !ok { + c := &arvados.Client{ + APIHost: remoteCluster.Host, + AuthToken: "xxx", + Insecure: remoteCluster.Insecure, } - err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + ac, err := arvadosclient.New(c) if err != nil { - log.Fatalf("flock pidfile (%s): %s", pidfile, err) + return nil, err } - err = f.Truncate(0) + kc, err = keepclient.MakeKeepClient(ac) if err != nil { - log.Fatalf("truncate pidfile (%s): %s", pidfile, err) + return nil, err } - _, err = fmt.Fprint(f, os.Getpid()) + kc.DiskCacheSize = keepclient.DiskCacheDisabled + + ks.remoteClientsMtx.Lock() + ks.remoteClients[remoteID] = kc + ks.remoteClientsMtx.Unlock() + } + accopy := *kc.Arvados + accopy.ApiToken = token + kccopy := kc.Clone() + kccopy.Arvados = &accopy + token, err := auth.SaltToken(token, remoteID) + if err != nil { + return nil, err + } + kccopy.Arvados.ApiToken = token + return kccopy, nil +} + +// BlockWrite writes a block to one or more volumes. +func (ks *keepstore) BlockWrite(ctx context.Context, opts arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) { + var resp arvados.BlockWriteResponse + var hash string + if opts.Data == nil { + buf, err := ks.bufferPool.GetContext(ctx) if err != nil { - log.Fatalf("write pidfile (%s): %s", pidfile, err) + return resp, err } - err = f.Sync() + defer ks.bufferPool.Put(buf) + w := bytes.NewBuffer(buf[:0]) + h := md5.New() + limitedReader := &io.LimitedReader{R: opts.Reader, N: BlockSize} + n, err := io.Copy(io.MultiWriter(w, h), limitedReader) if err != nil { - log.Fatalf("sync pidfile (%s): %s", pidfile, err) + return resp, err + } + if limitedReader.N == 0 { + // Data size is either exactly BlockSize, or too big. 
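+			// Distinguish the two cases by trying to read
+			// one more byte: io.EOF means the data was
+			// exactly BlockSize; a successful read means
+			// the request body is too large.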
+ n, err := opts.Reader.Read(make([]byte, 1)) + if n > 0 { + return resp, httpserver.ErrorWithStatus(err, http.StatusRequestEntityTooLarge) + } + if err != io.EOF { + return resp, err + } + } + opts.Data = buf[:n] + if opts.DataSize != 0 && int(n) != opts.DataSize { + return resp, httpserver.ErrorWithStatus(fmt.Errorf("content length %d did not match specified data size %d", n, opts.DataSize), http.StatusBadRequest) } - defer f.Close() - defer os.Remove(pidfile) + hash = fmt.Sprintf("%x", h.Sum(nil)) + } else { + hash = fmt.Sprintf("%x", md5.Sum(opts.Data)) } - - if len(volumes) == 0 { - if (&unixVolumeAdder{&volumes}).Discover() == 0 { - log.Fatal("No volumes found.") + if opts.Hash != "" && !strings.HasPrefix(opts.Hash, hash) { + return resp, httpserver.ErrorWithStatus(fmt.Errorf("content hash %s did not match specified locator %s", hash, opts.Hash), http.StatusBadRequest) + } + rvzmounts := ks.rendezvous(hash, ks.mountsW) + result := newPutProgress(opts.StorageClasses) + for _, mnt := range rvzmounts { + if !result.Want(mnt) { + continue + } + cmp := &checkEqual{Expect: opts.Data} + if err := mnt.BlockRead(ctx, hash, cmp); err == nil { + if !cmp.Equal() { + return resp, errCollision + } + err := mnt.BlockTouch(hash) + if err == nil { + result.Add(mnt) + } } } - - for _, v := range volumes { - log.Printf("Using volume %v (writable=%v)", v, v.Writable()) + var allFull atomic.Bool + allFull.Store(true) + // pending tracks what result will be if all outstanding + // writes succeed. + pending := result.Copy() + cond := sync.NewCond(new(sync.Mutex)) + cond.L.Lock() + var wg sync.WaitGroup +nextmnt: + for _, mnt := range rvzmounts { + for { + if result.Done() || ctx.Err() != nil { + break nextmnt + } + if !result.Want(mnt) { + continue nextmnt + } + if pending.Want(mnt) { + break + } + // This mount might not be needed, depending + // on the outcome of pending writes. Wait for + // a pending write to finish, then check + // again. + cond.Wait() + } + mnt := mnt + logger := ks.logger.WithField("mount", mnt.UUID) + pending.Add(mnt) + wg.Add(1) + go func() { + defer wg.Done() + logger.Debug("start write") + err := mnt.BlockWrite(ctx, hash, opts.Data) + cond.L.Lock() + defer cond.L.Unlock() + defer cond.Broadcast() + if err != nil { + logger.Debug("write failed") + pending.Sub(mnt) + if err != errFull { + allFull.Store(false) + } + } else { + result.Add(mnt) + pending.Sub(mnt) + } + }() } + cond.L.Unlock() + wg.Wait() + if ctx.Err() != nil { + return resp, ctx.Err() + } + if result.Done() || result.totalReplication > 0 { + resp = arvados.BlockWriteResponse{ + Locator: ks.signLocator(ctxToken(ctx), fmt.Sprintf("%s+%d", hash, len(opts.Data))), + Replicas: result.totalReplication, + StorageClasses: result.classDone, + } + return resp, nil + } + if allFull.Load() { + return resp, errFull + } + return resp, errVolumeUnavailable +} - // Initialize data manager token and permission key. - // If these tokens are specified but cannot be read, - // raise a fatal error. - if dataManagerTokenFile != "" { - if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil { - dataManagerToken = strings.TrimSpace(string(buf)) - } else { - log.Fatalf("reading data manager token: %s\n", err) +// rendezvous sorts the given mounts by descending priority, then by +// rendezvous order for the given locator. 
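+//
+// This is rendezvous (highest-random-weight) hashing: each mount's
+// weight is the MD5 of the block hash concatenated with the mount
+// UUID, so every keepstore process computes the same probe order
+// for a given block without coordination.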
+func (*keepstore) rendezvous(locator string, mnts []*mount) []*mount { + hash := locator + if len(hash) > 32 { + hash = hash[:32] + } + // copy the provided []*mount before doing an in-place sort + mnts = append([]*mount(nil), mnts...) + weight := make(map[*mount]string) + for _, mnt := range mnts { + uuidpart := mnt.UUID + if len(uuidpart) == 27 { + // strip zzzzz-yyyyy- prefixes + uuidpart = uuidpart[12:] } + weight[mnt] = fmt.Sprintf("%x", md5.Sum([]byte(hash+uuidpart))) } + sort.Slice(mnts, func(i, j int) bool { + if p := mnts[i].priority - mnts[j].priority; p != 0 { + return p > 0 + } + return weight[mnts[i]] < weight[mnts[j]] + }) + return mnts +} + +// checkEqual reports whether the data written to it (via io.WriterAt +// interface) is equal to the expected data. +// +// Expect should not be changed after the first Write. +// +// Results are undefined if WriteAt is called with overlapping ranges. +type checkEqual struct { + Expect []byte + equal atomic.Int64 + notequal atomic.Bool +} + +func (ce *checkEqual) Equal() bool { + return !ce.notequal.Load() && ce.equal.Load() == int64(len(ce.Expect)) +} - if neverDelete != true { - log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " + - "been extensively tested. You should leave this option alone unless you can afford to lose data.") +func (ce *checkEqual) WriteAt(p []byte, offset int64) (int, error) { + endpos := int(offset) + len(p) + if offset >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[int(offset):endpos]) { + ce.equal.Add(int64(len(p))) + } else { + ce.notequal.Store(true) } + return len(p), nil +} - if blobSigningKeyFile != "" { - if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil { - PermissionSecret = bytes.TrimSpace(buf) - } else { - log.Fatalf("reading permission key: %s\n", err) +func (ks *keepstore) BlockUntrash(ctx context.Context, locator string) error { + li, err := getLocatorInfo(locator) + if err != nil { + return err + } + var errToCaller error = os.ErrNotExist + for _, mnt := range ks.mountsW { + if ctx.Err() != nil { + return ctx.Err() + } + err := mnt.BlockUntrash(li.hash) + if err == nil { + errToCaller = nil + } else if !os.IsNotExist(err) && errToCaller != nil { + errToCaller = err } } + return errToCaller +} - blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second - - if PermissionSecret == nil { - if enforcePermissions { - log.Fatal("-enforce-permissions requires a permission key") - } else { - log.Println("Running without a PermissionSecret. 
Block locators " + - "returned by this server will not be signed, and will be rejected " + - "by a server that enforces permissions.") - log.Println("To fix this, use the -blob-signing-key-file flag " + - "to specify the file containing the permission key.") +func (ks *keepstore) BlockTouch(ctx context.Context, locator string) error { + li, err := getLocatorInfo(locator) + if err != nil { + return err + } + var errToCaller error = os.ErrNotExist + for _, mnt := range ks.mountsW { + if ctx.Err() != nil { + return ctx.Err() + } + err := mnt.BlockTouch(li.hash) + if err == nil { + return nil + } + if !os.IsNotExist(err) { + errToCaller = err } } + return errToCaller +} - if maxRequests <= 0 { - maxRequests = maxBuffers * 2 - log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests) +func (ks *keepstore) BlockTrash(ctx context.Context, locator string) error { + if !ks.cluster.Collections.BlobTrash { + return errMethodNotAllowed + } + li, err := getLocatorInfo(locator) + if err != nil { + return err + } + var errToCaller error = os.ErrNotExist + for _, mnt := range ks.mounts { + if !mnt.AllowTrash { + continue + } + if ctx.Err() != nil { + return ctx.Err() + } + t, err := mnt.Mtime(li.hash) + if err == nil && time.Now().Sub(t) > ks.cluster.Collections.BlobSigningTTL.Duration() { + err = mnt.BlockTrash(li.hash) + } + if os.IsNotExist(errToCaller) || (errToCaller == nil && !os.IsNotExist(err)) { + errToCaller = err + } } + return errToCaller +} - // Start a round-robin VolumeManager with the volumes we have found. - KeepVM = MakeRRVolumeManager(volumes) +func (ks *keepstore) Mounts() []*mount { + return ks.mountsR +} - // Middleware stack: logger, maxRequests limiter, method handlers - http.Handle("/", &LoggingRESTRouter{ - httpserver.NewRequestLimiter(maxRequests, - MakeRESTRouter()), - }) +func (ks *keepstore) Index(ctx context.Context, opts indexOptions) error { + mounts := ks.mountsR + if opts.MountUUID != "" { + mnt, ok := ks.mounts[opts.MountUUID] + if !ok { + return os.ErrNotExist + } + mounts = []*mount{mnt} + } + for _, mnt := range mounts { + err := mnt.Index(ctx, opts.Prefix, opts.WriteTo) + if err != nil { + return err + } + } + return nil +} - // Set up a TCP listener. - listener, err := net.Listen("tcp", listen) - if err != nil { - log.Fatal(err) - } - - // Initialize Pull queue and worker - keepClient := &keepclient.KeepClient{ - Arvados: &arvadosclient.ArvadosClient{}, - Want_replicas: 1, - Client: &http.Client{}, - } - - // Initialize the pullq and worker - pullq = NewWorkQueue() - go RunPullWorker(pullq, keepClient) - - // Initialize the trashq and worker - trashq = NewWorkQueue() - go RunTrashWorker(trashq) - - // Start emptyTrash goroutine - doneEmptyingTrash := make(chan bool) - go emptyTrash(doneEmptyingTrash, trashCheckInterval) - - // Shut down the server gracefully (by closing the listener) - // if SIGTERM is received. - term := make(chan os.Signal, 1) - go func(sig <-chan os.Signal) { - s := <-sig - log.Println("caught signal:", s) - doneEmptyingTrash <- true - listener.Close() - }(term) - signal.Notify(term, syscall.SIGTERM) - signal.Notify(term, syscall.SIGINT) - - log.Println("listening at", listen) - srv := &http.Server{Addr: listen} - srv.Serve(listener) +func ctxToken(ctx context.Context) string { + if c, ok := auth.FromContext(ctx); ok && len(c.Tokens) > 0 { + return c.Tokens[0] + } else { + return "" + } } -// At every trashCheckInterval tick, invoke EmptyTrash on all volumes. 
-func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) { - ticker := time.NewTicker(trashCheckInterval) +// locatorInfo expresses the attributes of a locator that are relevant +// for keepstore decision-making. +type locatorInfo struct { + hash string + size int + remote bool // locator has a +R hint + signed bool // locator has a +A hint +} - for { - select { - case <-ticker.C: - for _, v := range volumes { - if v.Writable() { - v.EmptyTrash() +func getLocatorInfo(loc string) (locatorInfo, error) { + var li locatorInfo + plus := 0 // number of '+' chars seen so far + partlen := 0 // chars since last '+' + for i, c := range loc + "+" { + if c == '+' { + if partlen == 0 { + // double/leading/trailing '+' + return li, errInvalidLocator + } + if plus == 0 { + if i != 32 { + return li, errInvalidLocator } + li.hash = loc[:i] } - case <-doneEmptyingTrash: - ticker.Stop() - return + if plus == 1 { + if size, err := strconv.Atoi(loc[i-partlen : i]); err == nil { + li.size = size + } + } + plus++ + partlen = 0 + continue + } + partlen++ + if partlen == 1 { + if c == 'A' { + li.signed = true + } + if c == 'R' { + li.remote = true + } + if plus > 1 && c >= '0' && c <= '9' { + // size, if present at all, must come first + return li, errInvalidLocator + } + } + if plus == 0 && !((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f')) { + // non-hexadecimal char in hash part + return li, errInvalidLocator } } + return li, nil }
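For reference, a minimal sketch (not part of the diff above) showing how
the new getLocatorInfo classifies the locator forms handled by BlockRead
and blockReadRemote. It would live in a _test.go file in the same
package; the hash, size, signature, and cluster ID are arbitrary samples.

package keepstore

import "fmt"

func Example_getLocatorInfo() {
	for _, loc := range []string{
		// Bare hash: size unknown, unsigned, local.
		"d41d8cd98f00b204e9800998ecf8427e",
		// +size and +A (signature) hint: signed local locator.
		"d41d8cd98f00b204e9800998ecf8427e+1234+Aabcdef0123@565a8b0c",
		// +size and +R (remote) hint with no +A hint: BlockRead
		// hands this off to blockReadRemote.
		"d41d8cd98f00b204e9800998ecf8427e+1234+Rzzzzz-abcdef0123@565a8b0c",
	} {
		li, err := getLocatorInfo(loc)
		fmt.Println(li.size, li.signed, li.remote, err)
	}
	// Output:
	// 0 false false <nil>
	// 1234 true false <nil>
	// 1234 false true <nil>
}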