X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3a3d67ccee068a85aa3b79c5abd40170223071e3..022107bd52092c658208e74161581c6bedda4a5f:/services/keepstore/keepstore.go diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index fb1e1ea545..c9a8023059 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -2,246 +2,748 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +// Package keepstore implements the keepstore service component and +// back-end storage drivers. +// +// It is an internal module, only intended to be imported by +// /cmd/arvados-server and other server-side components in this +// repository. +package keepstore import ( - "flag" + "bytes" + "context" + "crypto/md5" + "errors" "fmt" - "net" + "io" + "net/http" "os" - "os/signal" - "syscall" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/config" - "git.curoverse.com/arvados.git/sdk/go/keepclient" - "github.com/coreos/go-systemd/daemon" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" + "git.arvados.org/arvados.git/sdk/go/auth" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "git.arvados.org/arvados.git/sdk/go/httpserver" + "git.arvados.org/arvados.git/sdk/go/keepclient" "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" ) -var version = "dev" - -// A Keep "block" is 64MB. -const BlockSize = 64 * 1024 * 1024 - -// A Keep volume must have at least MinFreeKilobytes available -// in order to permit writes. -const MinFreeKilobytes = BlockSize / 1024 +// Maximum size of a keep block is 64 MiB. +const BlockSize = 1 << 26 -// ProcMounts /proc/mounts -var ProcMounts = "/proc/mounts" +var ( + errChecksum = httpserver.ErrorWithStatus(errors.New("checksum mismatch in stored data"), http.StatusBadGateway) + errNoTokenProvided = httpserver.ErrorWithStatus(errors.New("no token provided in Authorization header"), http.StatusUnauthorized) + errMethodNotAllowed = httpserver.ErrorWithStatus(errors.New("method not allowed"), http.StatusMethodNotAllowed) + errVolumeUnavailable = httpserver.ErrorWithStatus(errors.New("volume unavailable"), http.StatusServiceUnavailable) + errCollision = httpserver.ErrorWithStatus(errors.New("hash collision"), http.StatusInternalServerError) + errExpiredSignature = httpserver.ErrorWithStatus(errors.New("expired signature"), http.StatusUnauthorized) + errInvalidSignature = httpserver.ErrorWithStatus(errors.New("invalid signature"), http.StatusBadRequest) + errInvalidLocator = httpserver.ErrorWithStatus(errors.New("invalid locator"), http.StatusBadRequest) + errFull = httpserver.ErrorWithStatus(errors.New("insufficient storage"), http.StatusInsufficientStorage) + errTooLarge = httpserver.ErrorWithStatus(errors.New("request entity too large"), http.StatusRequestEntityTooLarge) + driver = make(map[string]volumeDriver) +) -var bufs *bufferPool +type indexOptions struct { + MountUUID string + Prefix string + WriteTo io.Writer +} -// KeepError types. 
-// -type KeepError struct { - HTTPCode int - ErrMsg string +type mount struct { + arvados.KeepMount + volume + priority int } -var ( - BadRequestError = &KeepError{400, "Bad Request"} - UnauthorizedError = &KeepError{401, "Unauthorized"} - CollisionError = &KeepError{500, "Collision"} - RequestHashError = &KeepError{422, "Hash mismatch in request"} - PermissionError = &KeepError{403, "Forbidden"} - DiskHashError = &KeepError{500, "Hash mismatch in stored data"} - ExpiredError = &KeepError{401, "Expired permission signature"} - NotFoundError = &KeepError{404, "Not Found"} - GenericError = &KeepError{500, "Fail"} - FullError = &KeepError{503, "Full"} - SizeRequiredError = &KeepError{411, "Missing Content-Length"} - TooLongError = &KeepError{413, "Block is too large"} - MethodDisabledError = &KeepError{405, "Method disabled"} - ErrNotImplemented = &KeepError{500, "Unsupported configuration"} - ErrClientDisconnect = &KeepError{503, "Client disconnected"} -) +type keepstore struct { + cluster *arvados.Cluster + logger logrus.FieldLogger + serviceURL arvados.URL + mounts map[string]*mount + mountsR []*mount + mountsW []*mount + bufferPool *bufferPool + + iostats map[volume]*ioStats -func (e *KeepError) Error() string { - return e.ErrMsg + remoteClients map[string]*keepclient.KeepClient + remoteClientsMtx sync.Mutex } -// ======================== -// Internal data structures -// -// These global variables are used by multiple parts of the -// program. They are good candidates for moving into their own -// packages. +func newKeepstore(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry, serviceURL arvados.URL) (*keepstore, error) { + logger := ctxlog.FromContext(ctx) -// The Keep VolumeManager maintains a list of available volumes. -// Initialized by the --volumes flag (or by FindKeepVolumes). -var KeepVM VolumeManager + if cluster.API.MaxConcurrentRequests > 0 && cluster.API.MaxConcurrentRequests < cluster.API.MaxKeepBlobBuffers { + logger.Warnf("Possible configuration mistake: not useful to set API.MaxKeepBlobBuffers (%d) higher than API.MaxConcurrentRequests (%d)", cluster.API.MaxKeepBlobBuffers, cluster.API.MaxConcurrentRequests) + } -// The pull list manager and trash queue are threadsafe queues which -// support atomic update operations. The PullHandler and TrashHandler -// store results from Data Manager /pull and /trash requests here. -// -// See the Keep and Data Manager design documents for more details: -// https://arvados.org/projects/arvados/wiki/Keep_Design_Doc -// https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc -// -var pullq *WorkQueue -var trashq *WorkQueue + if cluster.Collections.BlobSigningKey != "" { + } else if cluster.Collections.BlobSigning { + return nil, errors.New("cannot enable Collections.BlobSigning with no Collections.BlobSigningKey") + } else { + logger.Warn("Running without a blob signing key. Block locators returned by this server will not be signed, and will be rejected by a server that enforces permissions. 
To fix this, configure Collections.BlobSigning and Collections.BlobSigningKey.") + } -func main() { - deprecated.beforeFlagParse(theConfig) + if cluster.API.MaxKeepBlobBuffers <= 0 { + return nil, fmt.Errorf("API.MaxKeepBlobBuffers must be greater than zero") + } + bufferPool := newBufferPool(logger, cluster.API.MaxKeepBlobBuffers, reg) + + ks := &keepstore{ + cluster: cluster, + logger: logger, + serviceURL: serviceURL, + bufferPool: bufferPool, + remoteClients: make(map[string]*keepclient.KeepClient), + } - dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)") - getVersion := flag.Bool("version", false, "Print version information and exit.") + err := ks.setupMounts(newVolumeMetricsVecs(reg)) + if err != nil { + return nil, err + } - defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml" - var configPath string - flag.StringVar( - &configPath, - "config", - defaultConfigPath, - "YAML or JSON configuration file `path`") - flag.Usage = usage - flag.Parse() + return ks, nil +} - // Print version information if requested - if *getVersion { - fmt.Printf("keepstore %s\n", version) - return +func (ks *keepstore) setupMounts(metrics *volumeMetricsVecs) error { + ks.mounts = make(map[string]*mount) + if len(ks.cluster.Volumes) == 0 { + return errors.New("no volumes configured") + } + for uuid, cfgvol := range ks.cluster.Volumes { + va, ok := cfgvol.AccessViaHosts[ks.serviceURL] + if !ok && len(cfgvol.AccessViaHosts) > 0 { + continue + } + dri, ok := driver[cfgvol.Driver] + if !ok { + return fmt.Errorf("volume %s: invalid driver %q", uuid, cfgvol.Driver) + } + vol, err := dri(newVolumeParams{ + UUID: uuid, + Cluster: ks.cluster, + ConfigVolume: cfgvol, + Logger: ks.logger, + MetricsVecs: metrics, + BufferPool: ks.bufferPool, + }) + if err != nil { + return fmt.Errorf("error initializing volume %s: %s", uuid, err) + } + sc := cfgvol.StorageClasses + if len(sc) == 0 { + sc = map[string]bool{"default": true} + } + repl := cfgvol.Replication + if repl < 1 { + repl = 1 + } + pri := 0 + for class, in := range cfgvol.StorageClasses { + p := ks.cluster.StorageClasses[class].Priority + if in && p > pri { + pri = p + } + } + mnt := &mount{ + volume: vol, + priority: pri, + KeepMount: arvados.KeepMount{ + UUID: uuid, + DeviceID: vol.DeviceID(), + AllowWrite: !va.ReadOnly && !cfgvol.ReadOnly, + AllowTrash: !va.ReadOnly && (!cfgvol.ReadOnly || cfgvol.AllowTrashWhenReadOnly), + Replication: repl, + StorageClasses: sc, + }, + } + ks.mounts[uuid] = mnt + ks.logger.Printf("started volume %s (%s), AllowWrite=%v, AllowTrash=%v", uuid, vol.DeviceID(), mnt.AllowWrite, mnt.AllowTrash) + } + if len(ks.mounts) == 0 { + return fmt.Errorf("no volumes configured for %s", ks.serviceURL) } - deprecated.afterFlagParse(theConfig) + ks.mountsR = nil + ks.mountsW = nil + for _, mnt := range ks.mounts { + ks.mountsR = append(ks.mountsR, mnt) + if mnt.AllowWrite { + ks.mountsW = append(ks.mountsW, mnt) + } + } + // Sorting mounts by UUID makes behavior more predictable, and + // is convenient for testing -- for example, "index all + // volumes" and "trash block on all volumes" will visit + // volumes in predictable order. 
+ sort.Slice(ks.mountsR, func(i, j int) bool { return ks.mountsR[i].UUID < ks.mountsR[j].UUID }) + sort.Slice(ks.mountsW, func(i, j int) bool { return ks.mountsW[i].UUID < ks.mountsW[j].UUID }) + return nil +} - err := config.LoadFile(theConfig, configPath) - if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) { - log.Fatal(err) +// checkLocatorSignature checks that locator has a valid signature. +// If the BlobSigning config is false, it returns nil even if the +// signature is invalid or missing. +func (ks *keepstore) checkLocatorSignature(ctx context.Context, locator string) error { + if !ks.cluster.Collections.BlobSigning { + return nil + } + token := ctxToken(ctx) + if token == "" { + return errNoTokenProvided + } + err := arvados.VerifySignature(locator, token, ks.cluster.Collections.BlobSigningTTL.Duration(), []byte(ks.cluster.Collections.BlobSigningKey)) + if err == arvados.ErrSignatureExpired { + return errExpiredSignature + } else if err != nil { + return errInvalidSignature } + return nil +} - if *dumpConfig { - log.Fatal(config.DumpAndExit(theConfig)) +// signLocator signs the locator for the given token, if possible. +// Note this signs if the BlobSigningKey config is available, even if +// the BlobSigning config is false. +func (ks *keepstore) signLocator(token, locator string) string { + if token == "" || len(ks.cluster.Collections.BlobSigningKey) == 0 { + return locator } + ttl := ks.cluster.Collections.BlobSigningTTL.Duration() + return arvados.SignLocator(locator, token, time.Now().Add(ttl), ttl, []byte(ks.cluster.Collections.BlobSigningKey)) +} - log.Printf("keepstore %s started", version) +func (ks *keepstore) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (n int, err error) { + li, err := parseLocator(opts.Locator) + if err != nil { + return 0, err + } + out := opts.WriteTo + if rw, ok := out.(http.ResponseWriter); ok && li.size > 0 { + out = &setSizeOnWrite{ResponseWriter: rw, size: li.size} + } + if li.remote && !li.signed { + return ks.blockReadRemote(ctx, opts) + } + if err := ks.checkLocatorSignature(ctx, opts.Locator); err != nil { + return 0, err + } + hashcheck := md5.New() + if li.size > 0 { + out = newHashCheckWriter(out, hashcheck, int64(li.size), li.hash) + } else { + out = io.MultiWriter(out, hashcheck) + } - metricsRegistry := prometheus.NewRegistry() + buf, err := ks.bufferPool.GetContext(ctx) + if err != nil { + return 0, err + } + defer ks.bufferPool.Put(buf) + streamer := newStreamWriterAt(out, 65536, buf) + defer streamer.Close() + + var errToCaller error = os.ErrNotExist + for _, mnt := range ks.rendezvous(li.hash, ks.mountsR) { + if ctx.Err() != nil { + return 0, ctx.Err() + } + err := mnt.BlockRead(ctx, li.hash, streamer) + if err != nil { + if streamer.WroteAt() != 0 { + // BlockRead encountered an error + // after writing some data, so it's + // too late to try another + // volume. Flush streamer before + // calling Wrote() to ensure our + // return value accurately reflects + // the number of bytes written to + // opts.WriteTo. + streamer.Close() + return streamer.Wrote(), err + } + if !os.IsNotExist(err) { + errToCaller = err + } + continue + } + if li.size == 0 { + // hashCheckingWriter isn't in use because we + // don't know the expected size. All we can do + // is check after writing all the data, and + // trust the caller is doing a HEAD request so + // it's not too late to set an error code in + // the response header. 
+ err = streamer.Close() + if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash && err == nil { + err = errChecksum + } + if rw, ok := opts.WriteTo.(http.ResponseWriter); ok { + // We didn't set the content-length header + // above because we didn't know the block size + // until now. + rw.Header().Set("Content-Length", fmt.Sprintf("%d", streamer.WroteAt())) + } + return streamer.WroteAt(), err + } else if streamer.WroteAt() != li.size { + // If the backend read fewer bytes than + // expected but returns no error, we can + // classify this as a checksum error (even + // though hashCheckWriter doesn't know that + // yet, it's just waiting for the next + // write). If our caller is serving a GET + // request it's too late to do anything about + // it anyway, but if it's a HEAD request the + // caller can still change the response status + // code. + return streamer.WroteAt(), errChecksum + } + // Ensure streamer flushes all buffered data without + // errors. + err = streamer.Close() + return streamer.Wrote(), err + } + return 0, errToCaller +} - err = theConfig.Start(metricsRegistry) +func (ks *keepstore) blockReadRemote(ctx context.Context, opts arvados.BlockReadOptions) (int, error) { + token := ctxToken(ctx) + if token == "" { + return 0, errNoTokenProvided + } + var remoteClient *keepclient.KeepClient + var parts []string + var size int + for i, part := range strings.Split(opts.Locator, "+") { + switch { + case i == 0: + // don't try to parse hash part as hint + case strings.HasPrefix(part, "A"): + // drop local permission hint + continue + case len(part) > 7 && part[0] == 'R' && part[6] == '-': + remoteID := part[1:6] + remote, ok := ks.cluster.RemoteClusters[remoteID] + if !ok { + return 0, httpserver.ErrorWithStatus(errors.New("remote cluster not configured"), http.StatusBadRequest) + } + kc, err := ks.remoteClient(remoteID, remote, token) + if err == auth.ErrObsoleteToken { + return 0, httpserver.ErrorWithStatus(err, http.StatusBadRequest) + } else if err != nil { + return 0, err + } + remoteClient = kc + part = "A" + part[7:] + case len(part) > 0 && part[0] >= '0' && part[0] <= '9': + size, _ = strconv.Atoi(part) + } + parts = append(parts, part) + } + if remoteClient == nil { + return 0, httpserver.ErrorWithStatus(errors.New("invalid remote hint"), http.StatusBadRequest) + } + locator := strings.Join(parts, "+") + if opts.LocalLocator == nil { + // Read from remote cluster and stream response back + // to caller + if rw, ok := opts.WriteTo.(http.ResponseWriter); ok && size > 0 { + rw.Header().Set("Content-Length", fmt.Sprintf("%d", size)) + } + return remoteClient.BlockRead(ctx, arvados.BlockReadOptions{ + Locator: locator, + WriteTo: opts.WriteTo, + }) + } + // We must call LocalLocator before writing any data to + // opts.WriteTo, otherwise the caller can't put the local + // locator in a response header. So we copy into memory, + // generate the local signature, then copy from memory to + // opts.WriteTo. 
+ buf, err := ks.bufferPool.GetContext(ctx) if err != nil { - log.Fatal(err) + return 0, err } + defer ks.bufferPool.Put(buf) + writebuf := bytes.NewBuffer(buf[:0]) + ks.logger.Infof("blockReadRemote(%s): remote read(%s)", opts.Locator, locator) + _, err = remoteClient.BlockRead(ctx, arvados.BlockReadOptions{ + Locator: locator, + WriteTo: writebuf, + }) + if err != nil { + return 0, err + } + resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{ + Hash: locator, + Data: writebuf.Bytes(), + }) + if err != nil { + return 0, err + } + opts.LocalLocator(resp.Locator) + if rw, ok := opts.WriteTo.(http.ResponseWriter); ok { + rw.Header().Set("Content-Length", fmt.Sprintf("%d", writebuf.Len())) + } + n, err := io.Copy(opts.WriteTo, bytes.NewReader(writebuf.Bytes())) + return int(n), err +} - if pidfile := theConfig.PIDFile; pidfile != "" { - f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777) - if err != nil { - log.Fatalf("open pidfile (%s): %s", pidfile, err) +func (ks *keepstore) remoteClient(remoteID string, remoteCluster arvados.RemoteCluster, token string) (*keepclient.KeepClient, error) { + ks.remoteClientsMtx.Lock() + kc, ok := ks.remoteClients[remoteID] + ks.remoteClientsMtx.Unlock() + if !ok { + c := &arvados.Client{ + APIHost: remoteCluster.Host, + AuthToken: "xxx", + Insecure: remoteCluster.Insecure, } - defer f.Close() - err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + ac, err := arvadosclient.New(c) if err != nil { - log.Fatalf("flock pidfile (%s): %s", pidfile, err) + return nil, err } - defer os.Remove(pidfile) - err = f.Truncate(0) + kc, err = keepclient.MakeKeepClient(ac) if err != nil { - log.Fatalf("truncate pidfile (%s): %s", pidfile, err) + return nil, err } - _, err = fmt.Fprint(f, os.Getpid()) + kc.DiskCacheSize = keepclient.DiskCacheDisabled + + ks.remoteClientsMtx.Lock() + ks.remoteClients[remoteID] = kc + ks.remoteClientsMtx.Unlock() + } + accopy := *kc.Arvados + accopy.ApiToken = token + kccopy := kc.Clone() + kccopy.Arvados = &accopy + token, err := auth.SaltToken(token, remoteID) + if err != nil { + return nil, err + } + kccopy.Arvados.ApiToken = token + return kccopy, nil +} + +// BlockWrite writes a block to one or more volumes. +func (ks *keepstore) BlockWrite(ctx context.Context, opts arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) { + var resp arvados.BlockWriteResponse + var hash string + if opts.Data == nil { + buf, err := ks.bufferPool.GetContext(ctx) if err != nil { - log.Fatalf("write pidfile (%s): %s", pidfile, err) + return resp, err } - err = f.Sync() + defer ks.bufferPool.Put(buf) + w := bytes.NewBuffer(buf[:0]) + h := md5.New() + limitedReader := &io.LimitedReader{R: opts.Reader, N: BlockSize} + n, err := io.Copy(io.MultiWriter(w, h), limitedReader) if err != nil { - log.Fatalf("sync pidfile (%s): %s", pidfile, err) + return resp, err } - } - - var cluster *arvados.Cluster - cfg, err := arvados.GetConfig(arvados.DefaultConfigFile) - if err != nil && os.IsNotExist(err) { - log.Warnf("DEPRECATED: proceeding without cluster configuration file %q (%s)", arvados.DefaultConfigFile, err) - cluster = &arvados.Cluster{ - ClusterID: "xxxxx", + if limitedReader.N == 0 { + // Data size is either exactly BlockSize, or too big. 
+ n, err := opts.Reader.Read(make([]byte, 1)) + if n > 0 { + return resp, httpserver.ErrorWithStatus(err, http.StatusRequestEntityTooLarge) + } + if err != io.EOF { + return resp, err + } } - } else if err != nil { - log.Fatalf("load config %q: %s", arvados.DefaultConfigFile, err) + opts.Data = buf[:n] + if opts.DataSize != 0 && int(n) != opts.DataSize { + return resp, httpserver.ErrorWithStatus(fmt.Errorf("content length %d did not match specified data size %d", n, opts.DataSize), http.StatusBadRequest) + } + hash = fmt.Sprintf("%x", h.Sum(nil)) } else { - cluster, err = cfg.GetCluster("") - if err != nil { - log.Fatalf("config error in %q: %s", arvados.DefaultConfigFile, err) + hash = fmt.Sprintf("%x", md5.Sum(opts.Data)) + } + if opts.Hash != "" && !strings.HasPrefix(opts.Hash, hash) { + return resp, httpserver.ErrorWithStatus(fmt.Errorf("content hash %s did not match specified locator %s", hash, opts.Hash), http.StatusBadRequest) + } + rvzmounts := ks.rendezvous(hash, ks.mountsW) + result := newPutProgress(opts.StorageClasses) + for _, mnt := range rvzmounts { + if !result.Want(mnt) { + continue + } + cmp := &checkEqual{Expect: opts.Data} + if err := mnt.BlockRead(ctx, hash, cmp); err == nil { + if !cmp.Equal() { + return resp, errCollision + } + err := mnt.BlockTouch(hash) + if err == nil { + result.Add(mnt) + } + } + } + var allFull atomic.Bool + allFull.Store(true) + // pending tracks what result will be if all outstanding + // writes succeed. + pending := result.Copy() + cond := sync.NewCond(new(sync.Mutex)) + cond.L.Lock() + var wg sync.WaitGroup +nextmnt: + for _, mnt := range rvzmounts { + for { + if result.Done() || ctx.Err() != nil { + break nextmnt + } + if !result.Want(mnt) { + continue nextmnt + } + if pending.Want(mnt) { + break + } + // This mount might not be needed, depending + // on the outcome of pending writes. Wait for + // a pending write to finish, then check + // again. + cond.Wait() + } + mnt := mnt + logger := ks.logger.WithField("mount", mnt.UUID) + pending.Add(mnt) + wg.Add(1) + go func() { + defer wg.Done() + logger.Debug("start write") + err := mnt.BlockWrite(ctx, hash, opts.Data) + cond.L.Lock() + defer cond.L.Unlock() + defer cond.Broadcast() + if err != nil { + logger.Debug("write failed") + pending.Sub(mnt) + if err != errFull { + allFull.Store(false) + } + } else { + result.Add(mnt) + pending.Sub(mnt) + } + }() + } + cond.L.Unlock() + wg.Wait() + if ctx.Err() != nil { + return resp, ctx.Err() + } + if result.Done() || result.totalReplication > 0 { + resp = arvados.BlockWriteResponse{ + Locator: ks.signLocator(ctxToken(ctx), fmt.Sprintf("%s+%d", hash, len(opts.Data))), + Replicas: result.totalReplication, + StorageClasses: result.classDone, } + return resp, nil + } + if allFull.Load() { + return resp, errFull } + return resp, errVolumeUnavailable +} - log.Println("keepstore starting, pid", os.Getpid()) - defer log.Println("keepstore exiting, pid", os.Getpid()) +// rendezvous sorts the given mounts by descending priority, then by +// rendezvous order for the given locator. +func (*keepstore) rendezvous(locator string, mnts []*mount) []*mount { + hash := locator + if len(hash) > 32 { + hash = hash[:32] + } + // copy the provided []*mount before doing an in-place sort + mnts = append([]*mount(nil), mnts...) 
+ weight := make(map[*mount]string) + for _, mnt := range mnts { + uuidpart := mnt.UUID + if len(uuidpart) == 27 { + // strip zzzzz-yyyyy- prefixes + uuidpart = uuidpart[12:] + } + weight[mnt] = fmt.Sprintf("%x", md5.Sum([]byte(hash+uuidpart))) + } + sort.Slice(mnts, func(i, j int) bool { + if p := mnts[i].priority - mnts[j].priority; p != 0 { + return p > 0 + } + return weight[mnts[i]] < weight[mnts[j]] + }) + return mnts +} - // Start a round-robin VolumeManager with the volumes we have found. - KeepVM = MakeRRVolumeManager(theConfig.Volumes) +// checkEqual reports whether the data written to it (via io.WriterAt +// interface) is equal to the expected data. +// +// Expect should not be changed after the first Write. +// +// Results are undefined if WriteAt is called with overlapping ranges. +type checkEqual struct { + Expect []byte + equal atomic.Int64 + notequal atomic.Bool +} - // Middleware/handler stack - router := MakeRESTRouter(cluster, metricsRegistry) +func (ce *checkEqual) Equal() bool { + return !ce.notequal.Load() && ce.equal.Load() == int64(len(ce.Expect)) +} - // Set up a TCP listener. - listener, err := net.Listen("tcp", theConfig.Listen) - if err != nil { - log.Fatal(err) +func (ce *checkEqual) WriteAt(p []byte, offset int64) (int, error) { + endpos := int(offset) + len(p) + if offset >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[int(offset):endpos]) { + ce.equal.Add(int64(len(p))) + } else { + ce.notequal.Store(true) } + return len(p), nil +} - // Initialize keepclient for pull workers - keepClient := &keepclient.KeepClient{ - Arvados: &arvadosclient.ArvadosClient{}, - Want_replicas: 1, +func (ks *keepstore) BlockUntrash(ctx context.Context, locator string) error { + li, err := parseLocator(locator) + if err != nil { + return err + } + var errToCaller error = os.ErrNotExist + for _, mnt := range ks.mountsW { + if ctx.Err() != nil { + return ctx.Err() + } + err := mnt.BlockUntrash(li.hash) + if err == nil { + errToCaller = nil + } else if !os.IsNotExist(err) && errToCaller != nil { + errToCaller = err + } } + return errToCaller +} - // Initialize the pullq and workers - pullq = NewWorkQueue() - for i := 0; i < 1 || i < theConfig.PullWorkers; i++ { - go RunPullWorker(pullq, keepClient) +func (ks *keepstore) BlockTouch(ctx context.Context, locator string) error { + li, err := parseLocator(locator) + if err != nil { + return err } + var errToCaller error = os.ErrNotExist + for _, mnt := range ks.mountsW { + if ctx.Err() != nil { + return ctx.Err() + } + err := mnt.BlockTouch(li.hash) + if err == nil { + return nil + } + if !os.IsNotExist(err) { + errToCaller = err + } + } + return errToCaller +} - // Initialize the trashq and workers - trashq = NewWorkQueue() - for i := 0; i < 1 || i < theConfig.TrashWorkers; i++ { - go RunTrashWorker(trashq) +func (ks *keepstore) BlockTrash(ctx context.Context, locator string) error { + if !ks.cluster.Collections.BlobTrash { + return errMethodNotAllowed + } + li, err := parseLocator(locator) + if err != nil { + return err } + var errToCaller error = os.ErrNotExist + for _, mnt := range ks.mounts { + if !mnt.AllowTrash { + continue + } + if ctx.Err() != nil { + return ctx.Err() + } + t, err := mnt.Mtime(li.hash) + if err == nil && time.Now().Sub(t) > ks.cluster.Collections.BlobSigningTTL.Duration() { + err = mnt.BlockTrash(li.hash) + } + if os.IsNotExist(errToCaller) || (errToCaller == nil && !os.IsNotExist(err)) { + errToCaller = err + } + } + return errToCaller +} - // Start emptyTrash goroutine - doneEmptyingTrash := 
make(chan bool) - go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration()) +func (ks *keepstore) Mounts() []*mount { + return ks.mountsR +} - // Shut down the server gracefully (by closing the listener) - // if SIGTERM is received. - term := make(chan os.Signal, 1) - go func(sig <-chan os.Signal) { - s := <-sig - log.Println("caught signal:", s) - doneEmptyingTrash <- true - listener.Close() - }(term) - signal.Notify(term, syscall.SIGTERM) - signal.Notify(term, syscall.SIGINT) +func (ks *keepstore) Index(ctx context.Context, opts indexOptions) error { + mounts := ks.mountsR + if opts.MountUUID != "" { + mnt, ok := ks.mounts[opts.MountUUID] + if !ok { + return os.ErrNotExist + } + mounts = []*mount{mnt} + } + for _, mnt := range mounts { + err := mnt.Index(ctx, opts.Prefix, opts.WriteTo) + if err != nil { + return err + } + } + return nil +} - if _, err := daemon.SdNotify(false, "READY=1"); err != nil { - log.Printf("Error notifying init daemon: %v", err) +func ctxToken(ctx context.Context) string { + if c, ok := auth.FromContext(ctx); ok && len(c.Tokens) > 0 { + return c.Tokens[0] + } else { + return "" } - log.Println("listening at", listener.Addr()) - srv := &server{} - srv.Handler = router - srv.Serve(listener) } -// Periodically (once per interval) invoke EmptyTrash on all volumes. -func emptyTrash(done <-chan bool, interval time.Duration) { - ticker := time.NewTicker(interval) +type locatorInfo struct { + hash string + size int + remote bool + signed bool +} - for { - select { - case <-ticker.C: - for _, v := range theConfig.Volumes { - if v.Writable() { - v.EmptyTrash() - } +func parseLocator(loc string) (locatorInfo, error) { + var li locatorInfo + for i, part := range strings.Split(loc, "+") { + if i == 0 { + if len(part) != 32 { + return li, errInvalidLocator + } + li.hash = part + continue + } + if i == 1 { + if size, err := strconv.Atoi(part); err == nil { + li.size = size + continue } - case <-done: - ticker.Stop() - return + } + if len(part) == 0 { + return li, errInvalidLocator + } + if part[0] == 'A' { + li.signed = true + } + if part[0] == 'R' { + li.remote = true + } + if part[0] >= '0' && part[0] <= '9' { + // size, if present at all, must come first + return li, errInvalidLocator } } + return li, nil }
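
The rendezvous() method in the new code sorts candidate mounts by descending priority and then by an md5-based rendezvous weight, so the probe order for a given block is deterministic. The following standalone sketch is not part of the diff; the vol type, field names, and UUIDs are made up for illustration, but the ordering rule mirrors keepstore.rendezvous:

package main

import (
	"crypto/md5"
	"fmt"
	"sort"
)

// vol is a stand-in for *mount: just a UUID and a storage-class priority.
type vol struct {
	UUID     string
	Priority int
}

// rendezvousOrder returns vols sorted by descending priority, then by the
// md5-based rendezvous weight derived from the block hash, mirroring the
// ordering rule in keepstore.rendezvous.
func rendezvousOrder(locator string, vols []vol) []vol {
	hash := locator
	if len(hash) > 32 {
		hash = hash[:32] // weight only the hash part, not size/hints
	}
	out := append([]vol(nil), vols...) // don't reorder the caller's slice
	weight := func(v vol) string {
		uuidpart := v.UUID
		if len(uuidpart) == 27 {
			uuidpart = uuidpart[12:] // strip the "zzzzz-yyyyy-" prefix
		}
		return fmt.Sprintf("%x", md5.Sum([]byte(hash+uuidpart)))
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].Priority != out[j].Priority {
			return out[i].Priority > out[j].Priority
		}
		return weight(out[i]) < weight(out[j])
	})
	return out
}

func main() {
	vols := []vol{
		{UUID: "zzzzz-nyw5e-000000000000000", Priority: 0},
		{UUID: "zzzzz-nyw5e-111111111111111", Priority: 0},
		{UUID: "zzzzz-nyw5e-222222222222222", Priority: 10}, // higher priority, probed first
	}
	for _, v := range rendezvousOrder("acbd18db4cc2f85cedef654fccc4a4d8+3", vols) {
		fmt.Println(v.UUID)
	}
}

Because the weight depends only on the block hash and the volume UUID, adding or removing a volume reshuffles the probe order for as few blocks as possible, which is the usual rendezvous-hashing property this ordering relies on.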
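
signLocator() and checkLocatorSignature() in the new code delegate to arvados.SignLocator and arvados.VerifySignature from the Go SDK. Below is a minimal round-trip usage sketch, assuming a fabricated token, key, and TTL; only the two SDK calls mirror what the diff does:

package main

import (
	"fmt"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvados"
)

func main() {
	// Token, key, and TTL are fabricated; only the SignLocator and
	// VerifySignature calls mirror signLocator/checkLocatorSignature.
	const (
		locator = "acbd18db4cc2f85cedef654fccc4a4d8+3"
		token   = "example-api-token-not-real"
		key     = "example-blob-signing-key"
	)
	ttl := 336 * time.Hour // e.g. a two-week Collections.BlobSigningTTL

	// What signLocator does when a signing key is configured.
	signed := arvados.SignLocator(locator, token, time.Now().Add(ttl), ttl, []byte(key))
	fmt.Println("signed locator:", signed) // hash+size+A<signature>@<expiry>

	// What checkLocatorSignature does before serving a read when
	// Collections.BlobSigning is enabled.
	if err := arvados.VerifySignature(signed, token, ttl, []byte(key)); err != nil {
		fmt.Println("verify failed:", err)
	} else {
		fmt.Println("verify ok")
	}
}

With Collections.BlobSigning enabled, a read whose locator fails this check is rejected, returning 401 for an expired signature and 400 for an invalid one, matching errExpiredSignature and errInvalidSignature in the diff.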
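
For orientation, the locator grammar handled by parseLocator and blockReadRemote is a 32-hex hash, an optional size, then "+"-separated hints: "A..." carries a local permission signature and "Rzzzzz-..." names a remote cluster whose signature follows. A tiny sketch with fabricated hash, signature, and timestamp values (not taken from the diff):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Fabricated locator: 32-hex hash, size, a local signature hint
	// ("A<sig>@<expiry>") and a remote-cluster hint ("R<cluster>-<sig>@<expiry>"),
	// the forms handled by parseLocator and blockReadRemote.
	loc := "acbd18db4cc2f85cedef654fccc4a4d8+3" +
		"+Aabcdef0123456789abcdef0123456789abcdef01@65f1e000" +
		"+Rzzzzz-abcdef0123456789abcdef0123456789abcdef01@65f1e000"
	for i, part := range strings.Split(loc, "+") {
		switch {
		case i == 0:
			fmt.Println("hash:   ", part)
		case i == 1:
			fmt.Println("size:   ", part)
		case part[0] == 'A':
			fmt.Println("signed: ", part)
		case part[0] == 'R':
			fmt.Println("remote:  cluster", part[1:6])
		}
	}
}

As in parseLocator, a numeric part is only treated as the size when it appears immediately after the hash; a digit-leading hint anywhere later makes the locator invalid.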