+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package keepstore
-
import (
	"container/list"
	"context"
	"crypto/md5"
	"crypto/subtle"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"regexp"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.arvados.org/arvados.git/lib/cmd"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/ctxlog"
	"git.arvados.org/arvados.git/sdk/go/health"
	"git.arvados.org/arvados.git/sdk/go/httpserver"
	"github.com/gorilla/mux"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
)
-
-type router struct {
- *mux.Router
- cluster *arvados.Cluster
- logger logrus.FieldLogger
- remoteProxy remoteProxy
- metrics *nodeMetrics
- volmgr *RRVolumeManager
- pullq *WorkQueue
- trashq *WorkQueue
-}
-
-// MakeRESTRouter returns a new router that forwards all Keep requests
-// to the appropriate handlers.
-func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
- rtr := &router{
- Router: mux.NewRouter(),
- cluster: cluster,
- logger: ctxlog.FromContext(ctx),
- metrics: &nodeMetrics{reg: reg},
- volmgr: volmgr,
- pullq: pullq,
- trashq: trashq,
- }
-
- rtr.HandleFunc(
- `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
- rtr.HandleFunc(
- `/{hash:[0-9a-f]{32}}+{hints}`,
- rtr.handleGET).Methods("GET", "HEAD")
-
- rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
- rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
- // List all blocks stored here. Privileged client only.
- rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
- // List blocks stored here whose hash has the given prefix.
- // Privileged client only.
- rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
- // Update timestamp on existing block. Privileged client only.
- rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
-
- // Internals/debugging info (runtime.MemStats)
- rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
-
- // List volumes: path, device number, bytes used/avail.
- rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
-
- // List mounts: UUID, readonly, tier, device ID, ...
- rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
- rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
- rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
-
- // Replace the current pull queue.
- rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
-
- // Replace the current trash queue.
- rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
-
- // Untrash moves blocks from trash back into store
- rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
-
- rtr.Handle("/_health/{check}", &health.Handler{
- Token: cluster.ManagementToken,
- Prefix: "/_health/",
- }).Methods("GET")
-
- // Any request which does not match any of these routes gets
- // 400 Bad Request.
- rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
-
- rtr.metrics.setupBufferPoolMetrics(bufs)
- rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
- rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
-
- return rtr
-}
-
-// BadRequestHandler is a HandleFunc to address bad requests.
-func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
- http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
-}
-
-func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
- locator := req.URL.Path[1:]
- if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
- rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
- return
- }
-
- if rtr.cluster.Collections.BlobSigning {
- locator := req.URL.Path[1:] // strip leading slash
- if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
- http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
- return
- }
- }
-
- // TODO: Probe volumes to check whether the block _might_
- // exist. Some volumes/types could support a quick existence
- // check without causing other operations to suffer. If all
- // volumes support that, and assure us the block definitely
- // isn't here, we can return 404 now instead of waiting for a
- // buffer.
-
- buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
- if err != nil {
- http.Error(resp, err.Error(), http.StatusServiceUnavailable)
- return
- }
- defer bufs.Put(buf)
-
- size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
- if err != nil {
- code := http.StatusInternalServerError
- if err, ok := err.(*KeepError); ok {
- code = err.HTTPCode
- }
- http.Error(resp, err.Error(), code)
- return
- }
-
- resp.Header().Set("Content-Length", strconv.Itoa(size))
- resp.Header().Set("Content-Type", "application/octet-stream")
- resp.Write(buf[:size])
-}
-
-// Get a buffer from the pool -- but give up and return a non-nil
-// error if ctx ends before we get a buffer.
-func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
- bufReady := make(chan []byte)
- go func() {
- bufReady <- bufs.Get(bufSize)
- }()
- select {
- case buf := <-bufReady:
- return buf, nil
- case <-ctx.Done():
- go func() {
- // Even if closeNotifier happened first, we
- // need to keep waiting for our buf so we can
- // return it to the pool.
- bufs.Put(<-bufReady)
- }()
- return nil, ErrClientDisconnect
- }
-}
-
-func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
- if !rtr.isSystemAuth(GetAPIToken(req)) {
- http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
- return
- }
- hash := mux.Vars(req)["hash"]
- vols := rtr.volmgr.AllWritable()
- if len(vols) == 0 {
- http.Error(resp, "no volumes", http.StatusNotFound)
- return
- }
- var err error
- for _, mnt := range vols {
- err = mnt.Touch(hash)
- if err == nil {
- break
- }
- }
- switch {
- case err == nil:
- return
- case os.IsNotExist(err):
- http.Error(resp, err.Error(), http.StatusNotFound)
- default:
- http.Error(resp, err.Error(), http.StatusInternalServerError)
- }
-}
-
-func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
- hash := mux.Vars(req)["hash"]
-
- // Detect as many error conditions as possible before reading
- // the body: avoid transmitting data that will not end up
- // being written anyway.
-
- if req.ContentLength == -1 {
- http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
- return
- }
-
- if req.ContentLength > BlockSize {
- http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
- return
- }
-
- if len(rtr.volmgr.AllWritable()) == 0 {
- http.Error(resp, FullError.Error(), FullError.HTTPCode)
- return
- }
-
- var wantStorageClasses []string
- if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
- wantStorageClasses = strings.Split(hdr, ",")
- for i, sc := range wantStorageClasses {
- wantStorageClasses[i] = strings.TrimSpace(sc)
- }
- } else {
- // none specified -- use configured default
- for class, cfg := range rtr.cluster.StorageClasses {
- if cfg.Default {
- wantStorageClasses = append(wantStorageClasses, class)
- }
- }
- }
-
- buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
- if err != nil {
- http.Error(resp, err.Error(), http.StatusServiceUnavailable)
- return
- }
-
- _, err = io.ReadFull(req.Body, buf)
- if err != nil {
- http.Error(resp, err.Error(), 500)
- bufs.Put(buf)
- return
- }
-
- result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
- bufs.Put(buf)
-
- if err != nil {
- code := http.StatusInternalServerError
- if err, ok := err.(*KeepError); ok {
- code = err.HTTPCode
- }
- http.Error(resp, err.Error(), code)
- return
- }
-
- // Success; add a size hint, sign the locator if possible, and
- // return it to the client.
- returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
- apiToken := GetAPIToken(req)
- if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
- expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
- returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
- }
- resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
- resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
- resp.Write([]byte(returnHash + "\n"))
-}
-
-// IndexHandler responds to "/index", "/index/{prefix}", and
-// "/mounts/{uuid}/blocks" requests.
-func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
- if !rtr.isSystemAuth(GetAPIToken(req)) {
- http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
- return
- }
-
- prefix := mux.Vars(req)["prefix"]
- if prefix == "" {
- req.ParseForm()
- prefix = req.Form.Get("prefix")
- }
-
- uuid := mux.Vars(req)["uuid"]
-
- var vols []*VolumeMount
- if uuid == "" {
- vols = rtr.volmgr.AllReadable()
- } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
- http.Error(resp, "mount not found", http.StatusNotFound)
- return
- } else {
- vols = []*VolumeMount{mnt}
- }
-
- for _, v := range vols {
- if err := v.IndexTo(prefix, resp); err != nil {
- // We can't send an error status/message to
- // the client because IndexTo() might have
- // already written body content. All we can do
- // is log the error in our own logs.
- //
- // The client must notice the lack of trailing
- // newline as an indication that the response
- // is incomplete.
- ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
- return
- }
- }
- // An empty line at EOF is the only way the client can be
- // assured the entire index was received.
- resp.Write([]byte{'\n'})
-}
-
-// MountsHandler responds to "GET /mounts" requests.
-func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
- err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
- if err != nil {
- httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
- }
-}
-
// PoolStatus is the buffer pool section of the /status.json
// response. readNodeStatus fills it from the package's buffer pool.
type PoolStatus struct {
	// Alloc is the value reported by the pool's Alloc() method.
	Alloc uint64 `json:"BytesAllocatedCumulative"`
	// Cap is the value reported by the pool's Cap() method.
	Cap int `json:"BuffersMax"`
	// Len is the value reported by the pool's Len() method.
	Len int `json:"BuffersInUse"`
}
-
-type volumeStatusEnt struct {
- Label string
- Status *VolumeStatus `json:",omitempty"`
- VolumeStats *ioStats `json:",omitempty"`
- InternalStats interface{} `json:",omitempty"`
-}
-
-// NodeStatus struct
-type NodeStatus struct {
- Volumes []*volumeStatusEnt
- BufferPool PoolStatus
- PullQueue WorkQueueStatus
- TrashQueue WorkQueueStatus
- RequestsCurrent int
- RequestsMax int
- Version string
-}
-
-var st NodeStatus
-var stLock sync.Mutex
-
-// DebugHandler addresses /debug.json requests.
-func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
- type debugStats struct {
- MemStats runtime.MemStats
- }
- var ds debugStats
- runtime.ReadMemStats(&ds.MemStats)
- data, err := json.Marshal(&ds)
- if err != nil {
- http.Error(resp, err.Error(), http.StatusInternalServerError)
- return
- }
- resp.Write(data)
-}
-
-// StatusHandler addresses /status.json requests.
-func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
- stLock.Lock()
- rtr.readNodeStatus(&st)
- data, err := json.Marshal(&st)
- stLock.Unlock()
- if err != nil {
- http.Error(resp, err.Error(), http.StatusInternalServerError)
- return
- }
- resp.Write(data)
-}
-
-// populate the given NodeStatus struct with current values.
-func (rtr *router) readNodeStatus(st *NodeStatus) {
- st.Version = strings.SplitN(cmd.Version.String(), " ", 2)[0]
- vols := rtr.volmgr.AllReadable()
- if cap(st.Volumes) < len(vols) {
- st.Volumes = make([]*volumeStatusEnt, len(vols))
- }
- st.Volumes = st.Volumes[:0]
- for _, vol := range vols {
- var internalStats interface{}
- if vol, ok := vol.Volume.(InternalStatser); ok {
- internalStats = vol.InternalStats()
- }
- st.Volumes = append(st.Volumes, &volumeStatusEnt{
- Label: vol.String(),
- Status: vol.Status(),
- InternalStats: internalStats,
- //VolumeStats: rtr.volmgr.VolumeStats(vol),
- })
- }
- st.BufferPool.Alloc = bufs.Alloc()
- st.BufferPool.Cap = bufs.Cap()
- st.BufferPool.Len = bufs.Len()
- st.PullQueue = getWorkQueueStatus(rtr.pullq)
- st.TrashQueue = getWorkQueueStatus(rtr.trashq)
-}
-
-// return a WorkQueueStatus for the given queue. If q is nil (which
-// should never happen except in test suites), return a zero status
-// value instead of crashing.
-func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
- if q == nil {
- // This should only happen during tests.
- return WorkQueueStatus{}
- }
- return q.Status()
-}
-
-// handleDELETE processes DELETE requests.
-//
-// DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
-// from all connected volumes.
-//
-// Only the Data Manager, or an Arvados admin with scope "all", are
-// allowed to issue DELETE requests. If a DELETE request is not
-// authenticated or is issued by a non-admin user, the server returns
-// a PermissionError.
-//
-// Upon receiving a valid request from an authorized user,
-// handleDELETE deletes all copies of the specified block on local
-// writable volumes.
-//
-// Response format:
-//
-// If the requested blocks was not found on any volume, the response
-// code is HTTP 404 Not Found.
-//
-// Otherwise, the response code is 200 OK, with a response body
-// consisting of the JSON message
-//
-// {"copies_deleted":d,"copies_failed":f}
-//
-// where d and f are integers representing the number of blocks that
-// were successfully and unsuccessfully deleted.
-func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
- hash := mux.Vars(req)["hash"]
-
- // Confirm that this user is an admin and has a token with unlimited scope.
- var tok = GetAPIToken(req)
- if tok == "" || !rtr.canDelete(tok) {
- http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
- return
- }
-
- if !rtr.cluster.Collections.BlobTrash {
- http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
- return
- }
-
- // Delete copies of this block from all available volumes.
- // Report how many blocks were successfully deleted, and how
- // many were found on writable volumes but not deleted.
- var result struct {
- Deleted int `json:"copies_deleted"`
- Failed int `json:"copies_failed"`
- }
- for _, vol := range rtr.volmgr.Mounts() {
- if !vol.KeepMount.AllowTrash {
- continue
- } else if err := vol.Trash(hash); err == nil {
- result.Deleted++
- } else if os.IsNotExist(err) {
- continue
- } else {
- result.Failed++
- ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
- }
- }
- if result.Deleted == 0 && result.Failed == 0 {
- resp.WriteHeader(http.StatusNotFound)
- return
- }
- body, err := json.Marshal(result)
- if err != nil {
- http.Error(resp, err.Error(), http.StatusInternalServerError)
- return
- }
- resp.Write(body)
-}
-
/* handlePull processes "PUT /pull" requests for the data manager.
   The request body is a JSON message containing a list of pull
   requests in the following format:

   [
      {
         "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
         "servers":[
            "keep0.qr1hi.arvadosapi.com:25107",
            "keep1.qr1hi.arvadosapi.com:25108"
         ]
      },
      {
         "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
         "servers":[
            "10.0.1.5:25107",
            "10.0.1.6:25107",
            "10.0.1.7:25108"
         ]
      },
      ...
   ]

   Each pull request in the list consists of a block locator string
   and an ordered list of servers. Keepstore should try to fetch the
   block from each server in turn.

   If the request has not been sent by the Data Manager, return 401
   Unauthorized.

   If the JSON unmarshalling fails, return 400 Bad Request.
*/
-
// PullRequest is one entry of the list accepted by "PUT /pull": a
// block locator plus the ordered list of servers to fetch it from.
type PullRequest struct {
	// Locator identifies the block to pull.
	Locator string `json:"locator"`
	// Servers lists the servers to try, in order.
	Servers []string `json:"servers"`

	// MountUUID is the destination mount, or "" for "anywhere".
	MountUUID string `json:"mount_uuid"`
}
-
-// PullHandler processes "PUT /pull" requests for the data manager.
-func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
- // Reject unauthorized requests.
- if !rtr.isSystemAuth(GetAPIToken(req)) {
- http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
- return
- }
-
- // Parse the request body.
- var pr []PullRequest
- r := json.NewDecoder(req.Body)
- if err := r.Decode(&pr); err != nil {
- http.Error(resp, err.Error(), BadRequestError.HTTPCode)
- return
- }
-
- // We have a properly formatted pull list sent from the data
- // manager. Report success and send the list to the pull list
- // manager for further handling.
- resp.WriteHeader(http.StatusOK)
- resp.Write([]byte(
- fmt.Sprintf("Received %d pull requests\n", len(pr))))
-
- plist := list.New()
- for _, p := range pr {
- plist.PushBack(p)
- }
- rtr.pullq.ReplaceQueue(plist)
-}
-
// TrashRequest is one entry of the list accepted by "PUT /trash": a
// block locator and its modification time.
type TrashRequest struct {
	// Locator identifies the block to trash.
	Locator string `json:"locator"`
	// BlockMtime is the block's Mtime as sent by the client.
	BlockMtime int64 `json:"block_mtime"`

	// MountUUID is the target mount, or "" for "everywhere".
	MountUUID string `json:"mount_uuid"`
}
-
-// TrashHandler processes /trash requests.
-func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
- // Reject unauthorized requests.
- if !rtr.isSystemAuth(GetAPIToken(req)) {
- http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
- return
- }
-
- // Parse the request body.
- var trash []TrashRequest
- r := json.NewDecoder(req.Body)
- if err := r.Decode(&trash); err != nil {
- http.Error(resp, err.Error(), BadRequestError.HTTPCode)
- return
- }
-
- // We have a properly formatted trash list sent from the data
- // manager. Report success and send the list to the trash work
- // queue for further handling.
- resp.WriteHeader(http.StatusOK)
- resp.Write([]byte(
- fmt.Sprintf("Received %d trash requests\n", len(trash))))
-
- tlist := list.New()
- for _, t := range trash {
- tlist.PushBack(t)
- }
- rtr.trashq.ReplaceQueue(tlist)
-}
-
-// UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
-func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
- // Reject unauthorized requests.
- if !rtr.isSystemAuth(GetAPIToken(req)) {
- http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
- return
- }
-
- log := ctxlog.FromContext(req.Context())
- hash := mux.Vars(req)["hash"]
-
- if len(rtr.volmgr.AllWritable()) == 0 {
- http.Error(resp, "No writable volumes", http.StatusNotFound)
- return
- }
-
- var untrashedOn, failedOn []string
- var numNotFound int
- for _, vol := range rtr.volmgr.AllWritable() {
- err := vol.Untrash(hash)
-
- if os.IsNotExist(err) {
- numNotFound++
- } else if err != nil {
- log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
- failedOn = append(failedOn, vol.String())
- } else {
- log.Infof("Untrashed %v on volume %v", hash, vol.String())
- untrashedOn = append(untrashedOn, vol.String())
- }
- }
-
- if numNotFound == len(rtr.volmgr.AllWritable()) {
- http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
- } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
- http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
- } else {
- respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
- if len(failedOn) > 0 {
- respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
- http.Error(resp, respBody, http.StatusInternalServerError)
- } else {
- fmt.Fprintln(resp, respBody)
- }
- }
-}
-
-// GetBlock and PutBlock implement lower-level code for handling
-// blocks by rooting through volumes connected to the local machine.
-// Once the handler has determined that system policy permits the
-// request, it calls these methods to perform the actual operation.
-//
-// TODO(twp): this code would probably be better located in the
-// VolumeManager interface. As an abstraction, the VolumeManager
-// should be the only part of the code that cares about which volume a
-// block is stored on, so it should be responsible for figuring out
-// which volume to check for fetching blocks, storing blocks, etc.
-
-// GetBlock fetches the block identified by "hash" into the provided
-// buf, and returns the data size.
-//
-// If the block cannot be found on any volume, returns NotFoundError.
-//
-// If the block found does not have the correct MD5 hash, returns
-// DiskHashError.
-func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
- log := ctxlog.FromContext(ctx)
-
- // Attempt to read the requested hash from a keep volume.
- errorToCaller := NotFoundError
-
- for _, vol := range volmgr.AllReadable() {
- size, err := vol.Get(ctx, hash, buf)
- select {
- case <-ctx.Done():
- return 0, ErrClientDisconnect
- default:
- }
- if err != nil {
- // IsNotExist is an expected error and may be
- // ignored. All other errors are logged. In
- // any case we continue trying to read other
- // volumes. If all volumes report IsNotExist,
- // we return a NotFoundError.
- if !os.IsNotExist(err) {
- log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
- }
- // If some volume returns a transient error, return it to the caller
- // instead of "Not found" so it can retry.
- if err == VolumeBusyError {
- errorToCaller = err.(*KeepError)
- }
- continue
- }
- // Check the file checksum.
- filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
- if filehash != hash {
- // TODO: Try harder to tell a sysadmin about
- // this.
- log.Errorf("checksum mismatch for block %s (actual %s), size %d on %s", hash, filehash, size, vol)
- errorToCaller = DiskHashError
- continue
- }
- if errorToCaller == DiskHashError {
- log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
- }
- return size, nil
- }
- return 0, errorToCaller
-}
-
-type putProgress struct {
- classNeeded map[string]bool
- classTodo map[string]bool
- mountUsed map[*VolumeMount]bool
- totalReplication int
- classDone map[string]int
-}
-
-// Number of distinct replicas stored. "2" can mean the block was
-// stored on 2 different volumes with replication 1, or on 1 volume
-// with replication 2.
-func (pr putProgress) TotalReplication() string {
- return strconv.Itoa(pr.totalReplication)
-}
-
-// Number of replicas satisfying each storage class, formatted like
-// "default=2; special=1".
-func (pr putProgress) ClassReplication() string {
- s := ""
- for k, v := range pr.classDone {
- if len(s) > 0 {
- s += ", "
- }
- s += k + "=" + strconv.Itoa(v)
- }
- return s
-}
-
-func (pr *putProgress) Add(mnt *VolumeMount) {
- if pr.mountUsed[mnt] {
- logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
- return
- }
- pr.mountUsed[mnt] = true
- pr.totalReplication += mnt.Replication
- for class := range mnt.StorageClasses {
- pr.classDone[class] += mnt.Replication
- delete(pr.classTodo, class)
- }
-}
-
-func (pr *putProgress) Sub(mnt *VolumeMount) {
- if !pr.mountUsed[mnt] {
- logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
- return
- }
- pr.mountUsed[mnt] = false
- pr.totalReplication -= mnt.Replication
- for class := range mnt.StorageClasses {
- pr.classDone[class] -= mnt.Replication
- if pr.classNeeded[class] {
- pr.classTodo[class] = true
- }
- }
-}
-
-func (pr *putProgress) Done() bool {
- return len(pr.classTodo) == 0 && pr.totalReplication > 0
-}
-
-func (pr *putProgress) Want(mnt *VolumeMount) bool {
- if pr.Done() || pr.mountUsed[mnt] {
- return false
- }
- if len(pr.classTodo) == 0 {
- // none specified == "any"
- return true
- }
- for class := range mnt.StorageClasses {
- if pr.classTodo[class] {
- return true
- }
- }
- return false
-}
-
-func (pr *putProgress) Copy() *putProgress {
- cp := putProgress{
- classNeeded: pr.classNeeded,
- classTodo: make(map[string]bool, len(pr.classTodo)),
- classDone: make(map[string]int, len(pr.classDone)),
- mountUsed: make(map[*VolumeMount]bool, len(pr.mountUsed)),
- totalReplication: pr.totalReplication,
- }
- for k, v := range pr.classTodo {
- cp.classTodo[k] = v
- }
- for k, v := range pr.classDone {
- cp.classDone[k] = v
- }
- for k, v := range pr.mountUsed {
- cp.mountUsed[k] = v
- }
- return &cp
-}
-
-func newPutProgress(classes []string) putProgress {
- pr := putProgress{
- classNeeded: make(map[string]bool, len(classes)),
- classTodo: make(map[string]bool, len(classes)),
- classDone: map[string]int{},
- mountUsed: map[*VolumeMount]bool{},
- }
- for _, c := range classes {
- if c != "" {
- pr.classNeeded[c] = true
- pr.classTodo[c] = true
- }
- }
- return pr
-}
-
-// PutBlock stores the given block on one or more volumes.
-//
-// The MD5 checksum of the block must match the given hash.
-//
-// The block is written to each writable volume (ordered by priority
-// and then UUID, see volume.go) until at least one replica has been
-// stored in each of the requested storage classes.
-//
-// The returned error, if any, is a KeepError with one of the
-// following codes:
-//
-// 500 Collision
-//
-// A different block with the same hash already exists on this
-// Keep server.
-//
-// 422 MD5Fail
-//
-// The MD5 hash of the BLOCK does not match the argument HASH.
-//
-// 503 Full
-//
-// There was not enough space left in any Keep volume to store
-// the object.
-//
-// 500 Fail
-//
-// The object could not be stored for some other reason (e.g.
-// all writes failed). The text of the error message should
-// provide as much detail as possible.
-func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
- log := ctxlog.FromContext(ctx)
-
- // Check that BLOCK's checksum matches HASH.
- blockhash := fmt.Sprintf("%x", md5.Sum(block))
- if blockhash != hash {
- log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
- return putProgress{}, RequestHashError
- }
-
- result := newPutProgress(wantStorageClasses)
-
- // If we already have this data, it's intact on disk, and we
- // can update its timestamp, return success. If we have
- // different data with the same hash, return failure.
- if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
- return result, err
- }
- if ctx.Err() != nil {
- return result, ErrClientDisconnect
- }
-
- writables := volmgr.NextWritable()
- if len(writables) == 0 {
- log.Error("no writable volumes")
- return result, FullError
- }
-
- var wg sync.WaitGroup
- var mtx sync.Mutex
- cond := sync.Cond{L: &mtx}
- // pending predicts what result will be if all pending writes
- // succeed.
- pending := result.Copy()
- var allFull atomic.Value
- allFull.Store(true)
-
- // We hold the lock for the duration of the "each volume" loop
- // below, except when it is released during cond.Wait().
- mtx.Lock()
-
- for _, mnt := range writables {
- // Wait until our decision to use this mount does not
- // depend on the outcome of pending writes.
- for result.Want(mnt) && !pending.Want(mnt) {
- cond.Wait()
- }
- if !result.Want(mnt) {
- continue
- }
- mnt := mnt
- pending.Add(mnt)
- wg.Add(1)
- go func() {
- log.Debugf("PutBlock: start write to %s", mnt.UUID)
- defer wg.Done()
- err := mnt.Put(ctx, hash, block)
-
- mtx.Lock()
- if err != nil {
- log.Debugf("PutBlock: write to %s failed", mnt.UUID)
- pending.Sub(mnt)
- } else {
- log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
- result.Add(mnt)
- }
- cond.Broadcast()
- mtx.Unlock()
-
- if err != nil && err != FullError && ctx.Err() == nil {
- // The volume is not full but the
- // write did not succeed. Report the
- // error and continue trying.
- allFull.Store(false)
- log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
- }
- }()
- }
- mtx.Unlock()
- wg.Wait()
- if ctx.Err() != nil {
- return result, ErrClientDisconnect
- }
- if result.Done() {
- return result, nil
- }
-
- if result.totalReplication > 0 {
- // Some, but not all, of the storage classes were
- // satisfied. This qualifies as success.
- return result, nil
- } else if allFull.Load().(bool) {
- log.Error("all volumes with qualifying storage classes are full")
- return putProgress{}, FullError
- } else {
- // Already logged the non-full errors.
- return putProgress{}, GenericError
- }
-}
-
-// CompareAndTouch looks for volumes where the given content already
-// exists and its modification time can be updated (i.e., it is
-// protected from garbage collection), and updates result accordingly.
-// It returns when the result is Done() or all volumes have been
-// checked.
-func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
- log := ctxlog.FromContext(ctx)
- for _, mnt := range volmgr.AllWritable() {
- if !result.Want(mnt) {
- continue
- }
- err := mnt.Compare(ctx, hash, buf)
- if ctx.Err() != nil {
- return nil
- } else if err == CollisionError {
- // Stop if we have a block with same hash but
- // different content. (It will be impossible
- // to tell which one is wanted if we have
- // both, so there's no point writing it even
- // on a different volume.)
- log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
- return CollisionError
- } else if os.IsNotExist(err) {
- // Block does not exist. This is the only
- // "normal" error: we don't log anything.
- continue
- } else if err != nil {
- // Couldn't open file, data is corrupt on
- // disk, etc.: log this abnormal condition,
- // and try the next volume.
- log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
- continue
- }
- if err := mnt.Touch(hash); err != nil {
- log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
- continue
- }
- // Compare and Touch both worked --> done.
- result.Add(mnt)
- if result.Done() {
- return nil
- }
- }
- return nil
-}
-
// validLocatorRe matches a bare MD5 locator: exactly 32 lowercase
// hexadecimal digits.
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator reports whether loc is a valid Keep locator. When
// Keep is extended to support hash types other than MD5, this should
// be updated to cover those as well.
func IsValidLocator(loc string) bool {
	// The pattern can only match strings of exactly 32 bytes.
	if len(loc) != 32 {
		return false
	}
	return validLocatorRe.MatchString(loc)
}
-
// authRe matches an Authorization header value using the "OAuth2" or
// "Bearer" scheme; the second submatch is the token.
var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)

// GetAPIToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
// token is found.
//
// Only the first Authorization value is examined. A header entry
// that is present but holds no values is treated as absent.
func GetAPIToken(req *http.Request) string {
	auth := req.Header["Authorization"]
	if len(auth) == 0 {
		return ""
	}
	if match := authRe.FindStringSubmatch(auth[0]); match != nil {
		return match[2]
	}
	return ""
}
-
-// canDelete returns true if the user identified by apiToken is
-// allowed to delete blocks.
-func (rtr *router) canDelete(apiToken string) bool {
- if apiToken == "" {
- return false
- }
- // Blocks may be deleted only when Keep has been configured with a
- // data manager.
- if rtr.isSystemAuth(apiToken) {
- return true
- }
- // TODO(twp): look up apiToken with the API server
- // return true if is_admin is true and if the token
- // has unlimited scope
- return false
-}
-
-// isSystemAuth returns true if the given token is allowed to perform
-// system level actions like deleting data.
-func (rtr *router) isSystemAuth(token string) bool {
- return token != "" && token == rtr.cluster.SystemRootToken
-}