//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepstore
import (
"container/list"
"sync/atomic"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/health"
}
func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
- ctx, cancel := contextForResponse(context.TODO(), resp)
- defer cancel()
-
locator := req.URL.Path[1:]
if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
- rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
+ rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
return
}
// isn't here, we can return 404 now instead of waiting for a
// buffer.
- buf, err := getBufferWithContext(ctx, bufs, BlockSize)
+ buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
if err != nil {
http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
}
defer bufs.Put(buf)
- size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
+ size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
if err != nil {
code := http.StatusInternalServerError
if err, ok := err.(*KeepError); ok {
resp.Write(buf[:size])
}
-// Return a new context that gets cancelled by resp's CloseNotifier.
-func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
- ctx, cancel := context.WithCancel(parent)
- if cn, ok := resp.(http.CloseNotifier); ok {
- go func(c <-chan bool) {
- select {
- case <-c:
- cancel()
- case <-ctx.Done():
- }
- }(cn.CloseNotify())
- }
- return ctx, cancel
-}
-
// Get a buffer from the pool -- but give up and return a non-nil
// error if ctx ends before we get a buffer.
func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
}
func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
- ctx, cancel := contextForResponse(context.TODO(), resp)
- defer cancel()
-
hash := mux.Vars(req)["hash"]
// Detect as many error conditions as possible before reading
}
}
- buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
+ buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
if err != nil {
http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
return
}
- result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
+ result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
bufs.Put(buf)
if err != nil {
// populate the given NodeStatus struct with current values.
func (rtr *router) readNodeStatus(st *NodeStatus) {
- st.Version = version
+ st.Version = strings.SplitN(cmd.Version.String(), " ", 2)[0]
vols := rtr.volmgr.AllReadable()
if cap(st.Volumes) < len(vols) {
st.Volumes = make([]*volumeStatusEnt, len(vols))
// Otherwise, the response code is 200 OK, with a response body
// consisting of the JSON message
//
-// {"copies_deleted":d,"copies_failed":f}
+// {"copies_deleted":d,"copies_failed":f}
//
// where d and f are integers representing the number of blocks that
// were successfully and unsuccessfully deleted.
-//
func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
hash := mux.Vars(req)["hash"]
//
// If the block found does not have the correct MD5 hash, returns
// DiskHashError.
-//
func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
log := ctxlog.FromContext(ctx)
if filehash != hash {
// TODO: Try harder to tell a sysadmin about
// this.
- log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
+ log.Errorf("checksum mismatch for block %s (actual %s), size %d on %s", hash, filehash, size, vol)
errorToCaller = DiskHashError
continue
}
return pr
}
-// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
+// PutBlock stores the given block on one or more volumes.
//
-// PutBlock(ctx, block, hash)
-// Stores the BLOCK (identified by the content id HASH) in Keep.
+// The MD5 checksum of the block must match the given hash.
//
-// The MD5 checksum of the block must be identical to the content id HASH.
-// If not, an error is returned.
+// The block is written to each writable volume (ordered by priority
+// and then UUID, see volume.go) until at least one replica has been
+// stored in each of the requested storage classes.
//
-// PutBlock stores the BLOCK on the first Keep volume with free space.
-// A failure code is returned to the user only if all volumes fail.
+// The returned error, if any, is a KeepError with one of the
+// following codes:
//
-// On success, PutBlock returns nil.
-// On failure, it returns a KeepError with one of the following codes:
+// 500 Collision
//
-// 500 Collision
-// A different block with the same hash already exists on this
-// Keep server.
-// 422 MD5Fail
-// The MD5 hash of the BLOCK does not match the argument HASH.
-// 503 Full
-// There was not enough space left in any Keep volume to store
-// the object.
-// 500 Fail
-// The object could not be stored for some other reason (e.g.
-// all writes failed). The text of the error message should
-// provide as much detail as possible.
+// A different block with the same hash already exists on this
+// Keep server.
//
+// 422 MD5Fail
+//
+// The MD5 hash of the block does not match the given hash.
+//
+// 503 Full
+//
+// There was not enough space left in any Keep volume to store
+// the object.
+//
+// 500 Fail
+//
+// The object could not be stored for some other reason (e.g.
+// all writes failed). The text of the error message should
+// provide as much detail as possible.
func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
log := ctxlog.FromContext(ctx)
pending := result.Copy()
var allFull atomic.Value
allFull.Store(true)
+
+ // We hold the lock for the duration of the "each volume" loop
+ // below, except when it is released during cond.Wait().
mtx.Lock()
+
for _, mnt := range writables {
// Wait until our decision to use this mount does not
// depend on the outcome of pending writes.
// to tell which one is wanted if we have
// both, so there's no point writing it even
// on a different volume.)
- log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
+ log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
return CollisionError
} else if os.IsNotExist(err) {
// Block does not exist. This is the only
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
-// IsValidLocator returns true if the specified string is a valid Keep locator.
-// When Keep is extended to support hash types other than MD5,
-// this should be updated to cover those as well.
-//
+// IsValidLocator returns true if the specified string is a valid Keep
+// locator. When Keep is extended to support hash types other than
+// MD5, this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	// validLocatorRe is anchored at both ends, so loc is accepted only
	// when it consists of exactly 32 lowercase hexadecimal digits and
	// nothing else.
	ok := validLocatorRe.MatchString(loc)
	return ok
}