X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0ad0ce519da703b3f13c0ad8a74ea8235b7d90e5..47c7337cc9c33a19208946deceecc5ce266ae741:/services/keepstore/handlers.go?ds=sidebyside diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index 0974549fdb..95af1b4870 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -120,7 +120,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) { return } - err = PutBlock(buf, hash) + replication, err := PutBlock(buf, hash) bufs.Put(buf) if err != nil { @@ -134,9 +134,10 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) { returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength) apiToken := GetApiToken(req) if PermissionSecret != nil && apiToken != "" { - expiry := time.Now().Add(blob_signature_ttl) + expiry := time.Now().Add(blobSignatureTTL) returnHash = SignLocator(returnHash, apiToken, expiry) } + resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication)) resp.Write([]byte(returnHash + "\n")) } @@ -281,7 +282,7 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) { return } - if never_delete { + if neverDelete { http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode) return } @@ -517,40 +518,40 @@ func GetBlock(hash string) ([]byte, error) { // all writes failed). The text of the error message should // provide as much detail as possible. // -func PutBlock(block []byte, hash string) error { +func PutBlock(block []byte, hash string) (int, error) { // Check that BLOCK's checksum matches HASH. blockhash := fmt.Sprintf("%x", md5.Sum(block)) if blockhash != hash { log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash) - return RequestHashError + return 0, RequestHashError } // If we already have this data, it's intact on disk, and we // can update its timestamp, return success. If we have // different data with the same hash, return failure. - if err := CompareAndTouch(hash, block); err == nil || err == CollisionError { - return err + if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError { + return n, err } // Choose a Keep volume to write to. // If this volume fails, try all of the volumes in order. if vol := KeepVM.NextWritable(); vol != nil { if err := vol.Put(hash, block); err == nil { - return nil // success! + return vol.Replication(), nil // success! } } writables := KeepVM.AllWritable() if len(writables) == 0 { log.Print("No writable volumes.") - return FullError + return 0, FullError } allFull := true for _, vol := range writables { err := vol.Put(hash, block) if err == nil { - return nil // success! + return vol.Replication(), nil // success! } if err != FullError { // The volume is not full but the @@ -563,17 +564,18 @@ func PutBlock(block []byte, hash string) error { if allFull { log.Print("All volumes are full.") - return FullError + return 0, FullError } // Already logged the non-full errors. - return GenericError + return 0, GenericError } -// CompareAndTouch returns nil if one of the volumes already has the -// given content and it successfully updates the relevant block's -// modification time in order to protect it from premature garbage -// collection. -func CompareAndTouch(hash string, buf []byte) error { +// CompareAndTouch returns the current replication level if one of the +// volumes already has the given content and it successfully updates +// the relevant block's modification time in order to protect it from +// premature garbage collection. Otherwise, it returns a non-nil +// error. +func CompareAndTouch(hash string, buf []byte) (int, error) { var bestErr error = NotFoundError for _, vol := range KeepVM.AllWritable() { if err := vol.Compare(hash, buf); err == CollisionError { @@ -583,7 +585,7 @@ func CompareAndTouch(hash string, buf []byte) error { // both, so there's no point writing it even // on a different volume.) log.Printf("%s: Compare(%s): %s", vol, hash, err) - return err + return 0, err } else if os.IsNotExist(err) { // Block does not exist. This is the only // "normal" error: we don't log anything. @@ -601,9 +603,9 @@ func CompareAndTouch(hash string, buf []byte) error { continue } // Compare and Touch both worked --> done. - return nil + return vol.Replication(), nil } - return bestErr + return 0, bestErr } var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`) @@ -618,7 +620,7 @@ func IsValidLocator(loc string) bool { var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`) -// GetAPIToken returns the OAuth2 token from the Authorization +// GetApiToken returns the OAuth2 token from the Authorization // header of a HTTP request, or an empty string if no matching // token is found. func GetApiToken(req *http.Request) string { @@ -662,5 +664,5 @@ func CanDelete(apiToken string) bool { // IsDataManagerToken returns true if apiToken represents the data // manager's token. func IsDataManagerToken(apiToken string) bool { - return data_manager_token != "" && apiToken == data_manager_token + return dataManagerToken != "" && apiToken == dataManagerToken }