package main

// REST handlers for Keep are implemented here.
//
// GetBlockHandler (GET /locator)
// PutBlockHandler (PUT /locator)
// IndexHandler    (GET /index, GET /index/prefix)
// StatusHandler   (GET /status.json)

import (
	"bytes"
	"container/list"
	"crypto/md5"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"regexp"
	"runtime"
	"strconv"
	"sync"
	"time"

	"github.com/gorilla/mux"
)

// MakeRESTRouter returns a new mux.Router that forwards all Keep
// requests to the appropriate handlers.
func MakeRESTRouter() *mux.Router {
	rest := mux.NewRouter()

	rest.HandleFunc(
		`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
	rest.HandleFunc(
		`/{hash:[0-9a-f]{32}}+{hints}`,
		GetBlockHandler).Methods("GET", "HEAD")

	rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
	rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")

	// List all blocks stored here. Privileged client only.
	rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
	// List blocks stored here whose hash has the given prefix.
	// Privileged client only.
	rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")

	// List volumes: path, device number, bytes used/avail.
	rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")

	// Replace the current pull queue.
	rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")

	// Replace the current trash queue.
	rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")

	// Any request which does not match any of these routes gets
	// 400 Bad Request.
	rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)

	return rest
}

// BadRequestHandler responds to any request with BadRequestError's
// message and HTTP status code. MakeRESTRouter installs it as the
// router's fallback for requests that match no route.
func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
	http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
}

// GetBlockHandler serves the block named by the "hash" route variable.
//
// When enforce_permissions is set, the full locator (the request path
// without its leading slash) must carry a signature that
// VerifySignature accepts for the caller's API token; otherwise the
// verification error is returned to the client.
//
// On success the block is written with a Content-Length header and an
// application/octet-stream content type.
func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
	if enforce_permissions {
		locator := req.URL.Path[1:] // strip leading slash
		if err := VerifySignature(locator, GetApiToken(req)); err != nil {
			// Use a checked assertion so that an unexpected
			// error type yields a 500 instead of a panic.
			if ke, ok := err.(*KeepError); ok {
				http.Error(resp, ke.Error(), ke.HTTPCode)
			} else {
				http.Error(resp, err.Error(), http.StatusInternalServerError)
			}
			return
		}
	}

	block, err := GetBlock(mux.Vars(req)["hash"], false)
	if err != nil {
		// GetBlock is expected to return only *KeepError values
		// (NotFoundError, DiskHashError); anything else is
		// reported as an internal error rather than panicking.
		if ke, ok := err.(*KeepError); ok {
			http.Error(resp, ke.Error(), ke.HTTPCode)
		} else {
			http.Error(resp, err.Error(), http.StatusInternalServerError)
		}
		return
	}
	// Return the buffer to the pool once the response has been written.
	defer bufs.Put(block)

	resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
	resp.Header().Set("Content-Type", "application/octet-stream")
	resp.Write(block)
}

// PutBlockHandler stores the request body as the block named by the
// "hash" route variable and, on success, responds with the block's
// locator (hash plus size hint, signed when possible) followed by a
// newline.
func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
	hash := mux.Vars(req)["hash"]

	// Detect as many error conditions as possible before reading
	// the body: avoid transmitting data that will not end up
	// being written anyway.

	if req.ContentLength == -1 {
		http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
		return
	}

	if req.ContentLength > BLOCKSIZE {
		http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
		return
	}

	if len(KeepVM.AllWritable()) == 0 {
		http.Error(resp, FullError.Error(), FullError.HTTPCode)
		return
	}

	buf := bufs.Get(int(req.ContentLength))
	if _, err := io.ReadFull(req.Body, buf); err != nil {
		http.Error(resp, err.Error(), http.StatusInternalServerError)
		bufs.Put(buf)
		return
	}

	// Release the buffer as soon as PutBlock is done with it,
	// before any response is written.
	err := PutBlock(buf, hash)
	bufs.Put(buf)

	if err != nil {
		// PutBlock is expected to return only *KeepError values;
		// fall back to a 500 for anything else instead of
		// panicking.
		if ke, ok := err.(*KeepError); ok {
			http.Error(resp, ke.Error(), ke.HTTPCode)
		} else {
			http.Error(resp, err.Error(), http.StatusInternalServerError)
		}
		return
	}

	// Success; add a size hint, sign the locator if possible, and
	// return it to the client.
	returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
	apiToken := GetApiToken(req)
	if PermissionSecret != nil && apiToken != "" {
		expiry := time.Now().Add(blob_signature_ttl)
		returnHash = SignLocator(returnHash, apiToken, expiry)
	}
	resp.Write([]byte(returnHash + "\n"))
}

// IndexHandler
//     A HandleFunc to address /index and /index/{prefix} requests.
//
// Only the data manager token is accepted. The index of every
// readable volume is written to the response, followed by a single
// empty line.
func IndexHandler(resp http.ResponseWriter, req *http.Request) {
	// Reject unauthorized requests.
	if !IsDataManagerToken(GetApiToken(req)) {
		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
		return
	}

	prefix := mux.Vars(req)["prefix"]

	for _, vol := range KeepVM.AllReadable() {
		if err := vol.IndexTo(prefix, resp); err != nil {
			// The only errors returned by IndexTo are
			// write errors returned by resp.Write(),
			// which probably means the client has
			// disconnected and this error will never be
			// reported to the client -- so record it in
			// our own error log as well.
			log.Printf("IndexHandler: %s: %s", vol, err)
			http.Error(resp, err.Error(), http.StatusInternalServerError)
			return
		}
	}
	// An empty line at EOF is the only way the client can be
	// assured the entire index was received.
	resp.Write([]byte{'\n'})
}

// StatusHandler
//     Responds to /status.json requests with the current node status,
//     described in a JSON structure.
// // The data given in a status.json response includes: // volumes - a list of Keep volumes currently in use by this server // each volume is an object with the following fields: // * mount_point // * device_num (an integer identifying the underlying filesystem) // * bytes_free // * bytes_used // type VolumeStatus struct { MountPoint string `json:"mount_point"` DeviceNum uint64 `json:"device_num"` BytesFree uint64 `json:"bytes_free"` BytesUsed uint64 `json:"bytes_used"` } type PoolStatus struct { Alloc uint64 `json:"BytesAllocated"` Cap int `json:"BuffersMax"` Len int `json:"BuffersInUse"` } type NodeStatus struct { Volumes []*VolumeStatus `json:"volumes"` BufferPool PoolStatus Memory runtime.MemStats } var st NodeStatus var stLock sync.Mutex func StatusHandler(resp http.ResponseWriter, req *http.Request) { stLock.Lock() ReadNodeStatus(&st) jstat, err := json.Marshal(&st) stLock.Unlock() if err == nil { resp.Write(jstat) } else { log.Printf("json.Marshal: %s\n", err) log.Printf("NodeStatus = %v\n", &st) http.Error(resp, err.Error(), 500) } } // ReadNodeStatus populates the given NodeStatus struct with current // values. // func ReadNodeStatus(st *NodeStatus) { vols := KeepVM.AllReadable() if cap(st.Volumes) < len(vols) { st.Volumes = make([]*VolumeStatus, len(vols)) } st.Volumes = st.Volumes[:0] for _, vol := range vols { if s := vol.Status(); s != nil { st.Volumes = append(st.Volumes, s) } } st.BufferPool.Alloc = bufs.Alloc() st.BufferPool.Cap = bufs.Cap() st.BufferPool.Len = bufs.Len() runtime.ReadMemStats(&st.Memory) } // DeleteHandler processes DELETE requests. // // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash // from all connected volumes. // // Only the Data Manager, or an Arvados admin with scope "all", are // allowed to issue DELETE requests. If a DELETE request is not // authenticated or is issued by a non-admin user, the server returns // a PermissionError. 
// // Upon receiving a valid request from an authorized user, // DeleteHandler deletes all copies of the specified block on local // writable volumes. // // Response format: // // If the requested blocks was not found on any volume, the response // code is HTTP 404 Not Found. // // Otherwise, the response code is 200 OK, with a response body // consisting of the JSON message // // {"copies_deleted":d,"copies_failed":f} // // where d and f are integers representing the number of blocks that // were successfully and unsuccessfully deleted. // func DeleteHandler(resp http.ResponseWriter, req *http.Request) { hash := mux.Vars(req)["hash"] // Confirm that this user is an admin and has a token with unlimited scope. var tok = GetApiToken(req) if tok == "" || !CanDelete(tok) { http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode) return } if never_delete { http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode) return } // Delete copies of this block from all available volumes. // Report how many blocks were successfully deleted, and how // many were found on writable volumes but not deleted. var result struct { Deleted int `json:"copies_deleted"` Failed int `json:"copies_failed"` } for _, vol := range KeepVM.AllWritable() { if err := vol.Delete(hash); err == nil { result.Deleted++ } else if os.IsNotExist(err) { continue } else { result.Failed++ log.Println("DeleteHandler:", err) } } var st int if result.Deleted == 0 && result.Failed == 0 { st = http.StatusNotFound } else { st = http.StatusOK } resp.WriteHeader(st) if st == http.StatusOK { if body, err := json.Marshal(result); err == nil { resp.Write(body) } else { log.Printf("json.Marshal: %s (result = %v)\n", err, result) http.Error(resp, err.Error(), 500) } } } /* PullHandler processes "PUT /pull" requests for the data manager. 
The request body is a JSON message containing a list of pull requests in the following format: [ { "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985", "servers":[ "keep0.qr1hi.arvadosapi.com:25107", "keep1.qr1hi.arvadosapi.com:25108" ] }, { "locator":"55ae4d45d2db0793d53f03e805f656e5+658395", "servers":[ "10.0.1.5:25107", "10.0.1.6:25107", "10.0.1.7:25108" ] }, ... ] Each pull request in the list consists of a block locator string and an ordered list of servers. Keepstore should try to fetch the block from each server in turn. If the request has not been sent by the Data Manager, return 401 Unauthorized. If the JSON unmarshalling fails, return 400 Bad Request. */ type PullRequest struct { Locator string `json:"locator"` Servers []string `json:"servers"` } func PullHandler(resp http.ResponseWriter, req *http.Request) { // Reject unauthorized requests. if !IsDataManagerToken(GetApiToken(req)) { http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode) return } // Parse the request body. var pr []PullRequest r := json.NewDecoder(req.Body) if err := r.Decode(&pr); err != nil { http.Error(resp, err.Error(), BadRequestError.HTTPCode) return } // We have a properly formatted pull list sent from the data // manager. Report success and send the list to the pull list // manager for further handling. resp.WriteHeader(http.StatusOK) resp.Write([]byte( fmt.Sprintf("Received %d pull requests\n", len(pr)))) plist := list.New() for _, p := range pr { plist.PushBack(p) } pullq.ReplaceQueue(plist) } type TrashRequest struct { Locator string `json:"locator"` BlockMtime int64 `json:"block_mtime"` } func TrashHandler(resp http.ResponseWriter, req *http.Request) { // Reject unauthorized requests. if !IsDataManagerToken(GetApiToken(req)) { http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode) return } // Parse the request body. 
var trash []TrashRequest r := json.NewDecoder(req.Body) if err := r.Decode(&trash); err != nil { http.Error(resp, err.Error(), BadRequestError.HTTPCode) return } // We have a properly formatted trash list sent from the data // manager. Report success and send the list to the trash work // queue for further handling. resp.WriteHeader(http.StatusOK) resp.Write([]byte( fmt.Sprintf("Received %d trash requests\n", len(trash)))) tlist := list.New() for _, t := range trash { tlist.PushBack(t) } trashq.ReplaceQueue(tlist) } // ============================== // GetBlock and PutBlock implement lower-level code for handling // blocks by rooting through volumes connected to the local machine. // Once the handler has determined that system policy permits the // request, it calls these methods to perform the actual operation. // // TODO(twp): this code would probably be better located in the // VolumeManager interface. As an abstraction, the VolumeManager // should be the only part of the code that cares about which volume a // block is stored on, so it should be responsible for figuring out // which volume to check for fetching blocks, storing blocks, etc. // ============================== // GetBlock fetches and returns the block identified by "hash". If // the update_timestamp argument is true, GetBlock also updates the // block's file modification time (for the sake of PutBlock, which // must update the file's timestamp when the block already exists). // // On success, GetBlock returns a byte slice with the block data, and // a nil error. // // If the block cannot be found on any volume, returns NotFoundError. // // If the block found does not have the correct MD5 hash, returns // DiskHashError. // func GetBlock(hash string, update_timestamp bool) ([]byte, error) { // Attempt to read the requested hash from a keep volume. 
error_to_caller := NotFoundError var vols []Volume if update_timestamp { // Pointless to find the block on an unwritable volume // because Touch() will fail -- this is as good as // "not found" for purposes of callers who need to // update_timestamp. vols = KeepVM.AllWritable() } else { vols = KeepVM.AllReadable() } for _, vol := range vols { buf, err := vol.Get(hash) if err != nil { // IsNotExist is an expected error and may be // ignored. All other errors are logged. In // any case we continue trying to read other // volumes. If all volumes report IsNotExist, // we return a NotFoundError. if !os.IsNotExist(err) { log.Printf("GetBlock: reading %s: %s\n", hash, err) } continue } // Check the file checksum. // filehash := fmt.Sprintf("%x", md5.Sum(buf)) if filehash != hash { // TODO: Try harder to tell a sysadmin about // this. log.Printf("%s: checksum mismatch for request %s (actual %s)\n", vol, hash, filehash) error_to_caller = DiskHashError bufs.Put(buf) continue } if error_to_caller == DiskHashError { log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned", vol, hash) } if update_timestamp { if err := vol.Touch(hash); err != nil { error_to_caller = GenericError log.Printf("%s: Touch %s failed: %s", vol, hash, error_to_caller) bufs.Put(buf) continue } } return buf, nil } return nil, error_to_caller } /* PutBlock(block, hash) Stores the BLOCK (identified by the content id HASH) in Keep. The MD5 checksum of the block must be identical to the content id HASH. If not, an error is returned. PutBlock stores the BLOCK on the first Keep volume with free space. A failure code is returned to the user only if all volumes fail. On success, PutBlock returns nil. On failure, it returns a KeepError with one of the following codes: 500 Collision A different block with the same hash already exists on this Keep server. 422 MD5Fail The MD5 hash of the BLOCK does not match the argument HASH. 
503 Full There was not enough space left in any Keep volume to store the object. 500 Fail The object could not be stored for some other reason (e.g. all writes failed). The text of the error message should provide as much detail as possible. */ func PutBlock(block []byte, hash string) error { // Check that BLOCK's checksum matches HASH. blockhash := fmt.Sprintf("%x", md5.Sum(block)) if blockhash != hash { log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash) return RequestHashError } // If we already have a block on disk under this identifier, return // success (but check for MD5 collisions). While fetching the block, // update its timestamp. // The only errors that GetBlock can return are DiskHashError and NotFoundError. // In either case, we want to write our new (good) block to disk, // so there is nothing special to do if err != nil. // if oldblock, err := GetBlock(hash, true); err == nil { defer bufs.Put(oldblock) if bytes.Compare(block, oldblock) == 0 { // The block already exists; return success. return nil } else { return CollisionError } } // Choose a Keep volume to write to. // If this volume fails, try all of the volumes in order. if vol := KeepVM.NextWritable(); vol != nil { if err := vol.Put(hash, block); err == nil { return nil // success! } } writables := KeepVM.AllWritable() if len(writables) == 0 { log.Print("No writable volumes.") return FullError } allFull := true for _, vol := range writables { err := vol.Put(hash, block) if err == nil { return nil // success! } if err != FullError { // The volume is not full but the // write did not succeed. Report the // error and continue trying. allFull = false log.Printf("%s: Write(%s): %s\n", vol, hash, err) } } if allFull { log.Print("All volumes are full.") return FullError } else { // Already logged the non-full errors. return GenericError } } var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`) // IsValidLocator // Return true if the specified string is a valid Keep locator. 
// When Keep is extended to support hash types other than MD5, // this should be updated to cover those as well. // func IsValidLocator(loc string) bool { return validLocatorRe.MatchString(loc) } var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`) // GetApiToken returns the OAuth2 token from the Authorization // header of a HTTP request, or an empty string if no matching // token is found. func GetApiToken(req *http.Request) string { if auth, ok := req.Header["Authorization"]; ok { if match := authRe.FindStringSubmatch(auth[0]); match != nil { return match[1] } } return "" } // IsExpired returns true if the given Unix timestamp (expressed as a // hexadecimal string) is in the past, or if timestamp_hex cannot be // parsed as a hexadecimal string. func IsExpired(timestamp_hex string) bool { ts, err := strconv.ParseInt(timestamp_hex, 16, 0) if err != nil { log.Printf("IsExpired: %s\n", err) return true } return time.Unix(ts, 0).Before(time.Now()) } // CanDelete returns true if the user identified by api_token is // allowed to delete blocks. func CanDelete(api_token string) bool { if api_token == "" { return false } // Blocks may be deleted only when Keep has been configured with a // data manager. if IsDataManagerToken(api_token) { return true } // TODO(twp): look up api_token with the API server // return true if is_admin is true and if the token // has unlimited scope return false } // IsDataManagerToken returns true if api_token represents the data // manager's token. func IsDataManagerToken(api_token string) bool { return data_manager_token != "" && api_token == data_manager_token }