1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
24 "git.arvados.org/arvados.git/lib/cmd"
25 "git.arvados.org/arvados.git/sdk/go/arvados"
26 "git.arvados.org/arvados.git/sdk/go/ctxlog"
27 "git.arvados.org/arvados.git/sdk/go/health"
28 "git.arvados.org/arvados.git/sdk/go/httpserver"
29 "github.com/gorilla/mux"
30 "github.com/prometheus/client_golang/prometheus"
31 "github.com/sirupsen/logrus"
// NOTE(review): these are fields of the router struct; the `type router
// struct {` line and some sibling fields are not visible in this chunk.
36 cluster *arvados.Cluster
37 logger logrus.FieldLogger
38 remoteProxy remoteProxy
40 volmgr *RRVolumeManager
45 // MakeRESTRouter returns a new router that forwards all Keep requests
46 // to the appropriate handlers.
// It wires block read/write/delete routes, privileged index and admin
// routes, the health-check endpoint, and registers buffer-pool and
// work-queue metrics on the given prometheus registry.
47 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
49 Router: mux.NewRouter(),
51 logger: ctxlog.FromContext(ctx),
52 metrics: &nodeMetrics{reg: reg},
// Fetch a block: bare 32-hex-digit hash, or hash plus "+"-separated hints.
59 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
61 `/{hash:[0-9a-f]{32}}+{hints}`,
62 rtr.handleGET).Methods("GET", "HEAD")
// Store or delete a block by hash.
64 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
65 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
66 // List all blocks stored here. Privileged client only.
67 rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
68 // List blocks stored here whose hash has the given prefix.
69 // Privileged client only.
70 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
71 // Update timestamp on existing block. Privileged client only.
72 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
74 // Internals/debugging info (runtime.MemStats)
75 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
77 // List volumes: path, device number, bytes used/avail.
78 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
80 // List mounts: UUID, readonly, tier, device ID, ...
81 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
82 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
83 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
85 // Replace the current pull queue.
86 rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
88 // Replace the current trash queue.
89 rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
91 // Untrash moves blocks from trash back into store
92 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
// Health check endpoint, authorized by the cluster management token.
94 rtr.Handle("/_health/{check}", &health.Handler{
95 Token: cluster.ManagementToken,
99 // Any request which does not match any of these routes gets
101 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// Expose buffer-pool usage and pull/trash queue depth as metrics.
103 rtr.metrics.setupBufferPoolMetrics(bufs)
104 rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
105 rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
110 // BadRequestHandler is a HandleFunc to address bad requests.
// It replies with BadRequestError's message and HTTP status code;
// installed as the router's NotFoundHandler.
111 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
112 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
// handleGET serves GET/HEAD requests for a block. A locator with a "+R"
// hint but no local "+A" signature token is delegated to the
// remote-cluster proxy (federated read); otherwise the signature is
// verified (when blob signing is enabled), the block is read into a
// pooled buffer, and the raw bytes are written to the response.
115 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
116 locator := req.URL.Path[1:]
117 if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
118 rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
// Reject unsigned/invalidly signed locators before touching any volume.
122 if rtr.cluster.Collections.BlobSigning {
123 locator := req.URL.Path[1:] // strip leading slash
124 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
125 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
130 // TODO: Probe volumes to check whether the block _might_
131 // exist. Some volumes/types could support a quick existence
132 // check without causing other operations to suffer. If all
133 // volumes support that, and assure us the block definitely
134 // isn't here, we can return 404 now instead of waiting for a
// Getting a buffer can block until one is free; 503 if the client
// gives up (context canceled) before a buffer becomes available.
137 buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
139 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
144 size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
146 code := http.StatusInternalServerError
147 if err, ok := err.(*KeepError); ok {
150 http.Error(resp, err.Error(), code)
154 resp.Header().Set("Content-Length", strconv.Itoa(size))
155 resp.Header().Set("Content-Type", "application/octet-stream")
156 resp.Write(buf[:size])
159 // Get a buffer from the pool -- but give up and return a non-nil
160 // error if ctx ends before we get a buffer.
// The pool Get runs in a goroutine so this function can select between
// buffer-ready and ctx.Done(); on cancellation it returns
// ErrClientDisconnect (and, per the comment below, the goroutine still
// waits for the buffer so it can be returned to the pool).
161 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
162 bufReady := make(chan []byte)
164 bufReady <- bufs.Get(bufSize)
167 case buf := <-bufReady:
171 // Even if closeNotifier happened first, we
172 // need to keep waiting for our buf so we can
173 // return it to the pool.
176 return nil, ErrClientDisconnect
// handleTOUCH updates the timestamp on an existing block. System
// (privileged) token required. It tries each writable volume in turn;
// responds 404 if there are no writable volumes or the block does not
// exist, 500 on other errors.
180 func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
181 if !rtr.isSystemAuth(GetAPIToken(req)) {
182 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
185 hash := mux.Vars(req)["hash"]
186 vols := rtr.volmgr.AllWritable()
188 http.Error(resp, "no volumes", http.StatusNotFound)
192 for _, mnt := range vols {
193 err = mnt.Touch(hash)
201 case os.IsNotExist(err):
202 http.Error(resp, err.Error(), http.StatusNotFound)
204 http.Error(resp, err.Error(), http.StatusInternalServerError)
// handlePUT stores a block. It validates Content-Length, determines the
// requested storage classes (X-Keep-Storage-Classes header, falling back
// to the cluster's configured defaults), reads the body into a pooled
// buffer, writes via PutBlock, and replies with a (possibly signed)
// size-hinted locator plus replication headers.
208 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
209 hash := mux.Vars(req)["hash"]
211 // Detect as many error conditions as possible before reading
212 // the body: avoid transmitting data that will not end up
213 // being written anyway.
215 if req.ContentLength == -1 {
216 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
220 if req.ContentLength > BlockSize {
221 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
225 if len(rtr.volmgr.AllWritable()) == 0 {
226 http.Error(resp, FullError.Error(), FullError.HTTPCode)
230 var wantStorageClasses []string
231 if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
// Header is a comma-separated list; trim whitespace around each class.
232 wantStorageClasses = strings.Split(hdr, ",")
233 for i, sc := range wantStorageClasses {
234 wantStorageClasses[i] = strings.TrimSpace(sc)
237 // none specified -- use configured default
238 for class, cfg := range rtr.cluster.StorageClasses {
240 wantStorageClasses = append(wantStorageClasses, class)
// Buffer is sized to the declared Content-Length; 503 if the client
// disconnects before a buffer is available.
245 buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
247 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
251 _, err = io.ReadFull(req.Body, buf)
253 http.Error(resp, err.Error(), 500)
258 result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
262 code := http.StatusInternalServerError
263 if err, ok := err.(*KeepError); ok {
266 http.Error(resp, err.Error(), code)
270 // Success; add a size hint, sign the locator if possible, and
271 // return it to the client.
272 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
273 apiToken := GetAPIToken(req)
274 if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
275 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
276 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
278 resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
279 resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
280 resp.Write([]byte(returnHash + "\n"))
283 // IndexHandler responds to "/index", "/index/{prefix}", and
284 // "/mounts/{uuid}/blocks" requests.
// Privileged client only. The index of each selected volume is streamed
// to the response; a trailing blank line marks a complete response.
285 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
286 if !rtr.isSystemAuth(GetAPIToken(req)) {
287 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
// Prefix comes from the URL path or, failing that, the form value.
291 prefix := mux.Vars(req)["prefix"]
294 prefix = req.Form.Get("prefix")
// With a mount UUID, index only that mount; otherwise all readable volumes.
297 uuid := mux.Vars(req)["uuid"]
299 var vols []*VolumeMount
301 vols = rtr.volmgr.AllReadable()
302 } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
303 http.Error(resp, "mount not found", http.StatusNotFound)
306 vols = []*VolumeMount{mnt}
309 for _, v := range vols {
310 if err := v.IndexTo(prefix, resp); err != nil {
311 // We can't send an error status/message to
312 // the client because IndexTo() might have
313 // already written body content. All we can do
314 // is log the error in our own logs.
316 // The client must notice the lack of trailing
317 // newline as an indication that the response
319 ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
323 // An empty line at EOF is the only way the client can be
324 // assured the entire index was received.
325 resp.Write([]byte{'\n'})
328 // MountsHandler responds to "GET /mounts" requests.
// It serializes the volume manager's mount list as JSON.
329 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
330 err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
332 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
// PoolStatus reports buffer pool usage (see readNodeStatus, which fills
// it from bufs.Alloc/Cap/Len) for the /status.json response.
337 type PoolStatus struct {
338 Alloc uint64 `json:"BytesAllocatedCumulative"`
339 Cap int `json:"BuffersMax"`
340 Len int `json:"BuffersInUse"`
// volumeStatusEnt is one per-volume entry in NodeStatus.Volumes.
// NOTE(review): additional fields (e.g. the volume label) appear to be
// elided from this listing.
343 type volumeStatusEnt struct {
345 Status *VolumeStatus `json:",omitempty"`
346 VolumeStats *ioStats `json:",omitempty"`
347 InternalStats interface{} `json:",omitempty"`
// NodeStatus is the payload of a /status.json response, populated by
// readNodeStatus. NOTE(review): a Version field (set in readNodeStatus)
// is not visible in this listing.
351 type NodeStatus struct {
352 Volumes []*volumeStatusEnt
353 BufferPool PoolStatus
354 PullQueue WorkQueueStatus
355 TrashQueue WorkQueueStatus
// stLock presumably serializes access to a shared NodeStatus value used
// by StatusHandler/readNodeStatus — confirm against the elided lines.
362 var stLock sync.Mutex
364 // DebugHandler addresses /debug.json requests.
// It snapshots runtime.MemStats and returns them as JSON.
365 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
366 type debugStats struct {
367 MemStats runtime.MemStats
370 runtime.ReadMemStats(&ds.MemStats)
371 data, err := json.Marshal(&ds)
373 http.Error(resp, err.Error(), http.StatusInternalServerError)
379 // StatusHandler addresses /status.json requests.
// Populates a NodeStatus via readNodeStatus and writes it as JSON.
380 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
382 rtr.readNodeStatus(&st)
383 data, err := json.Marshal(&st)
386 http.Error(resp, err.Error(), http.StatusInternalServerError)
392 // populate the given NodeStatus struct with current values.
393 func (rtr *router) readNodeStatus(st *NodeStatus) {
// Version string is the first word of the build version.
394 st.Version = strings.SplitN(cmd.Version.String(), " ", 2)[0]
395 vols := rtr.volmgr.AllReadable()
// Reuse the existing Volumes slice when its capacity suffices;
// otherwise allocate a new one, then reset length to zero and append.
396 if cap(st.Volumes) < len(vols) {
397 st.Volumes = make([]*volumeStatusEnt, len(vols))
399 st.Volumes = st.Volumes[:0]
400 for _, vol := range vols {
401 var internalStats interface{}
// Volumes implementing InternalStatser expose driver-specific stats.
402 if vol, ok := vol.Volume.(InternalStatser); ok {
403 internalStats = vol.InternalStats()
405 st.Volumes = append(st.Volumes, &volumeStatusEnt{
407 Status: vol.Status(),
408 InternalStats: internalStats,
409 //VolumeStats: rtr.volmgr.VolumeStats(vol),
412 st.BufferPool.Alloc = bufs.Alloc()
413 st.BufferPool.Cap = bufs.Cap()
414 st.BufferPool.Len = bufs.Len()
415 st.PullQueue = getWorkQueueStatus(rtr.pullq)
416 st.TrashQueue = getWorkQueueStatus(rtr.trashq)
419 // return a WorkQueueStatus for the given queue. If q is nil (which
420 // should never happen except in test suites), return a zero status
421 // value instead of crashing.
422 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
424 // This should only happen during tests.
425 return WorkQueueStatus{}
430 // handleDELETE processes DELETE requests.
432 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
433 // from all connected volumes.
435 // Only the Data Manager, or an Arvados admin with scope "all", are
436 // allowed to issue DELETE requests. If a DELETE request is not
437 // authenticated or is issued by a non-admin user, the server returns
438 // a PermissionError.
440 // Upon receiving a valid request from an authorized user,
441 // handleDELETE deletes all copies of the specified block on local
446 // If the requested block was not found on any volume, the response
447 // code is HTTP 404 Not Found.
449 // Otherwise, the response code is 200 OK, with a response body
450 // consisting of the JSON message
452 // {"copies_deleted":d,"copies_failed":f}
454 // where d and f are integers representing the number of blocks that
455 // were successfully and unsuccessfully deleted.
457 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
458 hash := mux.Vars(req)["hash"]
460 // Confirm that this user is an admin and has a token with unlimited scope.
461 var tok = GetAPIToken(req)
462 if tok == "" || !rtr.canDelete(tok) {
463 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// Deleting (trashing) blocks can be disabled cluster-wide.
467 if !rtr.cluster.Collections.BlobTrash {
468 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
472 // Delete copies of this block from all available volumes.
473 // Report how many blocks were successfully deleted, and how
474 // many were found on writable volumes but not deleted.
476 Deleted int `json:"copies_deleted"`
477 Failed int `json:"copies_failed"`
479 for _, vol := range rtr.volmgr.AllWritable() {
// IsNotExist is not counted either way: the block simply wasn't there.
480 if err := vol.Trash(hash); err == nil {
482 } else if os.IsNotExist(err) {
486 ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
489 if result.Deleted == 0 && result.Failed == 0 {
490 resp.WriteHeader(http.StatusNotFound)
493 body, err := json.Marshal(result)
495 http.Error(resp, err.Error(), http.StatusInternalServerError)
501 /* PullHandler processes "PUT /pull" requests for the data manager.
502 The request body is a JSON message containing a list of pull
503 requests in the following format:
507 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
509 "keep0.qr1hi.arvadosapi.com:25107",
510 "keep1.qr1hi.arvadosapi.com:25108"
514 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
524 Each pull request in the list consists of a block locator string
525 and an ordered list of servers. Keepstore should try to fetch the
526 block from each server in turn.
528 If the request has not been sent by the Data Manager, return 401
531 If the JSON unmarshalling fails, return 400 Bad Request.
534 // PullRequest consists of a block locator and an ordered list of servers
535 type PullRequest struct {
536 Locator string `json:"locator"`
537 Servers []string `json:"servers"`
539 // Destination mount, or "" for "anywhere"
540 MountUUID string `json:"mount_uuid"`
543 // PullHandler processes "PUT /pull" requests for the data manager.
// Privileged client only. The decoded list replaces the current pull
// queue; success is reported before the queue swap completes.
544 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
545 // Reject unauthorized requests.
546 if !rtr.isSystemAuth(GetAPIToken(req)) {
547 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
551 // Parse the request body.
553 r := json.NewDecoder(req.Body)
554 if err := r.Decode(&pr); err != nil {
555 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
559 // We have a properly formatted pull list sent from the data
560 // manager. Report success and send the list to the pull list
561 // manager for further handling.
562 resp.WriteHeader(http.StatusOK)
564 fmt.Sprintf("Received %d pull requests\n", len(pr))))
567 for _, p := range pr {
570 rtr.pullq.ReplaceQueue(plist)
573 // TrashRequest consists of a block locator and its Mtime
574 type TrashRequest struct {
575 Locator string `json:"locator"`
576 BlockMtime int64 `json:"block_mtime"`
578 // Target mount, or "" for "everywhere"
579 MountUUID string `json:"mount_uuid"`
582 // TrashHandler processes /trash requests.
// Privileged client only. Decodes a JSON list of TrashRequests and
// replaces the current trash queue; success is reported before the
// queue swap completes.
583 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
584 // Reject unauthorized requests.
585 if !rtr.isSystemAuth(GetAPIToken(req)) {
586 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
590 // Parse the request body.
591 var trash []TrashRequest
592 r := json.NewDecoder(req.Body)
593 if err := r.Decode(&trash); err != nil {
594 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
598 // We have a properly formatted trash list sent from the data
599 // manager. Report success and send the list to the trash work
600 // queue for further handling.
601 resp.WriteHeader(http.StatusOK)
603 fmt.Sprintf("Received %d trash requests\n", len(trash))))
606 for _, t := range trash {
609 rtr.trashq.ReplaceQueue(tlist)
612 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
// Privileged client only. Attempts Untrash on every writable volume and
// summarizes per-volume success/failure in the response: 404 if the
// block was on no volume, 500 if untrash failed everywhere (or on some
// volumes), 200 otherwise.
613 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
614 // Reject unauthorized requests.
615 if !rtr.isSystemAuth(GetAPIToken(req)) {
616 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
620 log := ctxlog.FromContext(req.Context())
621 hash := mux.Vars(req)["hash"]
623 if len(rtr.volmgr.AllWritable()) == 0 {
624 http.Error(resp, "No writable volumes", http.StatusNotFound)
628 var untrashedOn, failedOn []string
630 for _, vol := range rtr.volmgr.AllWritable() {
631 err := vol.Untrash(hash)
633 if os.IsNotExist(err) {
635 } else if err != nil {
636 log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
637 failedOn = append(failedOn, vol.String())
639 log.Infof("Untrashed %v on volume %v", hash, vol.String())
640 untrashedOn = append(untrashedOn, vol.String())
644 if numNotFound == len(rtr.volmgr.AllWritable()) {
645 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
646 } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
647 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
649 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
650 if len(failedOn) > 0 {
651 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
652 http.Error(resp, respBody, http.StatusInternalServerError)
654 fmt.Fprintln(resp, respBody)
659 // GetBlock and PutBlock implement lower-level code for handling
660 // blocks by rooting through volumes connected to the local machine.
661 // Once the handler has determined that system policy permits the
662 // request, it calls these methods to perform the actual operation.
664 // TODO(twp): this code would probably be better located in the
665 // VolumeManager interface. As an abstraction, the VolumeManager
666 // should be the only part of the code that cares about which volume a
667 // block is stored on, so it should be responsible for figuring out
668 // which volume to check for fetching blocks, storing blocks, etc.
670 // GetBlock fetches the block identified by "hash" into the provided
671 // buf, and returns the data size.
673 // If the block cannot be found on any volume, returns NotFoundError.
675 // If the block found does not have the correct MD5 hash, returns
678 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
679 log := ctxlog.FromContext(ctx)
681 // Attempt to read the requested hash from a keep volume.
682 errorToCaller := NotFoundError
684 for _, vol := range volmgr.AllReadable() {
685 size, err := vol.Get(ctx, hash, buf)
// Abort immediately if the client went away.
688 return 0, ErrClientDisconnect
692 // IsNotExist is an expected error and may be
693 // ignored. All other errors are logged. In
694 // any case we continue trying to read other
695 // volumes. If all volumes report IsNotExist,
696 // we return a NotFoundError.
697 if !os.IsNotExist(err) {
698 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
700 // If some volume returns a transient error, return it to the caller
701 // instead of "Not found" so it can retry.
702 if err == VolumeBusyError {
703 errorToCaller = err.(*KeepError)
707 // Check the file checksum.
708 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
709 if filehash != hash {
710 // TODO: Try harder to tell a sysadmin about
712 log.Errorf("checksum mismatch for block %s (actual %s), size %d on %s", hash, filehash, size, vol)
713 errorToCaller = DiskHashError
716 if errorToCaller == DiskHashError {
// NOTE(review): log.Warn is given printf-style arguments here; as
// written the format verbs are not expanded. This should likely be
// log.Warnf — confirm and fix in a complete view of the file.
717 log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
721 return 0, errorToCaller
// putProgress tracks the state of a PutBlock operation: which storage
// classes were requested (classNeeded), which still need a replica
// (classTodo), which mounts have been written (mountUsed), and how many
// replicas each class has so far (classDone).
724 type putProgress struct {
725 classNeeded map[string]bool
726 classTodo map[string]bool
727 mountUsed map[*VolumeMount]bool
729 classDone map[string]int
732 // Number of distinct replicas stored. "2" can mean the block was
733 // stored on 2 different volumes with replication 1, or on 1 volume
734 // with replication 2.
// Returned as a string for direct use in the X-Keep-Replicas-Stored header.
735 func (pr putProgress) TotalReplication() string {
736 return strconv.Itoa(pr.totalReplication)
739 // Number of replicas satisfying each storage class, formatted like
740 // "default=2; special=1".
// Used for the X-Keep-Storage-Classes-Confirmed response header.
741 func (pr putProgress) ClassReplication() string {
743 for k, v := range pr.classDone {
747 s += k + "=" + strconv.Itoa(v)
// Add records a successful write to mnt: bumps total replication by the
// mount's replication factor, credits each of the mount's storage
// classes, and removes those classes from the todo set. Warns if the
// same mount is added twice.
752 func (pr *putProgress) Add(mnt *VolumeMount) {
753 if pr.mountUsed[mnt] {
754 logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
757 pr.mountUsed[mnt] = true
758 pr.totalReplication += mnt.Replication
759 for class := range mnt.StorageClasses {
760 pr.classDone[class] += mnt.Replication
761 delete(pr.classTodo, class)
// Sub reverses a prior Add for mnt (e.g. after a failed write):
// decrements replication counts and restores the mount's storage
// classes to the todo set when they were originally requested. Warns if
// there was no matching Add.
765 func (pr *putProgress) Sub(mnt *VolumeMount) {
766 if !pr.mountUsed[mnt] {
767 logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
770 pr.mountUsed[mnt] = false
771 pr.totalReplication -= mnt.Replication
772 for class := range mnt.StorageClasses {
773 pr.classDone[class] -= mnt.Replication
774 if pr.classNeeded[class] {
775 pr.classTodo[class] = true
// Done reports whether the put has satisfied every requested storage
// class and stored at least one replica.
780 func (pr *putProgress) Done() bool {
781 return len(pr.classTodo) == 0 && pr.totalReplication > 0
// Want reports whether writing to mnt would make progress: false once
// the put is Done or the mount was already used; otherwise true when no
// specific classes remain (any mount helps) or the mount offers a class
// still in the todo set.
784 func (pr *putProgress) Want(mnt *VolumeMount) bool {
785 if pr.Done() || pr.mountUsed[mnt] {
788 if len(pr.classTodo) == 0 {
789 // none specified == "any"
792 for class := range mnt.StorageClasses {
793 if pr.classTodo[class] {
// Copy returns an independent deep copy of pr (maps are duplicated;
// classNeeded is shared, consistent with it being read-only after
// construction). Used by PutBlock to track predicted ("pending") state
// separately from confirmed state.
800 func (pr *putProgress) Copy() *putProgress {
802 classNeeded: pr.classNeeded,
803 classTodo: make(map[string]bool, len(pr.classTodo)),
804 classDone: make(map[string]int, len(pr.classDone)),
805 mountUsed: make(map[*VolumeMount]bool, len(pr.mountUsed)),
806 totalReplication: pr.totalReplication,
808 for k, v := range pr.classTodo {
811 for k, v := range pr.classDone {
814 for k, v := range pr.mountUsed {
// newPutProgress returns a putProgress with every requested storage
// class marked both needed and still to-do.
820 func newPutProgress(classes []string) putProgress {
822 classNeeded: make(map[string]bool, len(classes)),
823 classTodo: make(map[string]bool, len(classes)),
824 classDone: map[string]int{},
825 mountUsed: map[*VolumeMount]bool{},
827 for _, c := range classes {
829 pr.classNeeded[c] = true
830 pr.classTodo[c] = true
836 // PutBlock stores the given block on one or more volumes.
838 // The MD5 checksum of the block must match the given hash.
840 // The block is written to each writable volume (ordered by priority
841 // and then UUID, see volume.go) until at least one replica has been
842 // stored in each of the requested storage classes.
844 // The returned error, if any, is a KeepError with one of the
848 // A different block with the same hash already exists on this
851 // The MD5 hash of the BLOCK does not match the argument HASH.
853 // There was not enough space left in any Keep volume to store
856 // The object could not be stored for some other reason (e.g.
857 // all writes failed). The text of the error message should
858 // provide as much detail as possible.
859 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
860 log := ctxlog.FromContext(ctx)
862 // Check that BLOCK's checksum matches HASH.
863 blockhash := fmt.Sprintf("%x", md5.Sum(block))
864 if blockhash != hash {
865 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
866 return putProgress{}, RequestHashError
869 result := newPutProgress(wantStorageClasses)
871 // If we already have this data, it's intact on disk, and we
872 // can update its timestamp, return success. If we have
873 // different data with the same hash, return failure.
874 if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
877 if ctx.Err() != nil {
878 return result, ErrClientDisconnect
881 writables := volmgr.NextWritable()
882 if len(writables) == 0 {
883 log.Error("no writable volumes")
884 return result, FullError
// Concurrent per-mount writes are coordinated with a WaitGroup plus a
// condition variable over the shared result/pending state.
887 var wg sync.WaitGroup
889 cond := sync.Cond{L: &mtx}
890 // pending predicts what result will be if all pending writes
892 pending := result.Copy()
893 var allFull atomic.Value
896 // We hold the lock for the duration of the "each volume" loop
897 // below, except when it is released during cond.Wait().
900 for _, mnt := range writables {
901 // Wait until our decision to use this mount does not
902 // depend on the outcome of pending writes.
903 for result.Want(mnt) && !pending.Want(mnt) {
906 if !result.Want(mnt) {
913 log.Debugf("PutBlock: start write to %s", mnt.UUID)
915 err := mnt.Put(ctx, hash, block)
919 log.Debugf("PutBlock: write to %s failed", mnt.UUID)
922 log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
928 if err != nil && err != FullError && ctx.Err() == nil {
929 // The volume is not full but the
930 // write did not succeed. Report the
931 // error and continue trying.
933 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
939 if ctx.Err() != nil {
940 return result, ErrClientDisconnect
946 if result.totalReplication > 0 {
947 // Some, but not all, of the storage classes were
948 // satisfied. This qualifies as success.
950 } else if allFull.Load().(bool) {
951 log.Error("all volumes with qualifying storage classes are full")
952 return putProgress{}, FullError
954 // Already logged the non-full errors.
955 return putProgress{}, GenericError
959 // CompareAndTouch looks for volumes where the given content already
960 // exists and its modification time can be updated (i.e., it is
961 // protected from garbage collection), and updates result accordingly.
962 // It returns when the result is Done() or all volumes have been
964 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
965 log := ctxlog.FromContext(ctx)
966 for _, mnt := range volmgr.AllWritable() {
// Skip mounts that would not advance the storage-class goals.
967 if !result.Want(mnt) {
970 err := mnt.Compare(ctx, hash, buf)
971 if ctx.Err() != nil {
973 } else if err == CollisionError {
974 // Stop if we have a block with same hash but
975 // different content. (It will be impossible
976 // to tell which one is wanted if we have
977 // both, so there's no point writing it even
978 // on a different volume.)
979 log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
980 return CollisionError
981 } else if os.IsNotExist(err) {
982 // Block does not exist. This is the only
983 // "normal" error: we don't log anything.
985 } else if err != nil {
986 // Couldn't open file, data is corrupt on
987 // disk, etc.: log this abnormal condition,
988 // and try the next volume.
989 log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
992 if err := mnt.Touch(hash); err != nil {
993 log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
996 // Compare and Touch both worked --> done.
// validLocatorRe matches a bare 32-hex-digit MD5 hash (no hints).
1005 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
1007 // IsValidLocator returns true if the specified string is a valid Keep locator.
1008 // When Keep is extended to support hash types other than MD5,
1009 // this should be updated to cover those as well.
1011 func IsValidLocator(loc string) bool {
1012 return validLocatorRe.MatchString(loc)
// authRe matches "OAuth2 <token>" or "Bearer <token>" Authorization values.
1015 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
1017 // GetAPIToken returns the OAuth2 token from the Authorization
1018 // header of a HTTP request, or an empty string if no matching
// Only the first Authorization header value is examined.
1020 func GetAPIToken(req *http.Request) string {
1021 if auth, ok := req.Header["Authorization"]; ok {
1022 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
1029 // canDelete returns true if the user identified by apiToken is
1030 // allowed to delete blocks.
1031 func (rtr *router) canDelete(apiToken string) bool {
1035 // Blocks may be deleted only when Keep has been configured with a
// System (data manager) tokens are always allowed.
1037 if rtr.isSystemAuth(apiToken) {
1040 // TODO(twp): look up apiToken with the API server
1041 // return true if is_admin is true and if the token
1042 // has unlimited scope
1046 // isSystemAuth returns true if the given token is allowed to perform
1047 // system level actions like deleting data.
// A token qualifies iff it is non-empty and equals the cluster's
// SystemRootToken.
1048 func (rtr *router) isSystemAuth(token string) bool {
1049 return token != "" && token == rtr.cluster.SystemRootToken