//
// SPDX-License-Identifier: AGPL-3.0
-package main
+// Package keepstore implements the keepstore service component and
+// back-end storage drivers.
+//
+// It is an internal module, only intended to be imported by
+// /cmd/arvados-server and other server-side components in this
+// repository.
+package keepstore
import (
- "flag"
+ "bytes"
+ "context"
+ "crypto/md5"
+ "errors"
"fmt"
- "net"
+ "io"
+ "net/http"
"os"
- "os/signal"
- "syscall"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/config"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "github.com/coreos/go-systemd/daemon"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/auth"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/sdk/go/httpserver"
+ "git.arvados.org/arvados.git/sdk/go/keepclient"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
)
-var version = "dev"
-
-// A Keep "block" is 64MB.
-const BlockSize = 64 * 1024 * 1024
-
-// A Keep volume must have at least MinFreeKilobytes available
-// in order to permit writes.
-const MinFreeKilobytes = BlockSize / 1024
+// BlockSize is the maximum size of a Keep block: 64 MiB.
+const BlockSize = 1 << 26
-// ProcMounts /proc/mounts
-var ProcMounts = "/proc/mounts"
+var (
+ errChecksum = httpserver.ErrorWithStatus(errors.New("checksum mismatch in stored data"), http.StatusBadGateway)
+ errNoTokenProvided = httpserver.ErrorWithStatus(errors.New("no token provided in Authorization header"), http.StatusUnauthorized)
+ errMethodNotAllowed = httpserver.ErrorWithStatus(errors.New("method not allowed"), http.StatusMethodNotAllowed)
+ errVolumeUnavailable = httpserver.ErrorWithStatus(errors.New("volume unavailable"), http.StatusServiceUnavailable)
+ errCollision = httpserver.ErrorWithStatus(errors.New("hash collision"), http.StatusInternalServerError)
+ errExpiredSignature = httpserver.ErrorWithStatus(errors.New("expired signature"), http.StatusUnauthorized)
+ errInvalidSignature = httpserver.ErrorWithStatus(errors.New("invalid signature"), http.StatusBadRequest)
+ errInvalidLocator = httpserver.ErrorWithStatus(errors.New("invalid locator"), http.StatusBadRequest)
+ errFull = httpserver.ErrorWithStatus(errors.New("insufficient storage"), http.StatusInsufficientStorage)
+ errTooLarge = httpserver.ErrorWithStatus(errors.New("request entity too large"), http.StatusRequestEntityTooLarge)
+ driver = make(map[string]volumeDriver)
+)
-var bufs *bufferPool
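+// indexOptions specifies the parameters for an Index operation.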
+type indexOptions struct {
+ MountUUID string
+ Prefix string
+ WriteTo io.Writer
+}
-// KeepError types.
-//
-type KeepError struct {
- HTTPCode int
- ErrMsg string
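+// mount is a single volume attached to this keepstore, combining the
+// volume driver with its KeepMount attributes and storage class
+// priority.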
+type mount struct {
+ arvados.KeepMount
+ volume
+ priority int
}
-var (
- BadRequestError = &KeepError{400, "Bad Request"}
- UnauthorizedError = &KeepError{401, "Unauthorized"}
- CollisionError = &KeepError{500, "Collision"}
- RequestHashError = &KeepError{422, "Hash mismatch in request"}
- PermissionError = &KeepError{403, "Forbidden"}
- DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
- ExpiredError = &KeepError{401, "Expired permission signature"}
- NotFoundError = &KeepError{404, "Not Found"}
- VolumeBusyError = &KeepError{503, "Volume backend busy"}
- GenericError = &KeepError{500, "Fail"}
- FullError = &KeepError{503, "Full"}
- SizeRequiredError = &KeepError{411, "Missing Content-Length"}
- TooLongError = &KeepError{413, "Block is too large"}
- MethodDisabledError = &KeepError{405, "Method disabled"}
- ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
- ErrClientDisconnect = &KeepError{503, "Client disconnected"}
-)
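+// keepstore provides the keepstore service's core block operations
+// (read, write, trash, untrash, touch, index) across the configured
+// volumes.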
+type keepstore struct {
+ cluster *arvados.Cluster
+ logger logrus.FieldLogger
+ serviceURL arvados.URL
+ mounts map[string]*mount
+ mountsR []*mount
+ mountsW []*mount
+ bufferPool *bufferPool
+
+ iostats map[volume]*ioStats
-func (e *KeepError) Error() string {
- return e.ErrMsg
+ remoteClients map[string]*keepclient.KeepClient
+ remoteClientsMtx sync.Mutex
}
-// ========================
-// Internal data structures
-//
-// These global variables are used by multiple parts of the
-// program. They are good candidates for moving into their own
-// packages.
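+// newKeepstore returns a new keepstore instance for the given
+// cluster configuration, with its buffer pool and volume mounts
+// initialized.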
+func newKeepstore(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry, serviceURL arvados.URL) (*keepstore, error) {
+ logger := ctxlog.FromContext(ctx)
-// The Keep VolumeManager maintains a list of available volumes.
-// Initialized by the --volumes flag (or by FindKeepVolumes).
-var KeepVM VolumeManager
+ if cluster.API.MaxConcurrentRequests > 0 && cluster.API.MaxConcurrentRequests < cluster.API.MaxKeepBlobBuffers {
+ logger.Warnf("Possible configuration mistake: not useful to set API.MaxKeepBlobBuffers (%d) higher than API.MaxConcurrentRequests (%d)", cluster.API.MaxKeepBlobBuffers, cluster.API.MaxConcurrentRequests)
+ }
-// The pull list manager and trash queue are threadsafe queues which
-// support atomic update operations. The PullHandler and TrashHandler
-// store results from Data Manager /pull and /trash requests here.
-//
-// See the Keep and Data Manager design documents for more details:
-// https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
-// https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
-//
-var pullq *WorkQueue
-var trashq *WorkQueue
+ if cluster.Collections.BlobSigningKey != "" {
+ } else if cluster.Collections.BlobSigning {
+ return nil, errors.New("cannot enable Collections.BlobSigning with no Collections.BlobSigningKey")
+ } else {
+ logger.Warn("Running without a blob signing key. Block locators returned by this server will not be signed, and will be rejected by a server that enforces permissions. To fix this, configure Collections.BlobSigning and Collections.BlobSigningKey.")
+ }
-func main() {
- deprecated.beforeFlagParse(theConfig)
+ if cluster.API.MaxKeepBlobBuffers <= 0 {
+ return nil, fmt.Errorf("API.MaxKeepBlobBuffers must be greater than zero")
+ }
+ bufferPool := newBufferPool(logger, cluster.API.MaxKeepBlobBuffers, reg)
+
+ ks := &keepstore{
+ cluster: cluster,
+ logger: logger,
+ serviceURL: serviceURL,
+ bufferPool: bufferPool,
+ remoteClients: make(map[string]*keepclient.KeepClient),
+ }
- dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
- getVersion := flag.Bool("version", false, "Print version information and exit.")
+ err := ks.setupMounts(newVolumeMetricsVecs(reg))
+ if err != nil {
+ return nil, err
+ }
- defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
- var configPath string
- flag.StringVar(
- &configPath,
- "config",
- defaultConfigPath,
- "YAML or JSON configuration file `path`")
- flag.Usage = usage
- flag.Parse()
+ return ks, nil
+}
- // Print version information if requested
- if *getVersion {
- fmt.Printf("keepstore %s\n", version)
- return
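+// setupMounts initializes a mount for each configured volume that is
+// accessible via this keepstore's serviceURL, and builds the
+// readable and writable mount lists, sorted by UUID.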
+func (ks *keepstore) setupMounts(metrics *volumeMetricsVecs) error {
+ ks.mounts = make(map[string]*mount)
+ if len(ks.cluster.Volumes) == 0 {
+ return errors.New("no volumes configured")
+ }
+ for uuid, cfgvol := range ks.cluster.Volumes {
+ va, ok := cfgvol.AccessViaHosts[ks.serviceURL]
+ if !ok && len(cfgvol.AccessViaHosts) > 0 {
+ continue
+ }
+ dri, ok := driver[cfgvol.Driver]
+ if !ok {
+ return fmt.Errorf("volume %s: invalid driver %q", uuid, cfgvol.Driver)
+ }
+ vol, err := dri(newVolumeParams{
+ UUID: uuid,
+ Cluster: ks.cluster,
+ ConfigVolume: cfgvol,
+ Logger: ks.logger,
+ MetricsVecs: metrics,
+ BufferPool: ks.bufferPool,
+ })
+ if err != nil {
+ return fmt.Errorf("error initializing volume %s: %s", uuid, err)
+ }
+ sc := cfgvol.StorageClasses
+ if len(sc) == 0 {
+ sc = map[string]bool{"default": true}
+ }
+ repl := cfgvol.Replication
+ if repl < 1 {
+ repl = 1
+ }
+ pri := 0
+ for class, in := range cfgvol.StorageClasses {
+ p := ks.cluster.StorageClasses[class].Priority
+ if in && p > pri {
+ pri = p
+ }
+ }
+ mnt := &mount{
+ volume: vol,
+ priority: pri,
+ KeepMount: arvados.KeepMount{
+ UUID: uuid,
+ DeviceID: vol.DeviceID(),
+ AllowWrite: !va.ReadOnly && !cfgvol.ReadOnly,
+ AllowTrash: !va.ReadOnly && (!cfgvol.ReadOnly || cfgvol.AllowTrashWhenReadOnly),
+ Replication: repl,
+ StorageClasses: sc,
+ },
+ }
+ ks.mounts[uuid] = mnt
+ ks.logger.Printf("started volume %s (%s), AllowWrite=%v, AllowTrash=%v", uuid, vol.DeviceID(), mnt.AllowWrite, mnt.AllowTrash)
+ }
+ if len(ks.mounts) == 0 {
+ return fmt.Errorf("no volumes configured for %s", ks.serviceURL)
}
- deprecated.afterFlagParse(theConfig)
-
- err := config.LoadFile(theConfig, configPath)
- if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
- log.Fatal(err)
+ ks.mountsR = nil
+ ks.mountsW = nil
+ for _, mnt := range ks.mounts {
+ ks.mountsR = append(ks.mountsR, mnt)
+ if mnt.AllowWrite {
+ ks.mountsW = append(ks.mountsW, mnt)
+ }
}
+ // Sorting mounts by UUID makes behavior more predictable, and
+ // is convenient for testing -- for example, "index all
+ // volumes" and "trash block on all volumes" will visit
+ // volumes in predictable order.
+ sort.Slice(ks.mountsR, func(i, j int) bool { return ks.mountsR[i].UUID < ks.mountsR[j].UUID })
+ sort.Slice(ks.mountsW, func(i, j int) bool { return ks.mountsW[i].UUID < ks.mountsW[j].UUID })
+ return nil
+}
- if *dumpConfig {
- log.Fatal(config.DumpAndExit(theConfig))
+// checkLocatorSignature checks that locator has a valid signature.
+// If the BlobSigning config is false, it returns nil even if the
+// signature is invalid or missing.
+func (ks *keepstore) checkLocatorSignature(ctx context.Context, locator string) error {
+ if !ks.cluster.Collections.BlobSigning {
+ return nil
}
+ token := ctxToken(ctx)
+ if token == "" {
+ return errNoTokenProvided
+ }
+ err := arvados.VerifySignature(locator, token, ks.cluster.Collections.BlobSigningTTL.Duration(), []byte(ks.cluster.Collections.BlobSigningKey))
+ if err == arvados.ErrSignatureExpired {
+ return errExpiredSignature
+ } else if err != nil {
+ return errInvalidSignature
+ }
+ return nil
+}
- log.Printf("keepstore %s started", version)
+// signLocator signs the locator for the given token, if possible.
+// Note this signs if the BlobSigningKey config is available, even if
+// the BlobSigning config is false.
+func (ks *keepstore) signLocator(token, locator string) string {
+ if token == "" || len(ks.cluster.Collections.BlobSigningKey) == 0 {
+ return locator
+ }
+ ttl := ks.cluster.Collections.BlobSigningTTL.Duration()
+ return arvados.SignLocator(locator, token, time.Now().Add(ttl), ttl, []byte(ks.cluster.Collections.BlobSigningKey))
+}
- err = theConfig.Start()
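+// BlockRead reads the block indicated by opts.Locator and writes its
+// data to opts.WriteTo, returning the number of bytes written.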
+func (ks *keepstore) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (n int, err error) {
+ li, err := getLocatorInfo(opts.Locator)
if err != nil {
- log.Fatal(err)
+ return 0, err
+ }
+ out := opts.WriteTo
+ if rw, ok := out.(http.ResponseWriter); ok && li.size > 0 {
+ out = &setSizeOnWrite{ResponseWriter: rw, size: li.size}
+ }
+ if li.remote && !li.signed {
+ return ks.blockReadRemote(ctx, opts)
+ }
+ if err := ks.checkLocatorSignature(ctx, opts.Locator); err != nil {
+ return 0, err
+ }
+ hashcheck := md5.New()
+ if li.size > 0 {
+ out = newHashCheckWriter(out, hashcheck, int64(li.size), li.hash)
+ } else {
+ out = io.MultiWriter(out, hashcheck)
}
- if pidfile := theConfig.PIDFile; pidfile != "" {
- f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
- if err != nil {
- log.Fatalf("open pidfile (%s): %s", pidfile, err)
+ buf, err := ks.bufferPool.GetContext(ctx)
+ if err != nil {
+ return 0, err
+ }
+ defer ks.bufferPool.Put(buf)
+ streamer := newStreamWriterAt(out, 65536, buf)
+ defer streamer.Close()
+
+ var errToCaller error = os.ErrNotExist
+ for _, mnt := range ks.rendezvous(li.hash, ks.mountsR) {
+ if ctx.Err() != nil {
+ return 0, ctx.Err()
}
- defer f.Close()
- err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
+ err := mnt.BlockRead(ctx, li.hash, streamer)
if err != nil {
- log.Fatalf("flock pidfile (%s): %s", pidfile, err)
+ if streamer.WroteAt() != 0 {
+ // BlockRead encountered an error
+ // after writing some data, so it's
+ // too late to try another
+ // volume. Flush streamer before
+ // calling Wrote() to ensure our
+ // return value accurately reflects
+ // the number of bytes written to
+ // opts.WriteTo.
+ streamer.Close()
+ return streamer.Wrote(), err
+ }
+ if !os.IsNotExist(err) {
+ errToCaller = err
+ }
+ continue
}
- defer os.Remove(pidfile)
- err = f.Truncate(0)
- if err != nil {
- log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
+ if li.size == 0 {
+ // hashCheckWriter isn't in use because we
+ // don't know the expected size. All we can do
+ // is check after writing all the data, and
+ // trust the caller is doing a HEAD request so
+ // it's not too late to set an error code in
+ // the response header.
+ err = streamer.Close()
+ if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash && err == nil {
+ err = errChecksum
+ }
+ if rw, ok := opts.WriteTo.(http.ResponseWriter); ok {
+ // We didn't set the content-length header
+ // above because we didn't know the block size
+ // until now.
+ rw.Header().Set("Content-Length", fmt.Sprintf("%d", streamer.WroteAt()))
+ }
+ return streamer.WroteAt(), err
+ } else if streamer.WroteAt() != li.size {
+ // If the backend read fewer bytes than
+ // expected but returns no error, we can
+ // classify this as a checksum error (even
+ // though hashCheckWriter doesn't know that
+ // yet, it's just waiting for the next
+ // write). If our caller is serving a GET
+ // request it's too late to do anything about
+ // it anyway, but if it's a HEAD request the
+ // caller can still change the response status
+ // code.
+ return streamer.WroteAt(), errChecksum
}
- _, err = fmt.Fprint(f, os.Getpid())
+ // Ensure streamer flushes all buffered data without
+ // errors.
+ err = streamer.Close()
+ return streamer.Wrote(), err
+ }
+ return 0, errToCaller
+}
+
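+// blockReadRemote reads a block from the remote cluster indicated by
+// the locator's +R hint. If opts.LocalLocator is set, the block is
+// first saved to local storage and the signed local locator is
+// passed to the callback before the data is sent to the caller.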
+func (ks *keepstore) blockReadRemote(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
+ token := ctxToken(ctx)
+ if token == "" {
+ return 0, errNoTokenProvided
+ }
+ var remoteClient *keepclient.KeepClient
+ var parts []string
+ li, err := getLocatorInfo(opts.Locator)
+ if err != nil {
+ return 0, err
+ }
+ for i, part := range strings.Split(opts.Locator, "+") {
+ switch {
+ case i == 0:
+ // don't try to parse hash part as hint
+ case strings.HasPrefix(part, "A"):
+ // drop local permission hint
+ continue
+ case len(part) > 7 && part[0] == 'R' && part[6] == '-':
+ remoteID := part[1:6]
+ remote, ok := ks.cluster.RemoteClusters[remoteID]
+ if !ok {
+ return 0, httpserver.ErrorWithStatus(errors.New("remote cluster not configured"), http.StatusBadRequest)
+ }
+ kc, err := ks.remoteClient(remoteID, remote, token)
+ if err == auth.ErrObsoleteToken {
+ return 0, httpserver.ErrorWithStatus(err, http.StatusBadRequest)
+ } else if err != nil {
+ return 0, err
+ }
+ remoteClient = kc
+ part = "A" + part[7:]
+ }
+ parts = append(parts, part)
+ }
+ if remoteClient == nil {
+ return 0, httpserver.ErrorWithStatus(errors.New("invalid remote hint"), http.StatusBadRequest)
+ }
+ locator := strings.Join(parts, "+")
+ if opts.LocalLocator == nil {
+ // Read from remote cluster and stream response back
+ // to caller
+ if rw, ok := opts.WriteTo.(http.ResponseWriter); ok && li.size > 0 {
+ rw.Header().Set("Content-Length", fmt.Sprintf("%d", li.size))
+ }
+ return remoteClient.BlockRead(ctx, arvados.BlockReadOptions{
+ Locator: locator,
+ WriteTo: opts.WriteTo,
+ })
+ }
+ // We must call LocalLocator before writing any data to
+ // opts.WriteTo, otherwise the caller can't put the local
+ // locator in a response header. So we copy into memory,
+ // generate the local signature, then copy from memory to
+ // opts.WriteTo.
+ buf, err := ks.bufferPool.GetContext(ctx)
+ if err != nil {
+ return 0, err
+ }
+ defer ks.bufferPool.Put(buf)
+ writebuf := bytes.NewBuffer(buf[:0])
+ ks.logger.Infof("blockReadRemote(%s): remote read(%s)", opts.Locator, locator)
+ _, err = remoteClient.BlockRead(ctx, arvados.BlockReadOptions{
+ Locator: locator,
+ WriteTo: writebuf,
+ })
+ if err != nil {
+ return 0, err
+ }
+ resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
+ Hash: locator,
+ Data: writebuf.Bytes(),
+ })
+ if err != nil {
+ return 0, err
+ }
+ opts.LocalLocator(resp.Locator)
+ if rw, ok := opts.WriteTo.(http.ResponseWriter); ok {
+ rw.Header().Set("Content-Length", fmt.Sprintf("%d", writebuf.Len()))
+ }
+ n, err := io.Copy(opts.WriteTo, bytes.NewReader(writebuf.Bytes()))
+ return int(n), err
+}
+
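+// remoteClient returns a KeepClient for the given remote cluster,
+// reusing a cached client when possible and substituting the
+// caller's token, salted for the remote cluster.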
+func (ks *keepstore) remoteClient(remoteID string, remoteCluster arvados.RemoteCluster, token string) (*keepclient.KeepClient, error) {
+ ks.remoteClientsMtx.Lock()
+ kc, ok := ks.remoteClients[remoteID]
+ ks.remoteClientsMtx.Unlock()
+ if !ok {
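+ // The cached client carries a placeholder token; each caller
+ // gets a clone with its own salted token (see below).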
+ c := &arvados.Client{
+ APIHost: remoteCluster.Host,
+ AuthToken: "xxx",
+ Insecure: remoteCluster.Insecure,
+ }
+ ac, err := arvadosclient.New(c)
if err != nil {
- log.Fatalf("write pidfile (%s): %s", pidfile, err)
+ return nil, err
}
- err = f.Sync()
+ kc, err = keepclient.MakeKeepClient(ac)
if err != nil {
- log.Fatalf("sync pidfile (%s): %s", pidfile, err)
+ return nil, err
}
+ kc.DiskCacheSize = keepclient.DiskCacheDisabled
+
+ ks.remoteClientsMtx.Lock()
+ ks.remoteClients[remoteID] = kc
+ ks.remoteClientsMtx.Unlock()
}
+ accopy := *kc.Arvados
+ accopy.ApiToken = token
+ kccopy := kc.Clone()
+ kccopy.Arvados = &accopy
+ token, err := auth.SaltToken(token, remoteID)
+ if err != nil {
+ return nil, err
+ }
+ kccopy.Arvados.ApiToken = token
+ return kccopy, nil
+}
- var cluster *arvados.Cluster
- cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
- if err != nil && os.IsNotExist(err) {
- log.Warnf("DEPRECATED: proceeding without cluster configuration file %q (%s)", arvados.DefaultConfigFile, err)
- cluster = &arvados.Cluster{
- ClusterID: "xxxxx",
+// BlockWrite writes a block to one or more volumes.
+func (ks *keepstore) BlockWrite(ctx context.Context, opts arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+ var resp arvados.BlockWriteResponse
+ var hash string
+ if opts.Data == nil {
+ buf, err := ks.bufferPool.GetContext(ctx)
+ if err != nil {
+ return resp, err
}
- } else if err != nil {
- log.Fatalf("load config %q: %s", arvados.DefaultConfigFile, err)
- } else {
- cluster, err = cfg.GetCluster("")
+ defer ks.bufferPool.Put(buf)
+ w := bytes.NewBuffer(buf[:0])
+ h := md5.New()
+ limitedReader := &io.LimitedReader{R: opts.Reader, N: BlockSize}
+ n, err := io.Copy(io.MultiWriter(w, h), limitedReader)
if err != nil {
- log.Fatalf("config error in %q: %s", arvados.DefaultConfigFile, err)
+ return resp, err
+ }
+ if limitedReader.N == 0 {
+ // Data size is either exactly BlockSize, or too big.
+ n, err := opts.Reader.Read(make([]byte, 1))
+ if n > 0 {
+ return resp, httpserver.ErrorWithStatus(err, http.StatusRequestEntityTooLarge)
+ }
+ if err != io.EOF {
+ return resp, err
+ }
+ }
+ opts.Data = buf[:n]
+ if opts.DataSize != 0 && int(n) != opts.DataSize {
+ return resp, httpserver.ErrorWithStatus(fmt.Errorf("content length %d did not match specified data size %d", n, opts.DataSize), http.StatusBadRequest)
+ }
+ hash = fmt.Sprintf("%x", h.Sum(nil))
+ } else {
+ hash = fmt.Sprintf("%x", md5.Sum(opts.Data))
+ }
+ if opts.Hash != "" && !strings.HasPrefix(opts.Hash, hash) {
+ return resp, httpserver.ErrorWithStatus(fmt.Errorf("content hash %s did not match specified locator %s", hash, opts.Hash), http.StatusBadRequest)
+ }
+ rvzmounts := ks.rendezvous(hash, ks.mountsW)
+ result := newPutProgress(opts.StorageClasses)
+ for _, mnt := range rvzmounts {
+ if !result.Want(mnt) {
+ continue
+ }
+ cmp := &checkEqual{Expect: opts.Data}
+ if err := mnt.BlockRead(ctx, hash, cmp); err == nil {
+ if !cmp.Equal() {
+ return resp, errCollision
+ }
+ err := mnt.BlockTouch(hash)
+ if err == nil {
+ result.Add(mnt)
+ }
}
}
+ var allFull atomic.Bool
+ allFull.Store(true)
+ // pending tracks what result will be if all outstanding
+ // writes succeed.
+ pending := result.Copy()
+ cond := sync.NewCond(new(sync.Mutex))
+ cond.L.Lock()
+ var wg sync.WaitGroup
+nextmnt:
+ for _, mnt := range rvzmounts {
+ for {
+ if result.Done() || ctx.Err() != nil {
+ break nextmnt
+ }
+ if !result.Want(mnt) {
+ continue nextmnt
+ }
+ if pending.Want(mnt) {
+ break
+ }
+ // This mount might not be needed, depending
+ // on the outcome of pending writes. Wait for
+ // a pending write to finish, then check
+ // again.
+ cond.Wait()
+ }
+ mnt := mnt
+ logger := ks.logger.WithField("mount", mnt.UUID)
+ pending.Add(mnt)
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ logger.Debug("start write")
+ err := mnt.BlockWrite(ctx, hash, opts.Data)
+ cond.L.Lock()
+ defer cond.L.Unlock()
+ defer cond.Broadcast()
+ if err != nil {
+ logger.Debug("write failed")
+ pending.Sub(mnt)
+ if err != errFull {
+ allFull.Store(false)
+ }
+ } else {
+ result.Add(mnt)
+ pending.Sub(mnt)
+ }
+ }()
+ }
+ cond.L.Unlock()
+ wg.Wait()
+ if ctx.Err() != nil {
+ return resp, ctx.Err()
+ }
+ if result.Done() || result.totalReplication > 0 {
+ resp = arvados.BlockWriteResponse{
+ Locator: ks.signLocator(ctxToken(ctx), fmt.Sprintf("%s+%d", hash, len(opts.Data))),
+ Replicas: result.totalReplication,
+ StorageClasses: result.classDone,
+ }
+ return resp, nil
+ }
+ if allFull.Load() {
+ return resp, errFull
+ }
+ return resp, errVolumeUnavailable
+}
- log.Println("keepstore starting, pid", os.Getpid())
- defer log.Println("keepstore exiting, pid", os.Getpid())
+// rendezvous sorts the given mounts by descending priority, then by
+// rendezvous order for the given locator.
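+//
+// Rendezvous order is ascending order of md5(hash + UUID suffix), so
+// a given block always probes mounts in the same stable order,
+// independent of which other volumes are configured.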
+func (*keepstore) rendezvous(locator string, mnts []*mount) []*mount {
+ hash := locator
+ if len(hash) > 32 {
+ hash = hash[:32]
+ }
+ // copy the provided []*mount before doing an in-place sort
+ mnts = append([]*mount(nil), mnts...)
+ weight := make(map[*mount]string)
+ for _, mnt := range mnts {
+ uuidpart := mnt.UUID
+ if len(uuidpart) == 27 {
+ // strip zzzzz-yyyyy- prefixes
+ uuidpart = uuidpart[12:]
+ }
+ weight[mnt] = fmt.Sprintf("%x", md5.Sum([]byte(hash+uuidpart)))
+ }
+ sort.Slice(mnts, func(i, j int) bool {
+ if p := mnts[i].priority - mnts[j].priority; p != 0 {
+ return p > 0
+ }
+ return weight[mnts[i]] < weight[mnts[j]]
+ })
+ return mnts
+}
- // Start a round-robin VolumeManager with the volumes we have found.
- KeepVM = MakeRRVolumeManager(theConfig.Volumes)
+// checkEqual reports whether the data written to it (via io.WriterAt
+// interface) is equal to the expected data.
+//
+// Expect should not be changed after the first Write.
+//
+// Results are undefined if WriteAt is called with overlapping ranges.
+type checkEqual struct {
+ Expect []byte
+ equal atomic.Int64
+ notequal atomic.Bool
+}
- // Middleware/handler stack
- router := MakeRESTRouter(cluster)
+func (ce *checkEqual) Equal() bool {
+ return !ce.notequal.Load() && ce.equal.Load() == int64(len(ce.Expect))
+}
- // Set up a TCP listener.
- listener, err := net.Listen("tcp", theConfig.Listen)
- if err != nil {
- log.Fatal(err)
+func (ce *checkEqual) WriteAt(p []byte, offset int64) (int, error) {
+ endpos := int(offset) + len(p)
+ if offset >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[int(offset):endpos]) {
+ ce.equal.Add(int64(len(p)))
+ } else {
+ ce.notequal.Store(true)
}
+ return len(p), nil
+}
- // Initialize keepclient for pull workers
- keepClient := &keepclient.KeepClient{
- Arvados: &arvadosclient.ArvadosClient{},
- Want_replicas: 1,
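+// BlockUntrash rescues the indicated block from the trash on all
+// writable volumes where it is found.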
+func (ks *keepstore) BlockUntrash(ctx context.Context, locator string) error {
+ li, err := getLocatorInfo(locator)
+ if err != nil {
+ return err
+ }
+ var errToCaller error = os.ErrNotExist
+ for _, mnt := range ks.mountsW {
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
+ err := mnt.BlockUntrash(li.hash)
+ if err == nil {
+ errToCaller = nil
+ } else if !os.IsNotExist(err) && errToCaller != nil {
+ errToCaller = err
+ }
}
+ return errToCaller
+}
- // Initialize the pullq and workers
- pullq = NewWorkQueue()
- for i := 0; i < 1 || i < theConfig.PullWorkers; i++ {
- go RunPullWorker(pullq, keepClient)
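+// BlockTouch updates the timestamp of the indicated block on the
+// first writable volume where it is found.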
+func (ks *keepstore) BlockTouch(ctx context.Context, locator string) error {
+ li, err := getLocatorInfo(locator)
+ if err != nil {
+ return err
}
+ var errToCaller error = os.ErrNotExist
+ for _, mnt := range ks.mountsW {
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
+ err := mnt.BlockTouch(li.hash)
+ if err == nil {
+ return nil
+ }
+ if !os.IsNotExist(err) {
+ errToCaller = err
+ }
+ }
+ return errToCaller
+}
- // Initialize the trashq and workers
- trashq = NewWorkQueue()
- for i := 0; i < 1 || i < theConfig.TrashWorkers; i++ {
- go RunTrashWorker(trashq)
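+// BlockTrash moves the indicated block to the trash on every volume
+// that allows trash, provided the block's timestamp is older than
+// Collections.BlobSigningTTL.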
+func (ks *keepstore) BlockTrash(ctx context.Context, locator string) error {
+ if !ks.cluster.Collections.BlobTrash {
+ return errMethodNotAllowed
+ }
+ li, err := getLocatorInfo(locator)
+ if err != nil {
+ return err
+ }
+ var errToCaller error = os.ErrNotExist
+ for _, mnt := range ks.mounts {
+ if !mnt.AllowTrash {
+ continue
+ }
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
+ t, err := mnt.Mtime(li.hash)
+ if err == nil && time.Since(t) > ks.cluster.Collections.BlobSigningTTL.Duration() {
+ err = mnt.BlockTrash(li.hash)
+ }
+ if os.IsNotExist(errToCaller) || (errToCaller == nil && !os.IsNotExist(err)) {
+ errToCaller = err
+ }
}
+ return errToCaller
+}
- // Start emptyTrash goroutine
- doneEmptyingTrash := make(chan bool)
- go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
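+// Mounts returns all of the keepstore's mounts, sorted by UUID.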
+func (ks *keepstore) Mounts() []*mount {
+ return ks.mountsR
+}
- // Shut down the server gracefully (by closing the listener)
- // if SIGTERM is received.
- term := make(chan os.Signal, 1)
- go func(sig <-chan os.Signal) {
- s := <-sig
- log.Println("caught signal:", s)
- doneEmptyingTrash <- true
- listener.Close()
- }(term)
- signal.Notify(term, syscall.SIGTERM)
- signal.Notify(term, syscall.SIGINT)
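+// Index writes a block index for the indicated mount (or for all
+// mounts, if opts.MountUUID is empty) to opts.WriteTo.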
+func (ks *keepstore) Index(ctx context.Context, opts indexOptions) error {
+ mounts := ks.mountsR
+ if opts.MountUUID != "" {
+ mnt, ok := ks.mounts[opts.MountUUID]
+ if !ok {
+ return os.ErrNotExist
+ }
+ mounts = []*mount{mnt}
+ }
+ for _, mnt := range mounts {
+ err := mnt.Index(ctx, opts.Prefix, opts.WriteTo)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
- if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
- log.Printf("Error notifying init daemon: %v", err)
+func ctxToken(ctx context.Context) string {
+ if c, ok := auth.FromContext(ctx); ok && len(c.Tokens) > 0 {
+ return c.Tokens[0]
+ } else {
+ return ""
}
- log.Println("listening at", listener.Addr())
- srv := &server{}
- srv.Handler = router
- srv.Serve(listener)
}
-// Periodically (once per interval) invoke EmptyTrash on all volumes.
-func emptyTrash(done <-chan bool, interval time.Duration) {
- ticker := time.NewTicker(interval)
+// locatorInfo expresses the attributes of a locator that are relevant
+// for keepstore decision-making.
+type locatorInfo struct {
+ hash string
+ size int
+ remote bool // locator has a +R hint
+ signed bool // locator has a +A hint
+}
- for {
- select {
- case <-ticker.C:
- for _, v := range theConfig.Volumes {
- if v.Writable() {
- v.EmptyTrash()
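+// getLocatorInfo parses a locator string such as
+// "d41d8cd98f00b204e9800998ecf8427e+0" and reports its hash, size
+// (if present), and whether it carries +A (signature) or +R (remote)
+// hints.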
+func getLocatorInfo(loc string) (locatorInfo, error) {
+ var li locatorInfo
+ plus := 0 // number of '+' chars seen so far
+ partlen := 0 // chars since last '+'
+ for i, c := range loc + "+" {
+ if c == '+' {
+ if partlen == 0 {
+ // double/leading/trailing '+'
+ return li, errInvalidLocator
+ }
+ if plus == 0 {
+ if i != 32 {
+ return li, errInvalidLocator
}
+ li.hash = loc[:i]
}
- case <-done:
- ticker.Stop()
- return
+ if plus == 1 {
+ if size, err := strconv.Atoi(loc[i-partlen : i]); err == nil {
+ li.size = size
+ }
+ }
+ plus++
+ partlen = 0
+ continue
+ }
+ partlen++
+ if partlen == 1 {
+ if c == 'A' {
+ li.signed = true
+ }
+ if c == 'R' {
+ li.remote = true
+ }
+ if plus > 1 && c >= '0' && c <= '9' {
+ // size, if present at all, must come first
+ return li, errInvalidLocator
+ }
+ }
+ if plus == 0 && !((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f')) {
+ // non-hexadecimal char in hash part
+ return li, errInvalidLocator
}
}
+ return li, nil
}