return
}
- err = PutBlock(buf, hash)
+ replication, err := PutBlock(buf, hash)
bufs.Put(buf)
if err != nil {
returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
apiToken := GetApiToken(req)
if PermissionSecret != nil && apiToken != "" {
- expiry := time.Now().Add(blob_signature_ttl)
+ expiry := time.Now().Add(blobSignatureTTL)
returnHash = SignLocator(returnHash, apiToken, expiry)
}
+ resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
resp.Write([]byte(returnHash + "\n"))
}
return
}
- if never_delete {
+ if neverDelete {
http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
return
}
// all writes failed). The text of the error message should
// provide as much detail as possible.
//
-func PutBlock(block []byte, hash string) error {
+func PutBlock(block []byte, hash string) (int, error) {
// Check that BLOCK's checksum matches HASH.
blockhash := fmt.Sprintf("%x", md5.Sum(block))
if blockhash != hash {
log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
- return RequestHashError
+ return 0, RequestHashError
}
// If we already have this data, it's intact on disk, and we
// can update its timestamp, return success. If we have
// different data with the same hash, return failure.
- if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
- return err
+ if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+ return n, err
}
// Choose a Keep volume to write to.
// If this volume fails, try all of the volumes in order.
if vol := KeepVM.NextWritable(); vol != nil {
if err := vol.Put(hash, block); err == nil {
- return nil // success!
+ return vol.Replication(), nil // success!
}
}
writables := KeepVM.AllWritable()
if len(writables) == 0 {
log.Print("No writable volumes.")
- return FullError
+ return 0, FullError
}
allFull := true
for _, vol := range writables {
err := vol.Put(hash, block)
if err == nil {
- return nil // success!
+ return vol.Replication(), nil // success!
}
if err != FullError {
// The volume is not full but the
if allFull {
log.Print("All volumes are full.")
- return FullError
+ return 0, FullError
}
// Already logged the non-full errors.
- return GenericError
+ return 0, GenericError
}
-// CompareAndTouch returns nil if one of the volumes already has the
-// given content and it successfully updates the relevant block's
-// modification time in order to protect it from premature garbage
-// collection.
-func CompareAndTouch(hash string, buf []byte) error {
+// CompareAndTouch returns the current replication level if one of the
+// volumes already has the given content and it successfully updates
+// the relevant block's modification time in order to protect it from
+// premature garbage collection. Otherwise, it returns a non-nil
+// error.
+func CompareAndTouch(hash string, buf []byte) (int, error) {
var bestErr error = NotFoundError
for _, vol := range KeepVM.AllWritable() {
if err := vol.Compare(hash, buf); err == CollisionError {
// both, so there's no point writing it even
// on a different volume.)
log.Printf("%s: Compare(%s): %s", vol, hash, err)
- return err
+ return 0, err
} else if os.IsNotExist(err) {
// Block does not exist. This is the only
// "normal" error: we don't log anything.
continue
}
// Compare and Touch both worked --> done.
- return nil
+ return vol.Replication(), nil
}
- return bestErr
+ return 0, bestErr
}
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
-// GetAPIToken returns the OAuth2 token from the Authorization
+// GetApiToken returns the OAuth2 token from the Authorization
// header of an HTTP request, or an empty string if no matching
// token is found.
func GetApiToken(req *http.Request) string {
// IsDataManagerToken reports whether apiToken is the data manager's
// token. It returns false whenever the data manager token is the
// empty string, so an empty apiToken can never match.
func IsDataManagerToken(apiToken string) bool {
- return data_manager_token != "" && apiToken == data_manager_token
+ return dataManagerToken != "" && apiToken == dataManagerToken
}