X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c0f9c128aabb366435d751a3ea1a63b76c177f5b..9a34f5ed292fb8d2f262e4c23e758cd689d81db7:/services/keepstore/handlers.go diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index a86bb6a5b5..e6129a7376 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -8,7 +8,6 @@ package main // StatusHandler (GET /status.json) import ( - "bytes" "container/list" "crypto/md5" "encoding/json" @@ -74,7 +73,7 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) { } } - block, err := GetBlock(mux.Vars(req)["hash"], false) + block, err := GetBlock(mux.Vars(req)["hash"]) if err != nil { // This type assertion is safe because the only errors // GetBlock can return are DiskHashError or NotFoundError. @@ -211,8 +210,8 @@ func StatusHandler(resp http.ResponseWriter, req *http.Request) { if err == nil { resp.Write(jstat) } else { - log.Printf("json.Marshal: %s\n", err) - log.Printf("NodeStatus = %v\n", &st) + log.Printf("json.Marshal: %s", err) + log.Printf("NodeStatus = %v", &st) http.Error(resp, err.Error(), 500) } } @@ -322,7 +321,7 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) { if body, err := json.Marshal(result); err == nil { resp.Write(body) } else { - log.Printf("json.Marshal: %s (result = %v)\n", err, result) + log.Printf("json.Marshal: %s (result = %v)", err, result) http.Error(resp, err.Error(), 500) } } @@ -442,10 +441,7 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) { // which volume to check for fetching blocks, storing blocks, etc. // ============================== -// GetBlock fetches and returns the block identified by "hash". If -// the update_timestamp argument is true, GetBlock also updates the -// block's file modification time (for the sake of PutBlock, which -// must update the file's timestamp when the block already exists). +// GetBlock fetches and returns the block identified by "hash". // // On success, GetBlock returns a byte slice with the block data, and // a nil error. @@ -456,22 +452,11 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) { // DiskHashError. // -func GetBlock(hash string, update_timestamp bool) ([]byte, error) { +func GetBlock(hash string) ([]byte, error) { // Attempt to read the requested hash from a keep volume. error_to_caller := NotFoundError - var vols []Volume - if update_timestamp { - // Pointless to find the block on an unwritable volume - // because Touch() will fail -- this is as good as - // "not found" for purposes of callers who need to - // update_timestamp. - vols = KeepVM.AllWritable() - } else { - vols = KeepVM.AllReadable() - } - - for _, vol := range vols { + for _, vol := range KeepVM.AllReadable() { buf, err := vol.Get(hash) if err != nil { // IsNotExist is an expected error and may be @@ -480,7 +465,7 @@ func GetBlock(hash string, update_timestamp bool) ([]byte, error) { // volumes. If all volumes report IsNotExist, // we return a NotFoundError. if !os.IsNotExist(err) { - log.Printf("GetBlock: reading %s: %s\n", hash, err) + log.Printf("%s: Get(%s): %s", vol, hash, err) } continue } @@ -490,7 +475,7 @@ func GetBlock(hash string, update_timestamp bool) ([]byte, error) { if filehash != hash { // TODO: Try harder to tell a sysadmin about // this. - log.Printf("%s: checksum mismatch for request %s (actual %s)\n", + log.Printf("%s: checksum mismatch for request %s (actual %s)", vol, hash, filehash) error_to_caller = DiskHashError bufs.Put(buf) @@ -500,15 +485,6 @@ func GetBlock(hash string, update_timestamp bool) ([]byte, error) { log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned", vol, hash) } - if update_timestamp { - if err := vol.Touch(hash); err != nil { - error_to_caller = GenericError - log.Printf("%s: Touch %s failed: %s", - vol, hash, error_to_caller) - bufs.Put(buf) - continue - } - } return buf, nil } return nil, error_to_caller @@ -548,21 +524,11 @@ func PutBlock(block []byte, hash string) error { return RequestHashError } - // If we already have a block on disk under this identifier, return - // success (but check for MD5 collisions). While fetching the block, - // update its timestamp. - // The only errors that GetBlock can return are DiskHashError and NotFoundError. - // In either case, we want to write our new (good) block to disk, - // so there is nothing special to do if err != nil. - // - if oldblock, err := GetBlock(hash, true); err == nil { - defer bufs.Put(oldblock) - if bytes.Compare(block, oldblock) == 0 { - // The block already exists; return success. - return nil - } else { - return CollisionError - } + // If we already have this data, it's intact on disk, and we + // can update its timestamp, return success. If we have + // different data with the same hash, return failure. + if err := CompareAndTouch(hash, block); err == nil || err == CollisionError { + return err } // Choose a Keep volume to write to. @@ -590,7 +556,7 @@ func PutBlock(block []byte, hash string) error { // write did not succeed. Report the // error and continue trying. allFull = false - log.Printf("%s: Write(%s): %s\n", vol, hash, err) + log.Printf("%s: Write(%s): %s", vol, hash, err) } } @@ -603,6 +569,43 @@ func PutBlock(block []byte, hash string) error { } } +// CompareAndTouch returns nil if one of the volumes already has the +// given content and it successfully updates the relevant block's +// modification time in order to protect it from premature garbage +// collection. +func CompareAndTouch(hash string, buf []byte) error { + var bestErr error = NotFoundError + for _, vol := range KeepVM.AllWritable() { + if err := vol.Compare(hash, buf); err == CollisionError { + // Stop if we have a block with same hash but + // different content. (It will be impossible + // to tell which one is wanted if we have + // both, so there's no point writing it even + // on a different volume.) + log.Printf("%s: Compare(%s): %s", vol, hash, err) + return err + } else if os.IsNotExist(err) { + // Block does not exist. This is the only + // "normal" error: we don't log anything. + continue + } else if err != nil { + // Couldn't open file, data is corrupt on + // disk, etc.: log this abnormal condition, + // and try the next volume. + log.Printf("%s: Compare(%s): %s", vol, hash, err) + continue + } + if err := vol.Touch(hash); err != nil { + log.Printf("%s: Touch %s failed: %s", vol, hash, err) + bestErr = err + continue + } + // Compare and Touch both worked --> done. + return nil + } + return bestErr +} + var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`) // IsValidLocator @@ -634,7 +637,7 @@ func GetApiToken(req *http.Request) string { func IsExpired(timestamp_hex string) bool { ts, err := strconv.ParseInt(timestamp_hex, 16, 0) if err != nil { - log.Printf("IsExpired: %s\n", err) + log.Printf("IsExpired: %s", err) return true } return time.Unix(ts, 0).Before(time.Now())