Merge branch 'master' into 6569-smarter-jobs-image
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bytes"
12         "container/list"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "runtime"
23         "strconv"
24         "sync"
25         "time"
26 )
27
28 // MakeRESTRouter returns a new mux.Router that forwards all Keep
29 // requests to the appropriate handlers.
30 //
31 func MakeRESTRouter() *mux.Router {
32         rest := mux.NewRouter()
33
34         rest.HandleFunc(
35                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
36         rest.HandleFunc(
37                 `/{hash:[0-9a-f]{32}}+{hints}`,
38                 GetBlockHandler).Methods("GET", "HEAD")
39
40         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
42         // List all blocks stored here. Privileged client only.
43         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
44         // List blocks stored here whose hash has the given prefix.
45         // Privileged client only.
46         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
47
48         // List volumes: path, device number, bytes used/avail.
49         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
50
51         // Replace the current pull queue.
52         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
53
54         // Replace the current trash queue.
55         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
56
57         // Any request which does not match any of these routes gets
58         // 400 Bad Request.
59         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
60
61         return rest
62 }
63
64 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
65         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
66 }
67
68 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
69         if enforce_permissions {
70                 locator := req.URL.Path[1:] // strip leading slash
71                 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
72                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
73                         return
74                 }
75         }
76
77         block, err := GetBlock(mux.Vars(req)["hash"], false)
78         if err != nil {
79                 // This type assertion is safe because the only errors
80                 // GetBlock can return are DiskHashError or NotFoundError.
81                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
82                 return
83         }
84         defer bufs.Put(block)
85
86         resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
87         resp.Header().Set("Content-Type", "application/octet-stream")
88         resp.Write(block)
89 }
90
91 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
92         hash := mux.Vars(req)["hash"]
93
94         // Detect as many error conditions as possible before reading
95         // the body: avoid transmitting data that will not end up
96         // being written anyway.
97
98         if req.ContentLength == -1 {
99                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
100                 return
101         }
102
103         if req.ContentLength > BLOCKSIZE {
104                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
105                 return
106         }
107
108         if len(KeepVM.AllWritable()) == 0 {
109                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
110                 return
111         }
112
113         buf := bufs.Get(int(req.ContentLength))
114         _, err := io.ReadFull(req.Body, buf)
115         if err != nil {
116                 http.Error(resp, err.Error(), 500)
117                 bufs.Put(buf)
118                 return
119         }
120
121         err = PutBlock(buf, hash)
122         bufs.Put(buf)
123
124         if err != nil {
125                 ke := err.(*KeepError)
126                 http.Error(resp, ke.Error(), ke.HTTPCode)
127                 return
128         }
129
130         // Success; add a size hint, sign the locator if possible, and
131         // return it to the client.
132         return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
133         api_token := GetApiToken(req)
134         if PermissionSecret != nil && api_token != "" {
135                 expiry := time.Now().Add(blob_signature_ttl)
136                 return_hash = SignLocator(return_hash, api_token, expiry)
137         }
138         resp.Write([]byte(return_hash + "\n"))
139 }
140
141 // IndexHandler
142 //     A HandleFunc to address /index and /index/{prefix} requests.
143 //
144 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
145         // Reject unauthorized requests.
146         if !IsDataManagerToken(GetApiToken(req)) {
147                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
148                 return
149         }
150
151         prefix := mux.Vars(req)["prefix"]
152
153         for _, vol := range KeepVM.AllReadable() {
154                 if err := vol.IndexTo(prefix, resp); err != nil {
155                         // The only errors returned by IndexTo are
156                         // write errors returned by resp.Write(),
157                         // which probably means the client has
158                         // disconnected and this error will never be
159                         // reported to the client -- but it will
160                         // appear in our own error log.
161                         http.Error(resp, err.Error(), http.StatusInternalServerError)
162                         return
163                 }
164         }
165         // An empty line at EOF is the only way the client can be
166         // assured the entire index was received.
167         resp.Write([]byte{'\n'})
168 }
169
170 // StatusHandler
171 //     Responds to /status.json requests with the current node status,
172 //     described in a JSON structure.
173 //
174 //     The data given in a status.json response includes:
175 //        volumes - a list of Keep volumes currently in use by this server
176 //          each volume is an object with the following fields:
177 //            * mount_point
178 //            * device_num (an integer identifying the underlying filesystem)
179 //            * bytes_free
180 //            * bytes_used
181 //
182 type VolumeStatus struct {
183         MountPoint string `json:"mount_point"`
184         DeviceNum  uint64 `json:"device_num"`
185         BytesFree  uint64 `json:"bytes_free"`
186         BytesUsed  uint64 `json:"bytes_used"`
187 }
188
189 type PoolStatus struct {
190         Alloc uint64 `json:"BytesAllocated"`
191         Cap   int    `json:"BuffersMax"`
192         Len   int    `json:"BuffersInUse"`
193 }
194
195 type NodeStatus struct {
196         Volumes    []*VolumeStatus  `json:"volumes"`
197         BufferPool PoolStatus
198         Memory     runtime.MemStats
199 }
200
201 var st NodeStatus
202 var stLock sync.Mutex
203 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
204         stLock.Lock()
205         ReadNodeStatus(&st)
206         jstat, err := json.Marshal(&st)
207         stLock.Unlock()
208         if err == nil {
209                 resp.Write(jstat)
210         } else {
211                 log.Printf("json.Marshal: %s\n", err)
212                 log.Printf("NodeStatus = %v\n", &st)
213                 http.Error(resp, err.Error(), 500)
214         }
215 }
216
217 // ReadNodeStatus populates the given NodeStatus struct with current
218 // values.
219 //
220 func ReadNodeStatus(st *NodeStatus) {
221         vols := KeepVM.AllReadable()
222         if cap(st.Volumes) < len(vols) {
223                 st.Volumes = make([]*VolumeStatus, len(vols))
224         }
225         st.Volumes = st.Volumes[:0]
226         for _, vol := range vols {
227                 if s := vol.Status(); s != nil {
228                         st.Volumes = append(st.Volumes, s)
229                 }
230         }
231         st.BufferPool.Alloc = bufs.Alloc()
232         st.BufferPool.Cap = bufs.Cap()
233         st.BufferPool.Len = bufs.Len()
234         runtime.ReadMemStats(&st.Memory)
235 }
236
237 // DeleteHandler processes DELETE requests.
238 //
239 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
240 // from all connected volumes.
241 //
242 // Only the Data Manager, or an Arvados admin with scope "all", are
243 // allowed to issue DELETE requests.  If a DELETE request is not
244 // authenticated or is issued by a non-admin user, the server returns
245 // a PermissionError.
246 //
247 // Upon receiving a valid request from an authorized user,
248 // DeleteHandler deletes all copies of the specified block on local
249 // writable volumes.
250 //
251 // Response format:
252 //
253 // If the requested blocks was not found on any volume, the response
254 // code is HTTP 404 Not Found.
255 //
256 // Otherwise, the response code is 200 OK, with a response body
257 // consisting of the JSON message
258 //
259 //    {"copies_deleted":d,"copies_failed":f}
260 //
261 // where d and f are integers representing the number of blocks that
262 // were successfully and unsuccessfully deleted.
263 //
264 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
265         hash := mux.Vars(req)["hash"]
266
267         // Confirm that this user is an admin and has a token with unlimited scope.
268         var tok = GetApiToken(req)
269         if tok == "" || !CanDelete(tok) {
270                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
271                 return
272         }
273
274         if never_delete {
275                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
276                 return
277         }
278
279         // Delete copies of this block from all available volumes.
280         // Report how many blocks were successfully deleted, and how
281         // many were found on writable volumes but not deleted.
282         var result struct {
283                 Deleted int `json:"copies_deleted"`
284                 Failed  int `json:"copies_failed"`
285         }
286         for _, vol := range KeepVM.AllWritable() {
287                 if err := vol.Delete(hash); err == nil {
288                         result.Deleted++
289                 } else if os.IsNotExist(err) {
290                         continue
291                 } else {
292                         result.Failed++
293                         log.Println("DeleteHandler:", err)
294                 }
295         }
296
297         var st int
298
299         if result.Deleted == 0 && result.Failed == 0 {
300                 st = http.StatusNotFound
301         } else {
302                 st = http.StatusOK
303         }
304
305         resp.WriteHeader(st)
306
307         if st == http.StatusOK {
308                 if body, err := json.Marshal(result); err == nil {
309                         resp.Write(body)
310                 } else {
311                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
312                         http.Error(resp, err.Error(), 500)
313                 }
314         }
315 }
316
317 /* PullHandler processes "PUT /pull" requests for the data manager.
318    The request body is a JSON message containing a list of pull
319    requests in the following format:
320
321    [
322       {
323          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
324          "servers":[
325                         "keep0.qr1hi.arvadosapi.com:25107",
326                         "keep1.qr1hi.arvadosapi.com:25108"
327                  ]
328           },
329           {
330                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
331                  "servers":[
332                         "10.0.1.5:25107",
333                         "10.0.1.6:25107",
334                         "10.0.1.7:25108"
335                  ]
336           },
337           ...
338    ]
339
340    Each pull request in the list consists of a block locator string
341    and an ordered list of servers.  Keepstore should try to fetch the
342    block from each server in turn.
343
344    If the request has not been sent by the Data Manager, return 401
345    Unauthorized.
346
347    If the JSON unmarshalling fails, return 400 Bad Request.
348 */
349
350 type PullRequest struct {
351         Locator string   `json:"locator"`
352         Servers []string `json:"servers"`
353 }
354
355 func PullHandler(resp http.ResponseWriter, req *http.Request) {
356         // Reject unauthorized requests.
357         if !IsDataManagerToken(GetApiToken(req)) {
358                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
359                 return
360         }
361
362         // Parse the request body.
363         var pr []PullRequest
364         r := json.NewDecoder(req.Body)
365         if err := r.Decode(&pr); err != nil {
366                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
367                 return
368         }
369
370         // We have a properly formatted pull list sent from the data
371         // manager.  Report success and send the list to the pull list
372         // manager for further handling.
373         resp.WriteHeader(http.StatusOK)
374         resp.Write([]byte(
375                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
376
377         plist := list.New()
378         for _, p := range pr {
379                 plist.PushBack(p)
380         }
381         pullq.ReplaceQueue(plist)
382 }
383
384 type TrashRequest struct {
385         Locator    string `json:"locator"`
386         BlockMtime int64  `json:"block_mtime"`
387 }
388
389 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
390         // Reject unauthorized requests.
391         if !IsDataManagerToken(GetApiToken(req)) {
392                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
393                 return
394         }
395
396         // Parse the request body.
397         var trash []TrashRequest
398         r := json.NewDecoder(req.Body)
399         if err := r.Decode(&trash); err != nil {
400                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
401                 return
402         }
403
404         // We have a properly formatted trash list sent from the data
405         // manager.  Report success and send the list to the trash work
406         // queue for further handling.
407         resp.WriteHeader(http.StatusOK)
408         resp.Write([]byte(
409                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
410
411         tlist := list.New()
412         for _, t := range trash {
413                 tlist.PushBack(t)
414         }
415         trashq.ReplaceQueue(tlist)
416 }
417
418 // ==============================
419 // GetBlock and PutBlock implement lower-level code for handling
420 // blocks by rooting through volumes connected to the local machine.
421 // Once the handler has determined that system policy permits the
422 // request, it calls these methods to perform the actual operation.
423 //
424 // TODO(twp): this code would probably be better located in the
425 // VolumeManager interface. As an abstraction, the VolumeManager
426 // should be the only part of the code that cares about which volume a
427 // block is stored on, so it should be responsible for figuring out
428 // which volume to check for fetching blocks, storing blocks, etc.
429
430 // ==============================
431 // GetBlock fetches and returns the block identified by "hash".  If
432 // the update_timestamp argument is true, GetBlock also updates the
433 // block's file modification time (for the sake of PutBlock, which
434 // must update the file's timestamp when the block already exists).
435 //
436 // On success, GetBlock returns a byte slice with the block data, and
437 // a nil error.
438 //
439 // If the block cannot be found on any volume, returns NotFoundError.
440 //
441 // If the block found does not have the correct MD5 hash, returns
442 // DiskHashError.
443 //
444
445 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
446         // Attempt to read the requested hash from a keep volume.
447         error_to_caller := NotFoundError
448
449         var vols []Volume
450         if update_timestamp {
451                 // Pointless to find the block on an unwritable volume
452                 // because Touch() will fail -- this is as good as
453                 // "not found" for purposes of callers who need to
454                 // update_timestamp.
455                 vols = KeepVM.AllWritable()
456         } else {
457                 vols = KeepVM.AllReadable()
458         }
459
460         for _, vol := range vols {
461                 buf, err := vol.Get(hash)
462                 if err != nil {
463                         // IsNotExist is an expected error and may be
464                         // ignored. All other errors are logged. In
465                         // any case we continue trying to read other
466                         // volumes. If all volumes report IsNotExist,
467                         // we return a NotFoundError.
468                         if !os.IsNotExist(err) {
469                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
470                         }
471                         continue
472                 }
473                 // Check the file checksum.
474                 //
475                 filehash := fmt.Sprintf("%x", md5.Sum(buf))
476                 if filehash != hash {
477                         // TODO: Try harder to tell a sysadmin about
478                         // this.
479                         log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
480                                 vol, hash, filehash)
481                         error_to_caller = DiskHashError
482                         bufs.Put(buf)
483                         continue
484                 }
485                 if error_to_caller == DiskHashError {
486                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
487                                 vol, hash)
488                 }
489                 if update_timestamp {
490                         if err := vol.Touch(hash); err != nil {
491                                 error_to_caller = GenericError
492                                 log.Printf("%s: Touch %s failed: %s",
493                                         vol, hash, error_to_caller)
494                                 bufs.Put(buf)
495                                 continue
496                         }
497                 }
498                 return buf, nil
499         }
500         return nil, error_to_caller
501 }
502
503 /* PutBlock(block, hash)
504    Stores the BLOCK (identified by the content id HASH) in Keep.
505
506    The MD5 checksum of the block must be identical to the content id HASH.
507    If not, an error is returned.
508
509    PutBlock stores the BLOCK on the first Keep volume with free space.
510    A failure code is returned to the user only if all volumes fail.
511
512    On success, PutBlock returns nil.
513    On failure, it returns a KeepError with one of the following codes:
514
515    500 Collision
516           A different block with the same hash already exists on this
517           Keep server.
518    422 MD5Fail
519           The MD5 hash of the BLOCK does not match the argument HASH.
520    503 Full
521           There was not enough space left in any Keep volume to store
522           the object.
523    500 Fail
524           The object could not be stored for some other reason (e.g.
525           all writes failed). The text of the error message should
526           provide as much detail as possible.
527 */
528
529 func PutBlock(block []byte, hash string) error {
530         // Check that BLOCK's checksum matches HASH.
531         blockhash := fmt.Sprintf("%x", md5.Sum(block))
532         if blockhash != hash {
533                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
534                 return RequestHashError
535         }
536
537         // If we already have a block on disk under this identifier, return
538         // success (but check for MD5 collisions).  While fetching the block,
539         // update its timestamp.
540         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
541         // In either case, we want to write our new (good) block to disk,
542         // so there is nothing special to do if err != nil.
543         //
544         if oldblock, err := GetBlock(hash, true); err == nil {
545                 defer bufs.Put(oldblock)
546                 if bytes.Compare(block, oldblock) == 0 {
547                         // The block already exists; return success.
548                         return nil
549                 } else {
550                         return CollisionError
551                 }
552         }
553
554         // Choose a Keep volume to write to.
555         // If this volume fails, try all of the volumes in order.
556         if vol := KeepVM.NextWritable(); vol != nil {
557                 if err := vol.Put(hash, block); err == nil {
558                         return nil // success!
559                 }
560         }
561
562         writables := KeepVM.AllWritable()
563         if len(writables) == 0 {
564                 log.Print("No writable volumes.")
565                 return FullError
566         }
567
568         allFull := true
569         for _, vol := range writables {
570                 err := vol.Put(hash, block)
571                 if err == nil {
572                         return nil // success!
573                 }
574                 if err != FullError {
575                         // The volume is not full but the
576                         // write did not succeed.  Report the
577                         // error and continue trying.
578                         allFull = false
579                         log.Printf("%s: Write(%s): %s\n", vol, hash, err)
580                 }
581         }
582
583         if allFull {
584                 log.Print("All volumes are full.")
585                 return FullError
586         } else {
587                 // Already logged the non-full errors.
588                 return GenericError
589         }
590 }
591
592 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
593
594 // IsValidLocator
595 //     Return true if the specified string is a valid Keep locator.
596 //     When Keep is extended to support hash types other than MD5,
597 //     this should be updated to cover those as well.
598 //
599 func IsValidLocator(loc string) bool {
600         return validLocatorRe.MatchString(loc)
601 }
602
603 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
604
605 // GetApiToken returns the OAuth2 token from the Authorization
606 // header of a HTTP request, or an empty string if no matching
607 // token is found.
608 func GetApiToken(req *http.Request) string {
609         if auth, ok := req.Header["Authorization"]; ok {
610                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
611                         return match[1]
612                 }
613         }
614         return ""
615 }
616
617 // IsExpired returns true if the given Unix timestamp (expressed as a
618 // hexadecimal string) is in the past, or if timestamp_hex cannot be
619 // parsed as a hexadecimal string.
620 func IsExpired(timestamp_hex string) bool {
621         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
622         if err != nil {
623                 log.Printf("IsExpired: %s\n", err)
624                 return true
625         }
626         return time.Unix(ts, 0).Before(time.Now())
627 }
628
629 // CanDelete returns true if the user identified by api_token is
630 // allowed to delete blocks.
631 func CanDelete(api_token string) bool {
632         if api_token == "" {
633                 return false
634         }
635         // Blocks may be deleted only when Keep has been configured with a
636         // data manager.
637         if IsDataManagerToken(api_token) {
638                 return true
639         }
640         // TODO(twp): look up api_token with the API server
641         // return true if is_admin is true and if the token
642         // has unlimited scope
643         return false
644 }
645
646 // IsDataManagerToken returns true if api_token represents the data
647 // manager's token.
648 func IsDataManagerToken(api_token string) bool {
649         return data_manager_token != "" && api_token == data_manager_token
650 }