1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
23 "git.arvados.org/arvados.git/sdk/go/arvados"
24 "git.arvados.org/arvados.git/sdk/go/ctxlog"
25 "git.arvados.org/arvados.git/sdk/go/health"
26 "git.arvados.org/arvados.git/sdk/go/httpserver"
27 "github.com/gorilla/mux"
28 "github.com/prometheus/client_golang/prometheus"
29 "github.com/sirupsen/logrus"
// cluster is the cluster configuration consulted by the handlers,
// e.g. Collections.BlobSigning, Collections.BlobSigningKey,
// Collections.BlobTrash and SystemRootToken.
34 cluster *arvados.Cluster
// logger is set from ctxlog.FromContext(ctx) in MakeRESTRouter.
35 logger logrus.FieldLogger
// remoteProxy serves GET requests whose locator carries a +R hint
// but no +A signature (see handleGET).
36 remoteProxy remoteProxy
// volmgr supplies the readable/writable volume mounts used by the
// block, index, trash and status handlers.
38 volmgr *RRVolumeManager
43 // MakeRESTRouter returns a new router that forwards all Keep requests
44 // to the appropriate handlers.
//
// ctx supplies the logger stored on the router; cluster supplies the
// configuration (including the health-check ManagementToken); reg is
// the Prometheus registry handed to the node metrics; volmgr, pullq and
// trashq are the volume manager and pull/trash work queues the
// handlers operate on.
// NOTE(review): no return statement appears in this view of the
// function; confirm which http.Handler (rtr itself or a wrapper) is
// actually returned.
45 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
47 Router: mux.NewRouter(),
49 logger: ctxlog.FromContext(ctx),
50 metrics: &nodeMetrics{reg: reg},
// Block retrieval: GET/HEAD by bare hash, or by hash followed by
// "+hints" (size, signature, remote hints).
57 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
59 `/{hash:[0-9a-f]{32}}+{hints}`,
60 rtr.handleGET).Methods("GET", "HEAD")
// Store a block, or move it to trash (DELETE), by bare hash.
62 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
63 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
64 // List all blocks stored here. Privileged client only.
65 rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
66 // List blocks stored here whose hash has the given prefix.
67 // Privileged client only.
68 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
69 // Update timestamp on existing block. Privileged client only.
70 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
72 // Internals/debugging info (runtime.MemStats)
73 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
75 // List volumes: path, device number, bytes used/avail.
76 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
78 // List mounts: UUID, readonly, tier, device ID, ...
79 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
// Per-mount block index; accepted with and without a trailing slash.
80 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
81 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
83 // Replace the current pull queue.
84 rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
86 // Replace the current trash queue.
87 rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
89 // Untrash moves blocks from trash back into store
90 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
// Health checks are authorized with the cluster's ManagementToken.
92 rtr.Handle("/_health/{check}", &health.Handler{
93 Token: cluster.ManagementToken,
97 // Any request which does not match any of these routes gets
// answered by BadRequestHandler (BadRequestError).
99 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// Set up metrics for the package-level buffer pool (bufs) and for the
// pull and trash work queues.
101 rtr.metrics.setupBufferPoolMetrics(bufs)
102 rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
103 rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
108 // BadRequestHandler is a HandleFunc to address bad requests.
109 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
110 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
// handleGET serves GET and HEAD requests for a single block.
//
// Locators carrying a +R (remote) hint without a +A signature are
// handed to rtr.remoteProxy. Otherwise, when Collections.BlobSigning is
// enabled, the locator's signature is checked with VerifySignature
// against the request's API token. The block is then read via GetBlock
// into a BlockSize buffer obtained from the shared pool. It is written
// to resp with Content-Length and "Content-Type: application/octet-stream"
// headers. Failure to obtain a buffer yields 503. A GetBlock error yields
// 500, presumably replaced by the KeepError's own code when the error is
// a *KeepError.
113 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
// NOTE(review): the context is rooted at context.TODO(); req.Context()
// would carry request-scoped values and is cancelled on disconnect.
114 ctx, cancel := contextForResponse(context.TODO(), resp)
117 locator := req.URL.Path[1:]
118 if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
119 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
123 if rtr.cluster.Collections.BlobSigning {
// NOTE(review): this redeclares locator with the same value computed
// above, shadowing it; the outer variable could simply be reused.
124 locator := req.URL.Path[1:] // strip leading slash
125 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
// NOTE(review): unchecked type assertion — this panics if
// VerifySignature ever returns an error that is not a *KeepError.
126 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
131 // TODO: Probe volumes to check whether the block _might_
132 // exist. Some volumes/types could support a quick existence
133 // check without causing other operations to suffer. If all
134 // volumes support that, and assure us the block definitely
135 // isn't here, we can return 404 now instead of waiting for a
// read attempt on every readable volume.
// Wait for a pooled buffer; give up (503) if ctx ends first.
138 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
140 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
145 size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
147 code := http.StatusInternalServerError
148 if err, ok := err.(*KeepError); ok {
151 http.Error(resp, err.Error(), code)
155 resp.Header().Set("Content-Length", strconv.Itoa(size))
156 resp.Header().Set("Content-Type", "application/octet-stream")
157 resp.Write(buf[:size])
160 // Return a new context that gets cancelled by resp's CloseNotifier.
//
// contextForResponse returns a cancellable child of parent together
// with its CancelFunc, which the caller should call to release
// resources. When resp implements http.CloseNotifier, a goroutine is
// started that receives the close-notification channel. Presumably it
// calls cancel when the client disconnects; the goroutine body is not
// visible here, so confirm.
// NOTE(review): http.CloseNotifier is deprecated; Request.Context() is
// already cancelled when the client's connection closes.
161 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
162 ctx, cancel := context.WithCancel(parent)
163 if cn, ok := resp.(http.CloseNotifier); ok {
164 go func(c <-chan bool) {
175 // Get a buffer from the pool -- but give up and return a non-nil
176 // error if ctx ends before we get a buffer.
//
// The bufs parameter shadows the package-level pool of the same name;
// callers in this file pass that pool. On success the buffer is
// returned with a nil error. If ctx ends first, ErrClientDisconnect is
// returned and (per the comment below) the buffer that eventually
// arrives is still received so it can go back to the pool.
177 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
// Unbuffered: the sender blocks until the buffer is received.
178 bufReady := make(chan []byte)
// Presumably runs in its own goroutine so the select below can also
// watch ctx — confirm.
180 bufReady <- bufs.Get(bufSize)
183 case buf := <-bufReady:
187 // Even if closeNotifier happened first, we
188 // need to keep waiting for our buf so we can
189 // return it to the pool.
192 return nil, ErrClientDisconnect
// handleTOUCH handles "TOUCH /{hash}" by calling Touch(hash) on the
// writable volumes to update the block's timestamp. It requires the
// system root token (otherwise UnauthorizedError).
// It responds 404 when there are no writable volumes or the final
// error satisfies os.IsNotExist, and 500 for any other error.
// The loop's exit condition is not visible here; presumably it stops
// at the first successful Touch.
196 func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
197 if !rtr.isSystemAuth(GetAPIToken(req)) {
198 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
201 hash := mux.Vars(req)["hash"]
202 vols := rtr.volmgr.AllWritable()
204 http.Error(resp, "no volumes", http.StatusNotFound)
208 for _, mnt := range vols {
209 err = mnt.Touch(hash)
217 case os.IsNotExist(err):
218 http.Error(resp, err.Error(), http.StatusNotFound)
220 http.Error(resp, err.Error(), http.StatusInternalServerError)
// handlePUT stores the request body as the block named by {hash}.
//
// Before reading the body, the request is rejected if:
//   - it has no Content-Length (SizeRequiredError);
//   - it exceeds BlockSize (TooLongError);
//   - there are no writable volumes (FullError).
// The body is read into a pooled buffer of exactly Content-Length bytes
// and passed to PutBlock, which verifies the MD5 against hash and
// writes it. On success the response body is "<hash>+<size>" plus a
// newline. The locator is signed with SignLocator when a
// BlobSigningKey is configured and the request carried an API token.
// The X-Keep-Replicas-Stored and X-Keep-Storage-Classes-Confirmed
// headers report the replication achieved.
224 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
225 ctx, cancel := contextForResponse(context.TODO(), resp)
228 hash := mux.Vars(req)["hash"]
230 // Detect as many error conditions as possible before reading
231 // the body: avoid transmitting data that will not end up
232 // being written anyway.
// net/http reports an unknown request length as -1.
234 if req.ContentLength == -1 {
235 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
239 if req.ContentLength > BlockSize {
240 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
244 if len(rtr.volmgr.AllWritable()) == 0 {
245 http.Error(resp, FullError.Error(), FullError.HTTPCode)
249 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
251 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
255 _, err = io.ReadFull(req.Body, buf)
// NOTE(review): literal 500; http.StatusInternalServerError would match
// the rest of the file.
257 http.Error(resp, err.Error(), 500)
262 result, err := PutBlock(ctx, rtr.volmgr, buf, hash)
266 code := http.StatusInternalServerError
267 if err, ok := err.(*KeepError); ok {
270 http.Error(resp, err.Error(), code)
274 // Success; add a size hint, sign the locator if possible, and
275 // return it to the client.
// ContentLength equals len(buf) here, since ReadFull filled buf exactly.
276 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
277 apiToken := GetAPIToken(req)
278 if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
279 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
280 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
282 resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
283 resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
284 resp.Write([]byte(returnHash + "\n"))
287 // handleIndex responds to "/index", "/index/{prefix}", and
288 // "/mounts/{uuid}/blocks" (with or without a trailing slash) requests.
//
// It requires the system root token. The hash prefix comes from the
// {prefix} route variable or, presumably when that is empty, from the
// "prefix" form value. Without {uuid} every readable volume is indexed;
// with {uuid} only that mount is indexed, or 404 if Lookup finds none.
// Each volume writes its entries straight to resp via IndexTo. A final
// empty line marks a complete response.
// NOTE(review): req.Form is populated only after ParseForm; confirm it
// is called before the Form lookup.
289 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
290 if !rtr.isSystemAuth(GetAPIToken(req)) {
291 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
295 prefix := mux.Vars(req)["prefix"]
298 prefix = req.Form.Get("prefix")
301 uuid := mux.Vars(req)["uuid"]
303 var vols []*VolumeMount
305 vols = rtr.volmgr.AllReadable()
306 } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
307 http.Error(resp, "mount not found", http.StatusNotFound)
310 vols = []*VolumeMount{mnt}
313 for _, v := range vols {
314 if err := v.IndexTo(prefix, resp); err != nil {
315 // We can't send an error status/message to
316 // the client because IndexTo() might have
317 // already written body content. All we can do
318 // is log the error in our own logs.
320 // The client must notice the lack of trailing
321 // newline as an indication that the response
// was truncated.
323 ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
327 // An empty line at EOF is the only way the client can be
328 // assured the entire index was received.
329 resp.Write([]byte{'\n'})
332 // MountsHandler responds to "GET /mounts" requests.
//
// It JSON-encodes rtr.volmgr.Mounts() directly to resp. An encoding
// error is reported as 500 via httpserver.Error (other handlers in this
// file use http.Error).
333 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
334 err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
336 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
// PoolStatus reports the state of the shared buffer pool. It appears in
// the /status.json document as NodeStatus.BufferPool and is filled from
// bufs.Alloc(), bufs.Cap() and bufs.Len() by readNodeStatus.
type PoolStatus struct {
	// Cumulative bytes allocated by the pool (JSON "BytesAllocatedCumulative").
	Alloc uint64 `json:"BytesAllocatedCumulative"`
	// Maximum number of buffers (JSON "BuffersMax").
	Cap int `json:"BuffersMax"`
	// Buffers currently in use (JSON "BuffersInUse").
	Len int `json:"BuffersInUse"`
}
// volumeStatusEnt is one entry of NodeStatus.Volumes. readNodeStatus
// builds one per readable volume. Each field shown here is omitted from
// the JSON output when nil.
347 type volumeStatusEnt struct {
// Status is the volume's vol.Status() result.
349 Status *VolumeStatus `json:",omitempty"`
// NOTE(review): readNodeStatus's assignment of VolumeStats is commented
// out, so this field is currently always nil and omitted.
350 VolumeStats *ioStats `json:",omitempty"`
// InternalStats is set only when the volume implements InternalStatser.
351 InternalStats interface{} `json:",omitempty"`
// NodeStatus is the document served at /status.json by StatusHandler
// and filled in by readNodeStatus.
355 type NodeStatus struct {
// One entry per readable volume.
356 Volumes []*volumeStatusEnt
357 BufferPool PoolStatus
// Status of the pull and trash work queues (zero-valued if the queue is nil).
358 PullQueue WorkQueueStatus
359 TrashQueue WorkQueueStatus
// stLock is a package-level mutex. Its Lock/Unlock sites are not
// visible here; presumably it serializes StatusHandler's use of the
// shared status snapshot — confirm.
366 var stLock sync.Mutex
368 // DebugHandler addresses /debug.json requests.
//
// It marshals a debugStats value containing a fresh
// runtime.ReadMemStats snapshot, and presumably writes the JSON to resp
// (the write is not visible here). A marshalling error yields 500.
// NOTE(review): no authentication check is visible in this handler.
369 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
370 type debugStats struct {
371 MemStats runtime.MemStats
374 runtime.ReadMemStats(&ds.MemStats)
375 data, err := json.Marshal(&ds)
377 http.Error(resp, err.Error(), http.StatusInternalServerError)
383 // StatusHandler addresses /status.json requests.
//
// It refreshes the NodeStatus snapshot st via readNodeStatus and
// marshals it to JSON; a marshalling error yields 500.
// NOTE(review): st is not declared in the visible lines of this
// function, so it is presumably package-level state that stLock is
// meant to guard — confirm the locking around these calls.
384 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
386 rtr.readNodeStatus(&st)
387 data, err := json.Marshal(&st)
390 http.Error(resp, err.Error(), http.StatusInternalServerError)
396 // populate the given NodeStatus struct with current values.
//
// readNodeStatus writes one volumeStatusEnt per readable volume, the
// buffer-pool counters, and the pull/trash queue status into st. The
// existing st.Volumes backing array is reused when its capacity
// suffices, so st must not be read concurrently while this runs.
397 func (rtr *router) readNodeStatus(st *NodeStatus) {
399 vols := rtr.volmgr.AllReadable()
400 if cap(st.Volumes) < len(vols) {
401 st.Volumes = make([]*volumeStatusEnt, len(vols))
403 st.Volumes = st.Volumes[:0]
404 for _, vol := range vols {
405 var internalStats interface{}
// Note: this vol shadows the loop variable with its InternalStatser view.
406 if vol, ok := vol.Volume.(InternalStatser); ok {
407 internalStats = vol.InternalStats()
409 st.Volumes = append(st.Volumes, &volumeStatusEnt{
411 Status: vol.Status(),
412 InternalStats: internalStats,
// VolumeStats is deliberately left unset (assignment commented out).
413 //VolumeStats: rtr.volmgr.VolumeStats(vol),
416 st.BufferPool.Alloc = bufs.Alloc()
417 st.BufferPool.Cap = bufs.Cap()
418 st.BufferPool.Len = bufs.Len()
419 st.PullQueue = getWorkQueueStatus(rtr.pullq)
420 st.TrashQueue = getWorkQueueStatus(rtr.trashq)
// getWorkQueueStatus reports q's status for the /status.json document.
423 // return a WorkQueueStatus for the given queue. If q is nil (which
424 // should never happen except in test suites), return a zero status
425 // value instead of crashing.
426 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
428 // This should only happen during tests.
429 return WorkQueueStatus{}
434 // handleDELETE processes DELETE requests.
436 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
437 // from all connected volumes.
439 // Only the Data Manager, or an Arvados admin with scope "all", are
440 // allowed to issue DELETE requests. If a DELETE request is not
441 // authenticated or is issued by a non-admin user, the server returns
442 // a PermissionError.
444 // Upon receiving a valid request from an authorized user,
445 // handleDELETE deletes all copies of the specified block on local
// writable volumes.
450 // If the requested block was not found on any volume, the response
451 // code is HTTP 404 Not Found.
453 // Otherwise, the response code is 200 OK, with a response body
454 // consisting of the JSON message
456 // {"copies_deleted":d,"copies_failed":f}
458 // where d and f are integers representing the number of blocks that
459 // were successfully and unsuccessfully deleted.
//
// NOTE(review): "delete" here means calling vol.Trash on each writable
// volume. The request is refused with MethodDisabledError unless
// Collections.BlobTrash is enabled. canDelete currently accepts only the
// system root token (its admin/scope lookup is a TODO), so the "admin
// with scope all" wording above is not yet implemented.
461 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
462 hash := mux.Vars(req)["hash"]
464 // Confirm that this user is an admin and has a token with unlimited scope.
465 var tok = GetAPIToken(req)
466 if tok == "" || !rtr.canDelete(tok) {
467 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
471 if !rtr.cluster.Collections.BlobTrash {
472 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
476 // Delete copies of this block from all available volumes.
477 // Report how many blocks were successfully deleted, and how
478 // many were found on writable volumes but not deleted.
480 Deleted int `json:"copies_deleted"`
481 Failed int `json:"copies_failed"`
483 for _, vol := range rtr.volmgr.AllWritable() {
484 if err := vol.Trash(hash); err == nil {
// A volume that does not have the block is presumably not counted
// either way.
486 } else if os.IsNotExist(err) {
490 ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
493 if result.Deleted == 0 && result.Failed == 0 {
494 resp.WriteHeader(http.StatusNotFound)
497 body, err := json.Marshal(result)
499 http.Error(resp, err.Error(), http.StatusInternalServerError)
505 /* handlePull processes "PUT /pull" requests for the data manager.
506 The request body is a JSON message containing a list of pull
507 requests in the following format:
511 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
513 "keep0.qr1hi.arvadosapi.com:25107",
514 "keep1.qr1hi.arvadosapi.com:25108"
518 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
528 Each pull request in the list consists of a block locator string
529 and an ordered list of servers. Keepstore should try to fetch the
530 block from each server in turn.
532 If the request has not been sent by the Data Manager, return 401
535 If the JSON unmarshalling fails, return 400 Bad Request.
// PullRequest is one entry of the JSON list accepted by "PUT /pull": a
// block locator, the servers to fetch it from (tried in the given
// order), and an optional destination mount.
type PullRequest struct {
	Locator string   `json:"locator"`
	Servers []string `json:"servers"`

	// Destination mount, or "" for "anywhere"
	MountUUID string `json:"mount_uuid"`
}
547 // handlePull processes "PUT /pull" requests for the data manager.
//
// It requires the system root token. The body is decoded as a JSON
// pull list (a decode error yields BadRequestError's HTTP code). It
// responds 200 with "Received N pull requests". The entries then
// replace the entire pull queue via rtr.pullq.ReplaceQueue.
548 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
549 // Reject unauthorized requests.
550 if !rtr.isSystemAuth(GetAPIToken(req)) {
551 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
555 // Parse the request body.
557 r := json.NewDecoder(req.Body)
558 if err := r.Decode(&pr); err != nil {
559 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
563 // We have a properly formatted pull list sent from the data
564 // manager. Report success and send the list to the pull list
565 // manager for further handling.
566 resp.WriteHeader(http.StatusOK)
568 fmt.Sprintf("Received %d pull requests\n", len(pr))))
571 for _, p := range pr {
574 rtr.pullq.ReplaceQueue(plist)
// TrashRequest is one entry of the JSON list accepted by "PUT /trash":
// a block locator, the block's Mtime as an int64 timestamp (units are
// not established here), and an optional target mount.
type TrashRequest struct {
	Locator    string `json:"locator"`
	BlockMtime int64  `json:"block_mtime"`

	// Target mount, or "" for "everywhere"
	MountUUID string `json:"mount_uuid"`
}
586 // handleTrash processes "PUT /trash" requests.
//
// It requires the system root token. The body is decoded as a JSON
// list of TrashRequest (a decode error yields BadRequestError's HTTP
// code). It responds 200 with "Received N trash requests". The entries
// then replace the entire trash queue via rtr.trashq.ReplaceQueue.
587 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
588 // Reject unauthorized requests.
589 if !rtr.isSystemAuth(GetAPIToken(req)) {
590 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
594 // Parse the request body.
595 var trash []TrashRequest
596 r := json.NewDecoder(req.Body)
597 if err := r.Decode(&trash); err != nil {
598 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
602 // We have a properly formatted trash list sent from the data
603 // manager. Report success and send the list to the trash work
604 // queue for further handling.
605 resp.WriteHeader(http.StatusOK)
607 fmt.Sprintf("Received %d trash requests\n", len(trash))))
610 for _, t := range trash {
613 rtr.trashq.ReplaceQueue(tlist)
616 // handleUntrash processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
//
// It requires the system root token and calls Untrash(hash) on every
// writable volume. Responses:
//   - 404 if there are no writable volumes, or none had the block;
//   - 500 if every volume failed;
//   - 500 with a body listing successes and failures on partial failure;
//   - otherwise 200 with the list of volumes the block was untrashed on.
// NOTE(review): rtr.volmgr.AllWritable() is called several times; if
// the writable set changed in between, the counts compared below could
// disagree. Capturing it once would be more robust. numNotFound is
// declared and incremented on lines not visible here.
617 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
618 // Reject unauthorized requests.
619 if !rtr.isSystemAuth(GetAPIToken(req)) {
620 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
624 log := ctxlog.FromContext(req.Context())
625 hash := mux.Vars(req)["hash"]
627 if len(rtr.volmgr.AllWritable()) == 0 {
628 http.Error(resp, "No writable volumes", http.StatusNotFound)
632 var untrashedOn, failedOn []string
634 for _, vol := range rtr.volmgr.AllWritable() {
635 err := vol.Untrash(hash)
637 if os.IsNotExist(err) {
639 } else if err != nil {
640 log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
641 failedOn = append(failedOn, vol.String())
643 log.Infof("Untrashed %v on volume %v", hash, vol.String())
644 untrashedOn = append(untrashedOn, vol.String())
648 if numNotFound == len(rtr.volmgr.AllWritable()) {
649 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
650 } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
651 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
653 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
654 if len(failedOn) > 0 {
655 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
656 http.Error(resp, respBody, http.StatusInternalServerError)
658 fmt.Fprintln(resp, respBody)
663 // GetBlock and PutBlock implement lower-level code for handling
664 // blocks by rooting through volumes connected to the local machine.
665 // Once the handler has determined that system policy permits the
666 // request, it calls these methods to perform the actual operation.
668 // TODO(twp): this code would probably be better located in the
669 // VolumeManager interface. As an abstraction, the VolumeManager
670 // should be the only part of the code that cares about which volume a
671 // block is stored on, so it should be responsible for figuring out
672 // which volume to check for fetching blocks, storing blocks, etc.
674 // GetBlock fetches the block identified by "hash" into the provided
675 // buf, and returns the data size.
677 // If the block cannot be found on any volume, returns NotFoundError.
679 // If the block found does not have the correct MD5 hash, returns
// DiskHashError, unless an intact copy is found on a later volume.
// If any volume reports VolumeBusyError, that error is returned instead
// of NotFoundError so the caller can retry. errorToCaller is
// overwritten unconditionally, so whichever of these conditions is seen
// last wins. ErrClientDisconnect is returned when the request context
// ends during the scan; the exact check is not visible here.
// NOTE(review): resp is accepted but not used in the visible body.
682 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
683 log := ctxlog.FromContext(ctx)
685 // Attempt to read the requested hash from a keep volume.
686 errorToCaller := NotFoundError
688 for _, vol := range volmgr.AllReadable() {
689 size, err := vol.Get(ctx, hash, buf)
692 return 0, ErrClientDisconnect
696 // IsNotExist is an expected error and may be
697 // ignored. All other errors are logged. In
698 // any case we continue trying to read other
699 // volumes. If all volumes report IsNotExist,
700 // we return a NotFoundError.
701 if !os.IsNotExist(err) {
702 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
704 // If some volume returns a transient error, return it to the caller
705 // instead of "Not found" so it can retry.
706 if err == VolumeBusyError {
707 errorToCaller = err.(*KeepError)
711 // Check the file checksum.
712 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
713 if filehash != hash {
714 // TODO: Try harder to tell a sysadmin about
// NOTE(review): logrus Error/Warn format their arguments like
// fmt.Sprint, so the %s placeholders in this call and in the Warn call
// below are logged literally; Errorf/Warnf were presumably intended.
716 log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
717 errorToCaller = DiskHashError
720 if errorToCaller == DiskHashError {
721 log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
725 return 0, errorToCaller
// putResult summarizes where a block was stored or found. It holds
// the total replica count (set from a mount's Replication by
// newPutResult) and replica counts per storage class. It backs the
// X-Keep-Replicas-Stored and X-Keep-Storage-Classes-Confirmed headers
// in handlePUT.
728 type putResult struct {
730 classReplication map[string]int
733 // Number of distinct replicas stored. "2" can mean the block was
734 // stored on 2 different volumes with replication 1, or on 1 volume
735 // with replication 2.
736 func (pr putResult) TotalReplication() string {
737 return strconv.Itoa(pr.totalReplication)
740 // Number of replicas satisfying each storage class, formatted like
741 // "default=2; special=1".
// NOTE(review): this ranges directly over the classReplication map,
// whose iteration order Go randomizes. The order of entries in the
// returned string (and the X-Keep-Storage-Classes-Confirmed header)
// therefore varies between calls. Sorting the keys first would make
// it deterministic.
742 func (pr putResult) ClassReplication() string {
744 for k, v := range pr.classReplication {
748 s += k + "=" + strconv.Itoa(v)
// newPutResult builds the putResult for a block written to, or found
// intact on, mnt. The total replication is mnt.Replication, and each of
// mnt's storage classes is credited with mnt.Replication replicas.
753 func newPutResult(mnt *VolumeMount) putResult {
755 totalReplication: mnt.Replication,
756 classReplication: map[string]int{},
758 for class := range mnt.StorageClasses {
759 result.classReplication[class] += mnt.Replication
764 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
766 // PutBlock(ctx, volmgr, block, hash)
767 // Stores the BLOCK (identified by the content id HASH) in Keep.
769 // The MD5 checksum of the block must be identical to the content id HASH.
770 // If not, an error is returned.
772 // PutBlock stores the BLOCK on the first Keep volume with free space.
773 // A failure code is returned to the user only if all volumes fail.
// Concretely: an existing intact copy is first looked for via
// CompareAndTouch. Otherwise the block is written to
// volmgr.NextWritable(); if that fails, each volume in
// volmgr.AllWritable() is tried in order. ErrClientDisconnect is
// returned whenever ctx has ended.
775 // On success, PutBlock returns a putResult for the mount used and a nil error.
776 // On failure, it returns a KeepError with one of the following codes:
779 // CollisionError: a different block with the same hash already exists on this
// keepstore's writable volumes.
782 // RequestHashError: the MD5 hash of the BLOCK does not match the argument HASH.
784 // FullError: there was not enough space left in any Keep volume to store
// the block (also returned when there are no writable volumes).
787 // GenericError: the object could not be stored for some other reason (e.g.
788 // all writes failed). The text of the error message should
789 // provide as much detail as possible.
791 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (putResult, error) {
792 log := ctxlog.FromContext(ctx)
794 // Check that BLOCK's checksum matches HASH.
795 blockhash := fmt.Sprintf("%x", md5.Sum(block))
796 if blockhash != hash {
797 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
798 return putResult{}, RequestHashError
801 // If we already have this data, it's intact on disk, and we
802 // can update its timestamp, return success. If we have
803 // different data with the same hash, return failure.
804 if result, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
// CompareAndTouch returns ctx.Err() itself on cancellation; it is
// reported to callers as ErrClientDisconnect.
806 } else if ctx.Err() != nil {
807 return putResult{}, ErrClientDisconnect
810 // Choose a Keep volume to write to.
811 // If this volume fails, try all of the volumes in order.
812 if mnt := volmgr.NextWritable(); mnt != nil {
813 if err := mnt.Put(ctx, hash, block); err != nil {
814 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
816 return newPutResult(mnt), nil // success!
819 if ctx.Err() != nil {
820 return putResult{}, ErrClientDisconnect
823 writables := volmgr.AllWritable()
824 if len(writables) == 0 {
825 log.Error("no writable volumes")
826 return putResult{}, FullError
830 for _, mnt := range writables {
831 err := mnt.Put(ctx, hash, block)
832 if ctx.Err() != nil {
833 return putResult{}, ErrClientDisconnect
837 return newPutResult(mnt), nil // success!
841 // The volume is not full but the
842 // write did not succeed. Report the
843 // error and continue trying.
845 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
850 log.Error("all volumes are full")
851 return putResult{}, FullError
853 // Already logged the non-full errors.
854 return putResult{}, GenericError
857 // CompareAndTouch returns the current replication level if one of the
858 // volumes already has the given content and it successfully updates
859 // the relevant block's modification time in order to protect it from
860 // premature garbage collection. Otherwise, it returns a non-nil
// error. That error is CollisionError when a writable volume holds
// different data under the same hash, ctx.Err() when the context ends,
// and otherwise bestErr. bestErr starts as NotFoundError; the lines
// that may replace it after Compare/Touch failures are not visible
// here.
// The "replication level" is the putResult for the single matching
// mount (newPutResult(mnt)), not a count across all copies.
862 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (putResult, error) {
863 log := ctxlog.FromContext(ctx)
864 var bestErr error = NotFoundError
865 for _, mnt := range volmgr.AllWritable() {
866 err := mnt.Compare(ctx, hash, buf)
867 if ctx.Err() != nil {
868 return putResult{}, ctx.Err()
869 } else if err == CollisionError {
870 // Stop if we have a block with same hash but
871 // different content. (It will be impossible
872 // to tell which one is wanted if we have
873 // both, so there's no point writing it even
874 // on a different volume.)
// NOTE(review): logrus Error formats like fmt.Sprint, so the %s
// placeholders are logged literally; Errorf was presumably intended.
875 log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
876 return putResult{}, err
877 } else if os.IsNotExist(err) {
878 // Block does not exist. This is the only
879 // "normal" error: we don't log anything.
881 } else if err != nil {
882 // Couldn't open file, data is corrupt on
883 // disk, etc.: log this abnormal condition,
884 // and try the next volume.
885 log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
888 if err := mnt.Touch(hash); err != nil {
889 log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
893 // Compare and Touch both worked --> done.
894 return newPutResult(mnt), nil
896 return putResult{}, bestErr
// validLocatorRe matches a bare block hash: exactly 32 lowercase
// hexadecimal digits and nothing else.
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator reports whether loc is a bare Keep block hash: exactly
// 32 lowercase hexadecimal digits (an MD5 digest), with no "+size" or
// other "+..." hints and no surrounding whitespace.
//
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	return validLocatorRe.MatchString(loc)
}
// authRe matches an Authorization header value of the form
// "OAuth2 <token>" or "Bearer <token>" (scheme match is case-sensitive);
// group 2 captures everything after the scheme and its whitespace.
909 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
911 // GetAPIToken returns the OAuth2 token from the Authorization
912 // header of a HTTP request, or an empty string if no matching
// header is found.
// Only the first Authorization header value (auth[0]) is examined.
914 func GetAPIToken(req *http.Request) string {
915 if auth, ok := req.Header["Authorization"]; ok {
916 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
923 // canDelete returns true if the user identified by apiToken is
924 // allowed to delete blocks.
// In the code visible here, only the system root token (isSystemAuth)
// is accepted. The planned admin/scope check against the API server is
// still a TODO.
925 func (rtr *router) canDelete(apiToken string) bool {
929 // Blocks may be deleted only when Keep has been configured with a
// system root token and apiToken matches it.
931 if rtr.isSystemAuth(apiToken) {
934 // TODO(twp): look up apiToken with the API server
935 // return true if is_admin is true and if the token
936 // has unlimited scope
940 // isSystemAuth returns true if the given token is allowed to perform
941 // system level actions like deleting data.
942 func (rtr *router) isSystemAuth(token string) bool {
943 return token != "" && token == rtr.cluster.SystemRootToken