rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
-
- // For IndexHandler we support:
- // /index - returns all locators
- // /index/{prefix} - returns all locators that begin with {prefix}
- // {prefix} is a string of hexadecimal digits between 0 and 32 digits.
- // If {prefix} is the empty string, return an index of all locators
- // (so /index and /index/ behave identically)
- // A client may supply a full 32-digit locator string, in which
- // case the server will return an index with either zero or one
- // entries. This usage allows a client to check whether a block is
- // present, and its size and upload time, without retrieving the
- // entire block.
- //
+ // List all blocks stored here. Privileged client only.
rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
- rest.HandleFunc(
- `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+ // List blocks stored here whose hash has the given prefix.
+ // Privileged client only.
+ rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+
+ // List volumes: path, device number, bytes used/avail.
rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
- // The PullHandler and TrashHandler process "PUT /pull" and "PUT
- // /trash" requests from Data Manager. These requests instruct
- // Keep to replicate or delete blocks; see
- // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
- // for more details.
- //
- // Each handler parses the JSON list of block management requests
- // in the message body, and replaces any existing pull queue or
- // trash queue with their contentes.
- //
+ // Replace the current pull queue.
rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+
+ // Replace the current trash queue.
rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
// Any request which does not match any of these routes gets
}
block, err := GetBlock(hash, false)
-
- // Garbage collect after each GET. Fixes #2865.
- // TODO(twp): review Keep memory usage and see if there's
- // a better way to do this than blindly garbage collecting
- // after every block.
- defer runtime.GC()
-
if err != nil {
// This type assertion is safe because the only errors
// GetBlock can return are DiskHashError or NotFoundError.
http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
return
}
+ defer bufs.Put(block)
- resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
-
- _, err = resp.Write(block)
-
- return
+ resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
+ resp.Header().Set("Content-Type", "application/octet-stream")
+ resp.Write(block)
}
func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
hash := mux.Vars(req)["hash"]
- // Read the block data to be stored.
- // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
- //
+ // Detect as many error conditions as possible before reading
+ // the body, so that no data is transmitted that would not end
+ // up being written anyway.
+
+ if req.ContentLength == -1 {
+ http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
+ return
+ }
+
if req.ContentLength > BLOCKSIZE {
http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
return
}
- buf := make([]byte, req.ContentLength)
- nread, err := io.ReadFull(req.Body, buf)
+ if len(KeepVM.AllWritable()) == 0 {
+ http.Error(resp, FullError.Error(), FullError.HTTPCode)
+ return
+ }
+
+ buf := bufs.Get(int(req.ContentLength))
+ _, err := io.ReadFull(req.Body, buf)
if err != nil {
http.Error(resp, err.Error(), 500)
- } else if int64(nread) < req.ContentLength {
- http.Error(resp, "request truncated", 500)
- } else {
- if err := PutBlock(buf, hash); err == nil {
- // Success; add a size hint, sign the locator if
- // possible, and return it to the client.
- return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
- api_token := GetApiToken(req)
- if PermissionSecret != nil && api_token != "" {
- expiry := time.Now().Add(permission_ttl)
- return_hash = SignLocator(return_hash, api_token, expiry)
- }
- resp.Write([]byte(return_hash + "\n"))
- } else {
- ke := err.(*KeepError)
- http.Error(resp, ke.Error(), ke.HTTPCode)
- }
+ bufs.Put(buf)
+ return
}
- return
+
+ err = PutBlock(buf, hash)
+ bufs.Put(buf)
+
+ if err != nil {
+ ke := err.(*KeepError)
+ http.Error(resp, ke.Error(), ke.HTTPCode)
+ return
+ }
+
+ // Success; add a size hint, sign the locator if possible, and
+ // return it to the client.
+ return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
+ api_token := GetApiToken(req)
+ if PermissionSecret != nil && api_token != "" {
+ expiry := time.Now().Add(blob_signature_ttl)
+ return_hash = SignLocator(return_hash, api_token, expiry)
+ }
+ resp.Write([]byte(return_hash + "\n"))
}
// IndexHandler
prefix := mux.Vars(req)["prefix"]
- var index string
for _, vol := range KeepVM.AllReadable() {
- index = index + vol.Index(prefix)
+ if err := vol.IndexTo(prefix, resp); err != nil {
+ // The only errors returned by IndexTo are
+ // write errors returned by resp.Write().
+ // Such an error probably means the client
+ // has disconnected, so it will never be
+ // reported to the client -- but it will
+ // appear in our own error log.
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ return
+ }
}
- resp.Write([]byte(index))
}
// StatusHandler
var pr []PullRequest
r := json.NewDecoder(req.Body)
if err := r.Decode(&pr); err != nil {
- http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+ http.Error(resp, err.Error(), BadRequestError.HTTPCode)
return
}
var trash []TrashRequest
r := json.NewDecoder(req.Body)
if err := r.Decode(&trash); err != nil {
- http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+ http.Error(resp, err.Error(), BadRequestError.HTTPCode)
return
}
log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
vol, hash, filehash)
error_to_caller = DiskHashError
+ bufs.Put(buf)
continue
}
if error_to_caller == DiskHashError {
error_to_caller = GenericError
log.Printf("%s: Touch %s failed: %s",
vol, hash, error_to_caller)
+ bufs.Put(buf)
continue
}
}
// so there is nothing special to do if err != nil.
//
if oldblock, err := GetBlock(hash, true); err == nil {
+ defer bufs.Put(oldblock)
if bytes.Compare(block, oldblock) == 0 {
// The block already exists; return success.
return nil