//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepstore
import (
"container/list"
}
func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
- ctx, cancel := contextForResponse(context.TODO(), resp)
- defer cancel()
-
locator := req.URL.Path[1:]
if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
- rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
+ rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
return
}
// isn't here, we can return 404 now instead of waiting for a
// buffer.
- buf, err := getBufferWithContext(ctx, bufs, BlockSize)
+ buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
if err != nil {
http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
}
defer bufs.Put(buf)
- size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
+ size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
if err != nil {
code := http.StatusInternalServerError
if err, ok := err.(*KeepError); ok {
resp.Write(buf[:size])
}
-// Return a new context that gets cancelled by resp's CloseNotifier.
-func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
- ctx, cancel := context.WithCancel(parent)
- if cn, ok := resp.(http.CloseNotifier); ok {
- go func(c <-chan bool) {
- select {
- case <-c:
- cancel()
- case <-ctx.Done():
- }
- }(cn.CloseNotify())
- }
- return ctx, cancel
-}
-
// Get a buffer from the pool -- but give up and return a non-nil
// error if ctx ends before we get a buffer.
func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
}
func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
- ctx, cancel := contextForResponse(context.TODO(), resp)
- defer cancel()
-
hash := mux.Vars(req)["hash"]
// Detect as many error conditions as possible before reading
}
}
- buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
+ buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
if err != nil {
http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
return
}
- result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
+ result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
bufs.Put(buf)
if err != nil {
return &cp
}
-func newPutResult(classes []string) putProgress {
+func newPutProgress(classes []string) putProgress {
pr := putProgress{
classNeeded: make(map[string]bool, len(classes)),
classTodo: make(map[string]bool, len(classes)),
return pr
}
-// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
-//
-// PutBlock(ctx, block, hash)
-// Stores the BLOCK (identified by the content id HASH) in Keep.
-//
-// The MD5 checksum of the block must be identical to the content id HASH.
-// If not, an error is returned.
+// PutBlock stores the given block on one or more volumes.
//
-// PutBlock stores the BLOCK on the first Keep volume with free space.
-// A failure code is returned to the user only if all volumes fail.
+// The MD5 checksum of the block must match the given hash.
//
-// On success, PutBlock returns nil.
-// On failure, it returns a KeepError with one of the following codes:
+// The block is written to each writable volume (ordered by priority
+// and then UUID, see volume.go) until at least one replica has been
+// stored in each of the requested storage classes.
//
-// 500 Collision
-// A different block with the same hash already exists on this
-// Keep server.
-// 422 MD5Fail
-// The MD5 hash of the BLOCK does not match the argument HASH.
-// 503 Full
-// There was not enough space left in any Keep volume to store
-// the object.
-// 500 Fail
-// The object could not be stored for some other reason (e.g.
-// all writes failed). The text of the error message should
-// provide as much detail as possible.
+// The returned error, if any, is a KeepError with one of the
+// following codes:
//
+// 500 Collision
+// A different block with the same hash already exists on this
+// Keep server.
+// 422 MD5Fail
+// The MD5 hash of the block does not match the given hash.
+// 503 Full
+// There was not enough space left in any Keep volume to store
+// the block.
+// 500 Fail
+// The block could not be stored for some other reason (e.g.
+// all writes failed). The text of the error message should
+// provide as much detail as possible.
func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
log := ctxlog.FromContext(ctx)
return putProgress{}, RequestHashError
}
- result := newPutResult(wantStorageClasses)
+ result := newPutProgress(wantStorageClasses)
// If we already have this data, it's intact on disk, and we
// can update its timestamp, return success. If we have
pending := result.Copy()
var allFull atomic.Value
allFull.Store(true)
+
+ // We hold the lock for the duration of the "each volume" loop
+ // below, except when it is released during cond.Wait().
mtx.Lock()
+
for _, mnt := range writables {
// Wait until our decision to use this mount does not
// depend on the outcome of pending writes.