//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepstore
import (
"container/list"
"strconv"
"strings"
"sync"
+ "sync/atomic"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/health"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/sdk/go/health"
+ "git.arvados.org/arvados.git/sdk/go/httpserver"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
)
type router struct {
*mux.Router
- limiter httpserver.RequestCounter
cluster *arvados.Cluster
+ logger logrus.FieldLogger
remoteProxy remoteProxy
metrics *nodeMetrics
+ volmgr *RRVolumeManager
+ pullq *WorkQueue
+ trashq *WorkQueue
}
// MakeRESTRouter returns a new router that forwards all Keep requests
// to the appropriate handlers.
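+//
+// Typical wiring (an illustrative sketch; the real service setup,
+// including request logging and metrics instrumentation, lives in
+// this package's service command):
+//
+//	handler := MakeRESTRouter(ctx, cluster, prometheus.NewRegistry(), volmgr, pullq, trashq)
+//	srv := &http.Server{Addr: ":25107", Handler: handler} // port illustrative
+//	err := srv.ListenAndServe()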
-func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
+func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
rtr := &router{
Router: mux.NewRouter(),
cluster: cluster,
+ logger: ctxlog.FromContext(ctx),
metrics: &nodeMetrics{reg: reg},
+ volmgr: volmgr,
+ pullq: pullq,
+ trashq: trashq,
}
	rtr.HandleFunc(
		`/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
- rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
+ rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
// List all blocks stored here. Privileged client only.
- rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
+ rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
// List blocks stored here whose hash has the given prefix.
// Privileged client only.
- rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
+ rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
+ // Update timestamp on existing block. Privileged client only.
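+	// (TOUCH is a nonstandard HTTP method; for example, with curl:
+	// curl -X TOUCH -H "Authorization: Bearer $token" http://keep.example:25107/$hash)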
+ rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
// Internals/debugging info (runtime.MemStats)
rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
// List mounts: UUID, readonly, tier, device ID, ...
rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
- rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
- rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
+ rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
+ rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
// Replace the current pull queue.
- rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+ rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
// Replace the current trash queue.
- rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
+ rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
	// Untrash moves blocks from the trash back into the store.
- rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
+ rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
rtr.Handle("/_health/{check}", &health.Handler{
- Token: theConfig.ManagementToken,
+ Token: cluster.ManagementToken,
Prefix: "/_health/",
}).Methods("GET")
	// Any request which does not match any of these routes gets
	// 400 Bad Request.
rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
- rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
rtr.metrics.setupBufferPoolMetrics(bufs)
- rtr.metrics.setupWorkQueueMetrics(pullq, "pull")
- rtr.metrics.setupWorkQueueMetrics(trashq, "trash")
- rtr.metrics.setupRequestMetrics(rtr.limiter)
+ rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
+ rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
- instrumented := httpserver.Instrument(rtr.metrics.reg, nil,
- httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
- return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
+ return rtr
}
// BadRequestHandler is a HandleFunc to address bad requests.
}
func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
- ctx, cancel := contextForResponse(context.TODO(), resp)
- defer cancel()
-
locator := req.URL.Path[1:]
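+	// A locator with a +R (remote cluster) hint and no +A (local
+	// signature) hint is served by fetching the block from the
+	// remote cluster.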
if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
- rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster)
+ rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
return
}
- if theConfig.RequireSignatures {
+ if rtr.cluster.Collections.BlobSigning {
-		locator := req.URL.Path[1:] // strip leading slash
-		if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
+		if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
return
}
// isn't here, we can return 404 now instead of waiting for a
// buffer.
- buf, err := getBufferWithContext(ctx, bufs, BlockSize)
+ buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
if err != nil {
http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
}
defer bufs.Put(buf)
- size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
+ size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
if err != nil {
code := http.StatusInternalServerError
if err, ok := err.(*KeepError); ok {
resp.Write(buf[:size])
}
-// Return a new context that gets cancelled by resp's CloseNotifier.
-func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
- ctx, cancel := context.WithCancel(parent)
- if cn, ok := resp.(http.CloseNotifier); ok {
- go func(c <-chan bool) {
- select {
- case <-c:
- theConfig.debugLogf("cancel context")
- cancel()
- case <-ctx.Done():
- }
- }(cn.CloseNotify())
- }
- return ctx, cancel
-}
-
// Get a buffer from the pool -- but give up and return a non-nil
// error if ctx ends before we get a buffer.
func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
}
}
-func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
- ctx, cancel := contextForResponse(context.TODO(), resp)
- defer cancel()
+func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
+ if !rtr.isSystemAuth(GetAPIToken(req)) {
+ http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+ return
+ }
+ hash := mux.Vars(req)["hash"]
+ vols := rtr.volmgr.AllWritable()
+ if len(vols) == 0 {
+ http.Error(resp, "no volumes", http.StatusNotFound)
+ return
+ }
+ var err error
+ for _, mnt := range vols {
+ err = mnt.Touch(hash)
+ if err == nil {
+ break
+ }
+ }
+ switch {
+ case err == nil:
+ return
+ case os.IsNotExist(err):
+ http.Error(resp, err.Error(), http.StatusNotFound)
+ default:
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ }
+}
+func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
hash := mux.Vars(req)["hash"]
// Detect as many error conditions as possible before reading
return
}
- if len(KeepVM.AllWritable()) == 0 {
+ if len(rtr.volmgr.AllWritable()) == 0 {
http.Error(resp, FullError.Error(), FullError.HTTPCode)
return
}
- buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
+ var wantStorageClasses []string
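+	// The client can request specific storage classes with a
+	// comma-separated header, e.g.
+	// "X-Keep-Storage-Classes: default, archival" (class names
+	// here are illustrative; they come from the cluster config).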
+ if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
+ wantStorageClasses = strings.Split(hdr, ",")
+ for i, sc := range wantStorageClasses {
+ wantStorageClasses[i] = strings.TrimSpace(sc)
+ }
+ } else {
+ // none specified -- use configured default
+ for class, cfg := range rtr.cluster.StorageClasses {
+ if cfg.Default {
+ wantStorageClasses = append(wantStorageClasses, class)
+ }
+ }
+ }
+
+ buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
if err != nil {
http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
return
}
- replication, err := PutBlock(ctx, buf, hash)
+ result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
bufs.Put(buf)
if err != nil {
// return it to the client.
returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
apiToken := GetAPIToken(req)
- if theConfig.blobSigningKey != nil && apiToken != "" {
- expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
- returnHash = SignLocator(returnHash, apiToken, expiry)
+ if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
+ expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
+ returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
}
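+	// Report the achieved replication, e.g.
+	//   X-Keep-Replicas-Stored: 2
+	//   X-Keep-Storage-Classes-Confirmed: default=2, special=1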
- resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
+ resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
+ resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
resp.Write([]byte(returnHash + "\n"))
}
-// IndexHandler responds to "/index", "/index/{prefix}", and
-// "/mounts/{uuid}/blocks" requests.
+// handleIndex responds to "/index", "/index/{prefix}", and
+// "/mounts/{uuid}/blocks" requests.
-func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
- if !IsSystemAuth(GetAPIToken(req)) {
+func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
+ if !rtr.isSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
uuid := mux.Vars(req)["uuid"]
- var vols []Volume
+ var vols []*VolumeMount
if uuid == "" {
- vols = KeepVM.AllReadable()
- } else if v := KeepVM.Lookup(uuid, false); v == nil {
+ vols = rtr.volmgr.AllReadable()
+ } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
http.Error(resp, "mount not found", http.StatusNotFound)
return
} else {
- vols = []Volume{v}
+ vols = []*VolumeMount{mnt}
}
for _, v := range vols {
if err := v.IndexTo(prefix, resp); err != nil {
- // The only errors returned by IndexTo are
- // write errors returned by resp.Write(),
- // which probably means the client has
- // disconnected and this error will never be
- // reported to the client -- but it will
- // appear in our own error log.
- http.Error(resp, err.Error(), http.StatusInternalServerError)
+ // We can't send an error status/message to
+ // the client because IndexTo() might have
+ // already written body content. All we can do
+ // is log the error in our own logs.
+ //
+ // The client must notice the lack of trailing
+ // newline as an indication that the response
+ // is incomplete.
+ ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
return
}
}
// MountsHandler responds to "GET /mounts" requests.
func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
- err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
+ err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
if err != nil {
- http.Error(resp, err.Error(), http.StatusInternalServerError)
+ httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
}
}
}
var ds debugStats
runtime.ReadMemStats(&ds.MemStats)
- err := json.NewEncoder(resp).Encode(&ds)
+ data, err := json.Marshal(&ds)
if err != nil {
- http.Error(resp, err.Error(), 500)
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ return
}
+ resp.Write(data)
}
// StatusHandler addresses /status.json requests.
func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
stLock.Lock()
rtr.readNodeStatus(&st)
- jstat, err := json.Marshal(&st)
+ data, err := json.Marshal(&st)
stLock.Unlock()
- if err == nil {
- resp.Write(jstat)
- } else {
- log.Printf("json.Marshal: %s", err)
- log.Printf("NodeStatus = %v", &st)
- http.Error(resp, err.Error(), 500)
+ if err != nil {
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ return
}
+ resp.Write(data)
}
// populate the given NodeStatus struct with current values.
func (rtr *router) readNodeStatus(st *NodeStatus) {
- st.Version = version
- vols := KeepVM.AllReadable()
+ st.Version = strings.SplitN(cmd.Version.String(), " ", 2)[0]
+ vols := rtr.volmgr.AllReadable()
if cap(st.Volumes) < len(vols) {
st.Volumes = make([]*volumeStatusEnt, len(vols))
}
st.Volumes = st.Volumes[:0]
for _, vol := range vols {
var internalStats interface{}
- if vol, ok := vol.(InternalStatser); ok {
+ if vol, ok := vol.Volume.(InternalStatser); ok {
internalStats = vol.InternalStats()
}
st.Volumes = append(st.Volumes, &volumeStatusEnt{
Label: vol.String(),
Status: vol.Status(),
InternalStats: internalStats,
- //VolumeStats: KeepVM.VolumeStats(vol),
+ //VolumeStats: rtr.volmgr.VolumeStats(vol),
})
}
st.BufferPool.Alloc = bufs.Alloc()
st.BufferPool.Cap = bufs.Cap()
st.BufferPool.Len = bufs.Len()
- st.PullQueue = getWorkQueueStatus(pullq)
- st.TrashQueue = getWorkQueueStatus(trashq)
- if rtr.limiter != nil {
- st.RequestsCurrent = rtr.limiter.Current()
- st.RequestsMax = rtr.limiter.Max()
- }
+ st.PullQueue = getWorkQueueStatus(rtr.pullq)
+ st.TrashQueue = getWorkQueueStatus(rtr.trashq)
}
// return a WorkQueueStatus for the given queue. If q is nil (which
// should never happen except in test suites), return a zero status
// value instead of crashing.
return q.Status()
}
-// DeleteHandler processes DELETE requests.
+// handleDELETE processes DELETE requests.
//
// DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
// from all connected volumes.
// a PermissionError.
//
// Upon receiving a valid request from an authorized user,
-// DeleteHandler deletes all copies of the specified block on local
+// handleDELETE deletes all copies of the specified block on local
// writable volumes.
//
// Response format:
// Otherwise, the response code is 200 OK, with a response body
// consisting of the JSON message
//
-// {"copies_deleted":d,"copies_failed":f}
+// {"copies_deleted":d,"copies_failed":f}
//
// where d and f are integers representing the number of blocks that
// were successfully and unsuccessfully deleted.
-//
-func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
hash := mux.Vars(req)["hash"]
// Confirm that this user is an admin and has a token with unlimited scope.
var tok = GetAPIToken(req)
- if tok == "" || !CanDelete(tok) {
+ if tok == "" || !rtr.canDelete(tok) {
http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
return
}
- if !theConfig.EnableDelete {
+ if !rtr.cluster.Collections.BlobTrash {
http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
return
}
Deleted int `json:"copies_deleted"`
Failed int `json:"copies_failed"`
}
- for _, vol := range KeepVM.AllWritable() {
+ for _, vol := range rtr.volmgr.AllWritable() {
if err := vol.Trash(hash); err == nil {
result.Deleted++
} else if os.IsNotExist(err) {
continue
} else {
result.Failed++
- log.Println("DeleteHandler:", err)
+ ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
}
}
-
- var st int
-
if result.Deleted == 0 && result.Failed == 0 {
- st = http.StatusNotFound
- } else {
- st = http.StatusOK
+ resp.WriteHeader(http.StatusNotFound)
+ return
}
-
- resp.WriteHeader(st)
-
- if st == http.StatusOK {
- if body, err := json.Marshal(result); err == nil {
- resp.Write(body)
- } else {
- log.Printf("json.Marshal: %s (result = %v)", err, result)
- http.Error(resp, err.Error(), 500)
- }
+ body, err := json.Marshal(result)
+ if err != nil {
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ return
}
+ resp.Write(body)
}
/* PullHandler processes "PUT /pull" requests for the data manager.
}
-// PullHandler processes "PUT /pull" requests for the data manager.
+// handlePull processes "PUT /pull" requests for the data manager.
-func PullHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsSystemAuth(GetAPIToken(req)) {
+ if !rtr.isSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
for _, p := range pr {
plist.PushBack(p)
}
- pullq.ReplaceQueue(plist)
+ rtr.pullq.ReplaceQueue(plist)
}
// TrashRequest consists of a block locator and its Mtime
}
-// TrashHandler processes /trash requests.
+// handleTrash processes /trash requests.
-func TrashHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsSystemAuth(GetAPIToken(req)) {
+ if !rtr.isSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
for _, t := range trash {
tlist.PushBack(t)
}
- trashq.ReplaceQueue(tlist)
+ rtr.trashq.ReplaceQueue(tlist)
}
-// UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
+// handleUntrash processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
-func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsSystemAuth(GetAPIToken(req)) {
+ if !rtr.isSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
+ log := ctxlog.FromContext(req.Context())
hash := mux.Vars(req)["hash"]
- if len(KeepVM.AllWritable()) == 0 {
+ if len(rtr.volmgr.AllWritable()) == 0 {
http.Error(resp, "No writable volumes", http.StatusNotFound)
return
}
var untrashedOn, failedOn []string
var numNotFound int
- for _, vol := range KeepVM.AllWritable() {
+ for _, vol := range rtr.volmgr.AllWritable() {
err := vol.Untrash(hash)
if os.IsNotExist(err) {
numNotFound++
} else if err != nil {
- log.Printf("Error untrashing %v on volume %v", hash, vol.String())
+ log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
failedOn = append(failedOn, vol.String())
} else {
- log.Printf("Untrashed %v on volume %v", hash, vol.String())
+			log.Infof("Untrashed %v on volume %s", hash, vol)
untrashedOn = append(untrashedOn, vol.String())
}
}
- if numNotFound == len(KeepVM.AllWritable()) {
+ if numNotFound == len(rtr.volmgr.AllWritable()) {
http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
- return
- }
-
- if len(failedOn) == len(KeepVM.AllWritable()) {
+ } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
} else {
- respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+ respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
if len(failedOn) > 0 {
- respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+ respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
+ http.Error(resp, respBody, http.StatusInternalServerError)
+ } else {
+ fmt.Fprintln(resp, respBody)
}
- resp.Write([]byte(respBody))
}
}
//
// If the block found does not have the correct MD5 hash, returns
// DiskHashError.
-//
-func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+ log := ctxlog.FromContext(ctx)
+
// Attempt to read the requested hash from a keep volume.
errorToCaller := NotFoundError
- for _, vol := range KeepVM.AllReadable() {
+ for _, vol := range volmgr.AllReadable() {
size, err := vol.Get(ctx, hash, buf)
select {
case <-ctx.Done():
// volumes. If all volumes report IsNotExist,
// we return a NotFoundError.
if !os.IsNotExist(err) {
- log.Printf("%s: Get(%s): %s", vol, hash, err)
+ log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
}
// If some volume returns a transient error, return it to the caller
// instead of "Not found" so it can retry.
continue
}
// Check the file checksum.
- //
filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
if filehash != hash {
// TODO: Try harder to tell a sysadmin about
// this.
- log.Printf("%s: checksum mismatch for request %s (actual %s)",
- vol, hash, filehash)
+ log.Errorf("checksum mismatch for block %s (actual %s), size %d on %s", hash, filehash, size, vol)
errorToCaller = DiskHashError
continue
}
if errorToCaller == DiskHashError {
- log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
- vol, hash)
+		log.Warnf("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
}
return size, nil
}
return 0, errorToCaller
}
-// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
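+// putProgress tracks the state of a single PutBlock call: which
+// storage classes were requested (classNeeded), which of those still
+// lack a replica (classTodo), which mounts have been written to
+// (mountUsed), and the replication achieved so far, in total and per
+// class (totalReplication, classDone).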
+type putProgress struct {
+ classNeeded map[string]bool
+ classTodo map[string]bool
+ mountUsed map[*VolumeMount]bool
+ totalReplication int
+ classDone map[string]int
+}
+
+// Number of distinct replicas stored. "2" can mean the block was
+// stored on 2 different volumes with replication 1, or on 1 volume
+// with replication 2.
+func (pr putProgress) TotalReplication() string {
+ return strconv.Itoa(pr.totalReplication)
+}
+
+// Number of replicas satisfying each storage class, formatted like
+// "default=2; special=1".
+func (pr putProgress) ClassReplication() string {
+ s := ""
+ for k, v := range pr.classDone {
+ if len(s) > 0 {
+ s += ", "
+ }
+ s += k + "=" + strconv.Itoa(v)
+ }
+ return s
+}
+
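+// Add records a write to mnt, crediting mnt.Replication to each
+// storage class the mount belongs to and marking those classes done.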
+func (pr *putProgress) Add(mnt *VolumeMount) {
+ if pr.mountUsed[mnt] {
+ logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
+ return
+ }
+ pr.mountUsed[mnt] = true
+ pr.totalReplication += mnt.Replication
+ for class := range mnt.StorageClasses {
+ pr.classDone[class] += mnt.Replication
+ delete(pr.classTodo, class)
+ }
+}
+
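+// Sub reverses a previous Add, e.g. when a write that was counted
+// as pending turns out to have failed.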
+func (pr *putProgress) Sub(mnt *VolumeMount) {
+ if !pr.mountUsed[mnt] {
+ logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
+ return
+ }
+ pr.mountUsed[mnt] = false
+ pr.totalReplication -= mnt.Replication
+ for class := range mnt.StorageClasses {
+ pr.classDone[class] -= mnt.Replication
+ if pr.classNeeded[class] {
+ pr.classTodo[class] = true
+ }
+ }
+}
+
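+// Done reports whether every requested storage class has at least
+// one replica and at least one write has succeeded.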
+func (pr *putProgress) Done() bool {
+ return len(pr.classTodo) == 0 && pr.totalReplication > 0
+}
+
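+// Want reports whether a write to mnt would make progress: it is
+// false once the put is Done() or if mnt has already been used, and
+// true for any unused mount when no storage classes were requested.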
+func (pr *putProgress) Want(mnt *VolumeMount) bool {
+ if pr.Done() || pr.mountUsed[mnt] {
+ return false
+ }
+ if len(pr.classTodo) == 0 {
+ // none specified == "any"
+ return true
+ }
+ for class := range mnt.StorageClasses {
+ if pr.classTodo[class] {
+ return true
+ }
+ }
+ return false
+}
+
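+// Copy returns a copy with its own classTodo, classDone, and
+// mountUsed maps; classNeeded is shared, because it is never
+// modified after newPutProgress.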
+func (pr *putProgress) Copy() *putProgress {
+ cp := putProgress{
+ classNeeded: pr.classNeeded,
+ classTodo: make(map[string]bool, len(pr.classTodo)),
+ classDone: make(map[string]int, len(pr.classDone)),
+ mountUsed: make(map[*VolumeMount]bool, len(pr.mountUsed)),
+ totalReplication: pr.totalReplication,
+ }
+ for k, v := range pr.classTodo {
+ cp.classTodo[k] = v
+ }
+ for k, v := range pr.classDone {
+ cp.classDone[k] = v
+ }
+ for k, v := range pr.mountUsed {
+ cp.mountUsed[k] = v
+ }
+ return &cp
+}
+
+func newPutProgress(classes []string) putProgress {
+ pr := putProgress{
+ classNeeded: make(map[string]bool, len(classes)),
+ classTodo: make(map[string]bool, len(classes)),
+ classDone: map[string]int{},
+ mountUsed: map[*VolumeMount]bool{},
+ }
+ for _, c := range classes {
+ if c != "" {
+ pr.classNeeded[c] = true
+ pr.classTodo[c] = true
+ }
+ }
+ return pr
+}
+
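+// For example (illustrative): after newPutProgress([]string{"default"}),
+// one Add of a mount with Replication 2 and storage class "default"
+// yields TotalReplication()=="2", ClassReplication()=="default=2",
+// and Done()==true.
+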
+// PutBlock stores the given block on one or more volumes.
+//
+// The MD5 checksum of the block must match the given hash.
//
-// PutBlock(ctx, block, hash)
-// Stores the BLOCK (identified by the content id HASH) in Keep.
+// The block is written to each writable volume (ordered by priority
+// and then UUID, see volume.go) until at least one replica has been
+// stored in each of the requested storage classes.
//
-// The MD5 checksum of the block must be identical to the content id HASH.
-// If not, an error is returned.
+// The returned error, if any, is a KeepError with one of the
+// following codes:
//
-// PutBlock stores the BLOCK on the first Keep volume with free space.
-// A failure code is returned to the user only if all volumes fail.
+// 500 Collision
//
-// On success, PutBlock returns nil.
-// On failure, it returns a KeepError with one of the following codes:
+// A different block with the same hash already exists on this
+// Keep server.
//
-// 500 Collision
-// A different block with the same hash already exists on this
-// Keep server.
-// 422 MD5Fail
-// The MD5 hash of the BLOCK does not match the argument HASH.
-// 503 Full
-// There was not enough space left in any Keep volume to store
-// the object.
-// 500 Fail
-// The object could not be stored for some other reason (e.g.
-// all writes failed). The text of the error message should
-// provide as much detail as possible.
+// 422 MD5Fail
//
-func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
+// The MD5 hash of the BLOCK does not match the argument HASH.
+//
+// 503 Full
+//
+// There was not enough space left in any Keep volume to store
+// the object.
+//
+// 500 Fail
+//
+// The object could not be stored for some other reason (e.g.
+// all writes failed). The text of the error message should
+// provide as much detail as possible.
+func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
+ log := ctxlog.FromContext(ctx)
+
// Check that BLOCK's checksum matches HASH.
blockhash := fmt.Sprintf("%x", md5.Sum(block))
if blockhash != hash {
log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
- return 0, RequestHashError
+ return putProgress{}, RequestHashError
}
+ result := newPutProgress(wantStorageClasses)
+
// If we already have this data, it's intact on disk, and we
// can update its timestamp, return success. If we have
// different data with the same hash, return failure.
- if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
- return n, err
- } else if ctx.Err() != nil {
- return 0, ErrClientDisconnect
+ if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
+ return result, err
}
-
- // Choose a Keep volume to write to.
- // If this volume fails, try all of the volumes in order.
- if vol := KeepVM.NextWritable(); vol != nil {
- if err := vol.Put(ctx, hash, block); err == nil {
- return vol.Replication(), nil // success!
- }
- if ctx.Err() != nil {
- return 0, ErrClientDisconnect
- }
+ if ctx.Err() != nil {
+ return result, ErrClientDisconnect
}
- writables := KeepVM.AllWritable()
+ writables := volmgr.NextWritable()
if len(writables) == 0 {
- log.Print("No writable volumes.")
- return 0, FullError
+ log.Error("no writable volumes")
+ return result, FullError
}
- allFull := true
- for _, vol := range writables {
- err := vol.Put(ctx, hash, block)
- if ctx.Err() != nil {
- return 0, ErrClientDisconnect
- }
- if err == nil {
- return vol.Replication(), nil // success!
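+	// Write to the remaining mounts concurrently, one goroutine
+	// per mount. A write is started only if it could still be
+	// needed even when all in-flight writes succeed; cond lets us
+	// wait for in-flight outcomes when the decision depends on
+	// them.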
+ var wg sync.WaitGroup
+ var mtx sync.Mutex
+ cond := sync.Cond{L: &mtx}
+ // pending predicts what result will be if all pending writes
+ // succeed.
+ pending := result.Copy()
+ var allFull atomic.Value
+ allFull.Store(true)
+
+ // We hold the lock for the duration of the "each volume" loop
+ // below, except when it is released during cond.Wait().
+ mtx.Lock()
+
+ for _, mnt := range writables {
+ // Wait until our decision to use this mount does not
+ // depend on the outcome of pending writes.
+ for result.Want(mnt) && !pending.Want(mnt) {
+ cond.Wait()
}
- if err != FullError {
- // The volume is not full but the
- // write did not succeed. Report the
- // error and continue trying.
- allFull = false
- log.Printf("%s: Write(%s): %s", vol, hash, err)
+ if !result.Want(mnt) {
+ continue
}
+ mnt := mnt
+ pending.Add(mnt)
+ wg.Add(1)
+ go func() {
+ log.Debugf("PutBlock: start write to %s", mnt.UUID)
+ defer wg.Done()
+ err := mnt.Put(ctx, hash, block)
+
+ mtx.Lock()
+ if err != nil {
+ log.Debugf("PutBlock: write to %s failed", mnt.UUID)
+ pending.Sub(mnt)
+ } else {
+ log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
+ result.Add(mnt)
+ }
+ cond.Broadcast()
+ mtx.Unlock()
+
+ if err != nil && err != FullError && ctx.Err() == nil {
+ // The volume is not full but the
+ // write did not succeed. Report the
+ // error and continue trying.
+ allFull.Store(false)
+ log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+ }
+ }()
+ }
+ mtx.Unlock()
+ wg.Wait()
+ if ctx.Err() != nil {
+ return result, ErrClientDisconnect
+ }
+ if result.Done() {
+ return result, nil
}
- if allFull {
- log.Print("All volumes are full.")
- return 0, FullError
+ if result.totalReplication > 0 {
+ // Some, but not all, of the storage classes were
+ // satisfied. This qualifies as success.
+ return result, nil
+ } else if allFull.Load().(bool) {
+ log.Error("all volumes with qualifying storage classes are full")
+ return putProgress{}, FullError
+ } else {
+ // Already logged the non-full errors.
+ return putProgress{}, GenericError
}
- // Already logged the non-full errors.
- return 0, GenericError
}
-// CompareAndTouch returns the current replication level if one of the
-// volumes already has the given content and it successfully updates
-// the relevant block's modification time in order to protect it from
-// premature garbage collection. Otherwise, it returns a non-nil
-// error.
-func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
- var bestErr error = NotFoundError
- for _, vol := range KeepVM.AllWritable() {
- err := vol.Compare(ctx, hash, buf)
+// CompareAndTouch looks for volumes where the given content already
+// exists and its modification time can be updated (i.e., it is
+// protected from garbage collection), and updates result accordingly.
+// It returns when the result is Done() or all volumes have been
+// checked.
+func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
+ log := ctxlog.FromContext(ctx)
+ for _, mnt := range volmgr.AllWritable() {
+ if !result.Want(mnt) {
+ continue
+ }
+ err := mnt.Compare(ctx, hash, buf)
if ctx.Err() != nil {
- return 0, ctx.Err()
+ return nil
} else if err == CollisionError {
// Stop if we have a block with same hash but
// different content. (It will be impossible
// to tell which one is wanted if we have
// both, so there's no point writing it even
// on a different volume.)
- log.Printf("%s: Compare(%s): %s", vol, hash, err)
- return 0, err
+ log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
+ return CollisionError
} else if os.IsNotExist(err) {
// Block does not exist. This is the only
// "normal" error: we don't log anything.
// Couldn't open file, data is corrupt on
// disk, etc.: log this abnormal condition,
// and try the next volume.
- log.Printf("%s: Compare(%s): %s", vol, hash, err)
+ log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
continue
}
- if err := vol.Touch(hash); err != nil {
- log.Printf("%s: Touch %s failed: %s", vol, hash, err)
- bestErr = err
+ if err := mnt.Touch(hash); err != nil {
+ log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
continue
}
-		// Compare and Touch both worked --> done.
+		// Compare and Touch both worked --> count this replica.
- return vol.Replication(), nil
+ result.Add(mnt)
+ if result.Done() {
+ return nil
+ }
}
- return 0, bestErr
+ return nil
}
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
-// IsValidLocator returns true if the specified string is a valid Keep locator.
-// When Keep is extended to support hash types other than MD5,
-// this should be updated to cover those as well.
-//
+// IsValidLocator returns true if the specified string is a valid Keep
+// locator. When Keep is extended to support hash types other than
+// MD5, this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
return validLocatorRe.MatchString(loc)
}
return ""
}
-// IsExpired returns true if the given Unix timestamp (expressed as a
-// hexadecimal string) is in the past, or if timestampHex cannot be
-// parsed as a hexadecimal string.
-func IsExpired(timestampHex string) bool {
- ts, err := strconv.ParseInt(timestampHex, 16, 0)
- if err != nil {
- log.Printf("IsExpired: %s", err)
- return true
- }
- return time.Unix(ts, 0).Before(time.Now())
-}
-
-// CanDelete returns true if the user identified by apiToken is
+// canDelete returns true if the user identified by apiToken is
// allowed to delete blocks.
-func CanDelete(apiToken string) bool {
+func (rtr *router) canDelete(apiToken string) bool {
if apiToken == "" {
return false
}
// Blocks may be deleted only when Keep has been configured with a
// data manager.
- if IsSystemAuth(apiToken) {
+ if rtr.isSystemAuth(apiToken) {
return true
}
// TODO(twp): look up apiToken with the API server
return false
}
-// IsSystemAuth returns true if the given token is allowed to perform
+// isSystemAuth returns true if the given token is allowed to perform
// system level actions like deleting data.
-func IsSystemAuth(token string) bool {
- return token != "" && token == theConfig.systemAuthToken
+func (rtr *router) isSystemAuth(token string) bool {
+ return token != "" && token == rtr.cluster.SystemRootToken
}