7121: Fixup log messages (remove excess \n, show which volume had a Get() error).
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "container/list"
12         "crypto/md5"
13         "encoding/json"
14         "fmt"
15         "github.com/gorilla/mux"
16         "io"
17         "log"
18         "net/http"
19         "os"
20         "regexp"
21         "runtime"
22         "strconv"
23         "sync"
24         "time"
25 )
26
27 // MakeRESTRouter returns a new mux.Router that forwards all Keep
28 // requests to the appropriate handlers.
29 //
30 func MakeRESTRouter() *mux.Router {
31         rest := mux.NewRouter()
32
33         rest.HandleFunc(
34                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}+{hints}`,
37                 GetBlockHandler).Methods("GET", "HEAD")
38
39         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
40         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
41         // List all blocks stored here. Privileged client only.
42         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
43         // List blocks stored here whose hash has the given prefix.
44         // Privileged client only.
45         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
46
47         // List volumes: path, device number, bytes used/avail.
48         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
49
50         // Replace the current pull queue.
51         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
52
53         // Replace the current trash queue.
54         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
55
56         // Any request which does not match any of these routes gets
57         // 400 Bad Request.
58         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
59
60         return rest
61 }
62
63 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
64         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
65 }
66
67 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
68         if enforce_permissions {
69                 locator := req.URL.Path[1:] // strip leading slash
70                 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
71                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
72                         return
73                 }
74         }
75
76         block, err := GetBlock(mux.Vars(req)["hash"])
77         if err != nil {
78                 // This type assertion is safe because the only errors
79                 // GetBlock can return are DiskHashError or NotFoundError.
80                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
81                 return
82         }
83         defer bufs.Put(block)
84
85         resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
86         resp.Header().Set("Content-Type", "application/octet-stream")
87         resp.Write(block)
88 }
89
90 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
91         hash := mux.Vars(req)["hash"]
92
93         // Detect as many error conditions as possible before reading
94         // the body: avoid transmitting data that will not end up
95         // being written anyway.
96
97         if req.ContentLength == -1 {
98                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
99                 return
100         }
101
102         if req.ContentLength > BLOCKSIZE {
103                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
104                 return
105         }
106
107         if len(KeepVM.AllWritable()) == 0 {
108                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
109                 return
110         }
111
112         buf := bufs.Get(int(req.ContentLength))
113         _, err := io.ReadFull(req.Body, buf)
114         if err != nil {
115                 http.Error(resp, err.Error(), 500)
116                 bufs.Put(buf)
117                 return
118         }
119
120         err = PutBlock(buf, hash)
121         bufs.Put(buf)
122
123         if err != nil {
124                 ke := err.(*KeepError)
125                 http.Error(resp, ke.Error(), ke.HTTPCode)
126                 return
127         }
128
129         // Success; add a size hint, sign the locator if possible, and
130         // return it to the client.
131         return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
132         api_token := GetApiToken(req)
133         if PermissionSecret != nil && api_token != "" {
134                 expiry := time.Now().Add(blob_signature_ttl)
135                 return_hash = SignLocator(return_hash, api_token, expiry)
136         }
137         resp.Write([]byte(return_hash + "\n"))
138 }
139
140 // IndexHandler
141 //     A HandleFunc to address /index and /index/{prefix} requests.
142 //
143 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
144         // Reject unauthorized requests.
145         if !IsDataManagerToken(GetApiToken(req)) {
146                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
147                 return
148         }
149
150         prefix := mux.Vars(req)["prefix"]
151
152         for _, vol := range KeepVM.AllReadable() {
153                 if err := vol.IndexTo(prefix, resp); err != nil {
154                         // The only errors returned by IndexTo are
155                         // write errors returned by resp.Write(),
156                         // which probably means the client has
157                         // disconnected and this error will never be
158                         // reported to the client -- but it will
159                         // appear in our own error log.
160                         http.Error(resp, err.Error(), http.StatusInternalServerError)
161                         return
162                 }
163         }
164         // An empty line at EOF is the only way the client can be
165         // assured the entire index was received.
166         resp.Write([]byte{'\n'})
167 }
168
169 // StatusHandler
170 //     Responds to /status.json requests with the current node status,
171 //     described in a JSON structure.
172 //
173 //     The data given in a status.json response includes:
174 //        volumes - a list of Keep volumes currently in use by this server
175 //          each volume is an object with the following fields:
176 //            * mount_point
177 //            * device_num (an integer identifying the underlying filesystem)
178 //            * bytes_free
179 //            * bytes_used
180 //
181 type VolumeStatus struct {
182         MountPoint string `json:"mount_point"`
183         DeviceNum  uint64 `json:"device_num"`
184         BytesFree  uint64 `json:"bytes_free"`
185         BytesUsed  uint64 `json:"bytes_used"`
186 }
187
188 type PoolStatus struct {
189         Alloc uint64 `json:"BytesAllocated"`
190         Cap   int    `json:"BuffersMax"`
191         Len   int    `json:"BuffersInUse"`
192 }
193
194 type NodeStatus struct {
195         Volumes    []*VolumeStatus `json:"volumes"`
196         BufferPool PoolStatus
197         PullQueue  WorkQueueStatus
198         TrashQueue WorkQueueStatus
199         Memory     runtime.MemStats
200 }
201
202 var st NodeStatus
203 var stLock sync.Mutex
204
205 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
206         stLock.Lock()
207         readNodeStatus(&st)
208         jstat, err := json.Marshal(&st)
209         stLock.Unlock()
210         if err == nil {
211                 resp.Write(jstat)
212         } else {
213                 log.Printf("json.Marshal: %s", err)
214                 log.Printf("NodeStatus = %v", &st)
215                 http.Error(resp, err.Error(), 500)
216         }
217 }
218
219 // populate the given NodeStatus struct with current values.
220 func readNodeStatus(st *NodeStatus) {
221         vols := KeepVM.AllReadable()
222         if cap(st.Volumes) < len(vols) {
223                 st.Volumes = make([]*VolumeStatus, len(vols))
224         }
225         st.Volumes = st.Volumes[:0]
226         for _, vol := range vols {
227                 if s := vol.Status(); s != nil {
228                         st.Volumes = append(st.Volumes, s)
229                 }
230         }
231         st.BufferPool.Alloc = bufs.Alloc()
232         st.BufferPool.Cap = bufs.Cap()
233         st.BufferPool.Len = bufs.Len()
234         st.PullQueue = getWorkQueueStatus(pullq)
235         st.TrashQueue = getWorkQueueStatus(trashq)
236         runtime.ReadMemStats(&st.Memory)
237 }
238
239 // return a WorkQueueStatus for the given queue. If q is nil (which
240 // should never happen except in test suites), return a zero status
241 // value instead of crashing.
242 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
243         if q == nil {
244                 // This should only happen during tests.
245                 return WorkQueueStatus{}
246         }
247         return q.Status()
248 }
249
250 // DeleteHandler processes DELETE requests.
251 //
252 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
253 // from all connected volumes.
254 //
255 // Only the Data Manager, or an Arvados admin with scope "all", are
256 // allowed to issue DELETE requests.  If a DELETE request is not
257 // authenticated or is issued by a non-admin user, the server returns
258 // a PermissionError.
259 //
260 // Upon receiving a valid request from an authorized user,
261 // DeleteHandler deletes all copies of the specified block on local
262 // writable volumes.
263 //
264 // Response format:
265 //
266 // If the requested blocks was not found on any volume, the response
267 // code is HTTP 404 Not Found.
268 //
269 // Otherwise, the response code is 200 OK, with a response body
270 // consisting of the JSON message
271 //
272 //    {"copies_deleted":d,"copies_failed":f}
273 //
274 // where d and f are integers representing the number of blocks that
275 // were successfully and unsuccessfully deleted.
276 //
277 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
278         hash := mux.Vars(req)["hash"]
279
280         // Confirm that this user is an admin and has a token with unlimited scope.
281         var tok = GetApiToken(req)
282         if tok == "" || !CanDelete(tok) {
283                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
284                 return
285         }
286
287         if never_delete {
288                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
289                 return
290         }
291
292         // Delete copies of this block from all available volumes.
293         // Report how many blocks were successfully deleted, and how
294         // many were found on writable volumes but not deleted.
295         var result struct {
296                 Deleted int `json:"copies_deleted"`
297                 Failed  int `json:"copies_failed"`
298         }
299         for _, vol := range KeepVM.AllWritable() {
300                 if err := vol.Delete(hash); err == nil {
301                         result.Deleted++
302                 } else if os.IsNotExist(err) {
303                         continue
304                 } else {
305                         result.Failed++
306                         log.Println("DeleteHandler:", err)
307                 }
308         }
309
310         var st int
311
312         if result.Deleted == 0 && result.Failed == 0 {
313                 st = http.StatusNotFound
314         } else {
315                 st = http.StatusOK
316         }
317
318         resp.WriteHeader(st)
319
320         if st == http.StatusOK {
321                 if body, err := json.Marshal(result); err == nil {
322                         resp.Write(body)
323                 } else {
324                         log.Printf("json.Marshal: %s (result = %v)", err, result)
325                         http.Error(resp, err.Error(), 500)
326                 }
327         }
328 }
329
330 /* PullHandler processes "PUT /pull" requests for the data manager.
331    The request body is a JSON message containing a list of pull
332    requests in the following format:
333
334    [
335       {
336          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
337          "servers":[
338                         "keep0.qr1hi.arvadosapi.com:25107",
339                         "keep1.qr1hi.arvadosapi.com:25108"
340                  ]
341           },
342           {
343                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
344                  "servers":[
345                         "10.0.1.5:25107",
346                         "10.0.1.6:25107",
347                         "10.0.1.7:25108"
348                  ]
349           },
350           ...
351    ]
352
353    Each pull request in the list consists of a block locator string
354    and an ordered list of servers.  Keepstore should try to fetch the
355    block from each server in turn.
356
357    If the request has not been sent by the Data Manager, return 401
358    Unauthorized.
359
360    If the JSON unmarshalling fails, return 400 Bad Request.
361 */
362
363 type PullRequest struct {
364         Locator string   `json:"locator"`
365         Servers []string `json:"servers"`
366 }
367
368 func PullHandler(resp http.ResponseWriter, req *http.Request) {
369         // Reject unauthorized requests.
370         if !IsDataManagerToken(GetApiToken(req)) {
371                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
372                 return
373         }
374
375         // Parse the request body.
376         var pr []PullRequest
377         r := json.NewDecoder(req.Body)
378         if err := r.Decode(&pr); err != nil {
379                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
380                 return
381         }
382
383         // We have a properly formatted pull list sent from the data
384         // manager.  Report success and send the list to the pull list
385         // manager for further handling.
386         resp.WriteHeader(http.StatusOK)
387         resp.Write([]byte(
388                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
389
390         plist := list.New()
391         for _, p := range pr {
392                 plist.PushBack(p)
393         }
394         pullq.ReplaceQueue(plist)
395 }
396
397 type TrashRequest struct {
398         Locator    string `json:"locator"`
399         BlockMtime int64  `json:"block_mtime"`
400 }
401
402 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
403         // Reject unauthorized requests.
404         if !IsDataManagerToken(GetApiToken(req)) {
405                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
406                 return
407         }
408
409         // Parse the request body.
410         var trash []TrashRequest
411         r := json.NewDecoder(req.Body)
412         if err := r.Decode(&trash); err != nil {
413                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
414                 return
415         }
416
417         // We have a properly formatted trash list sent from the data
418         // manager.  Report success and send the list to the trash work
419         // queue for further handling.
420         resp.WriteHeader(http.StatusOK)
421         resp.Write([]byte(
422                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
423
424         tlist := list.New()
425         for _, t := range trash {
426                 tlist.PushBack(t)
427         }
428         trashq.ReplaceQueue(tlist)
429 }
430
431 // ==============================
432 // GetBlock and PutBlock implement lower-level code for handling
433 // blocks by rooting through volumes connected to the local machine.
434 // Once the handler has determined that system policy permits the
435 // request, it calls these methods to perform the actual operation.
436 //
437 // TODO(twp): this code would probably be better located in the
438 // VolumeManager interface. As an abstraction, the VolumeManager
439 // should be the only part of the code that cares about which volume a
440 // block is stored on, so it should be responsible for figuring out
441 // which volume to check for fetching blocks, storing blocks, etc.
442
443 // ==============================
444 // GetBlock fetches and returns the block identified by "hash".
445 //
446 // On success, GetBlock returns a byte slice with the block data, and
447 // a nil error.
448 //
449 // If the block cannot be found on any volume, returns NotFoundError.
450 //
451 // If the block found does not have the correct MD5 hash, returns
452 // DiskHashError.
453 //
454
455 func GetBlock(hash string) ([]byte, error) {
456         // Attempt to read the requested hash from a keep volume.
457         error_to_caller := NotFoundError
458
459         for _, vol := range KeepVM.AllReadable() {
460                 buf, err := vol.Get(hash)
461                 if err != nil {
462                         // IsNotExist is an expected error and may be
463                         // ignored. All other errors are logged. In
464                         // any case we continue trying to read other
465                         // volumes. If all volumes report IsNotExist,
466                         // we return a NotFoundError.
467                         if !os.IsNotExist(err) {
468                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
469                         }
470                         continue
471                 }
472                 // Check the file checksum.
473                 //
474                 filehash := fmt.Sprintf("%x", md5.Sum(buf))
475                 if filehash != hash {
476                         // TODO: Try harder to tell a sysadmin about
477                         // this.
478                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
479                                 vol, hash, filehash)
480                         error_to_caller = DiskHashError
481                         bufs.Put(buf)
482                         continue
483                 }
484                 if error_to_caller == DiskHashError {
485                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
486                                 vol, hash)
487                 }
488                 return buf, nil
489         }
490         return nil, error_to_caller
491 }
492
493 /* PutBlock(block, hash)
494    Stores the BLOCK (identified by the content id HASH) in Keep.
495
496    The MD5 checksum of the block must be identical to the content id HASH.
497    If not, an error is returned.
498
499    PutBlock stores the BLOCK on the first Keep volume with free space.
500    A failure code is returned to the user only if all volumes fail.
501
502    On success, PutBlock returns nil.
503    On failure, it returns a KeepError with one of the following codes:
504
505    500 Collision
506           A different block with the same hash already exists on this
507           Keep server.
508    422 MD5Fail
509           The MD5 hash of the BLOCK does not match the argument HASH.
510    503 Full
511           There was not enough space left in any Keep volume to store
512           the object.
513    500 Fail
514           The object could not be stored for some other reason (e.g.
515           all writes failed). The text of the error message should
516           provide as much detail as possible.
517 */
518
519 func PutBlock(block []byte, hash string) error {
520         // Check that BLOCK's checksum matches HASH.
521         blockhash := fmt.Sprintf("%x", md5.Sum(block))
522         if blockhash != hash {
523                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
524                 return RequestHashError
525         }
526
527         // If we already have this data, it's intact on disk, and we
528         // can update its timestamp, return success. If we have
529         // different data with the same hash, return failure.
530         if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
531                 return err
532         }
533
534         // Choose a Keep volume to write to.
535         // If this volume fails, try all of the volumes in order.
536         if vol := KeepVM.NextWritable(); vol != nil {
537                 if err := vol.Put(hash, block); err == nil {
538                         return nil // success!
539                 }
540         }
541
542         writables := KeepVM.AllWritable()
543         if len(writables) == 0 {
544                 log.Print("No writable volumes.")
545                 return FullError
546         }
547
548         allFull := true
549         for _, vol := range writables {
550                 err := vol.Put(hash, block)
551                 if err == nil {
552                         return nil // success!
553                 }
554                 if err != FullError {
555                         // The volume is not full but the
556                         // write did not succeed.  Report the
557                         // error and continue trying.
558                         allFull = false
559                         log.Printf("%s: Write(%s): %s", vol, hash, err)
560                 }
561         }
562
563         if allFull {
564                 log.Print("All volumes are full.")
565                 return FullError
566         } else {
567                 // Already logged the non-full errors.
568                 return GenericError
569         }
570 }
571
572 // CompareAndTouch returns nil if one of the volumes already has the
573 // given content and it successfully updates the relevant block's
574 // modification time in order to protect it from premature garbage
575 // collection.
576 func CompareAndTouch(hash string, buf []byte) error {
577         var bestErr error = NotFoundError
578         for _, vol := range KeepVM.AllWritable() {
579                 if err := vol.Compare(hash, buf); err == CollisionError {
580                         // Stop if we have a block with same hash but
581                         // different content. (It will be impossible
582                         // to tell which one is wanted if we have
583                         // both, so there's no point writing it even
584                         // on a different volume.)
585                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
586                         return err
587                 } else if os.IsNotExist(err) {
588                         // Block does not exist. This is the only
589                         // "normal" error: we don't log anything.
590                         continue
591                 } else if err != nil {
592                         // Couldn't open file, data is corrupt on
593                         // disk, etc.: log this abnormal condition,
594                         // and try the next volume.
595                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
596                         continue
597                 }
598                 if err := vol.Touch(hash); err != nil {
599                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
600                         bestErr = err
601                         continue
602                 }
603                 // Compare and Touch both worked --> done.
604                 return nil
605         }
606         return bestErr
607 }
608
609 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
610
611 // IsValidLocator
612 //     Return true if the specified string is a valid Keep locator.
613 //     When Keep is extended to support hash types other than MD5,
614 //     this should be updated to cover those as well.
615 //
616 func IsValidLocator(loc string) bool {
617         return validLocatorRe.MatchString(loc)
618 }
619
620 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
621
622 // GetApiToken returns the OAuth2 token from the Authorization
623 // header of a HTTP request, or an empty string if no matching
624 // token is found.
625 func GetApiToken(req *http.Request) string {
626         if auth, ok := req.Header["Authorization"]; ok {
627                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
628                         return match[1]
629                 }
630         }
631         return ""
632 }
633
634 // IsExpired returns true if the given Unix timestamp (expressed as a
635 // hexadecimal string) is in the past, or if timestamp_hex cannot be
636 // parsed as a hexadecimal string.
637 func IsExpired(timestamp_hex string) bool {
638         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
639         if err != nil {
640                 log.Printf("IsExpired: %s", err)
641                 return true
642         }
643         return time.Unix(ts, 0).Before(time.Now())
644 }
645
646 // CanDelete returns true if the user identified by api_token is
647 // allowed to delete blocks.
648 func CanDelete(api_token string) bool {
649         if api_token == "" {
650                 return false
651         }
652         // Blocks may be deleted only when Keep has been configured with a
653         // data manager.
654         if IsDataManagerToken(api_token) {
655                 return true
656         }
657         // TODO(twp): look up api_token with the API server
658         // return true if is_admin is true and if the token
659         // has unlimited scope
660         return false
661 }
662
663 // IsDataManagerToken returns true if api_token represents the data
664 // manager's token.
665 func IsDataManagerToken(api_token string) bool {
666         return data_manager_token != "" && api_token == data_manager_token
667 }