1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
24 "git.arvados.org/arvados.git/lib/cmd"
25 "git.arvados.org/arvados.git/sdk/go/arvados"
26 "git.arvados.org/arvados.git/sdk/go/ctxlog"
27 "git.arvados.org/arvados.git/sdk/go/health"
28 "git.arvados.org/arvados.git/sdk/go/httpserver"
29 "github.com/gorilla/mux"
30 "github.com/prometheus/client_golang/prometheus"
31 "github.com/sirupsen/logrus"
// Fields of the router type. NOTE(review): the `type router struct {`
// opener is not visible in this chunk of the listing.
36 cluster *arvados.Cluster
// logger for request-independent messages (per-request logging uses ctxlog).
37 logger logrus.FieldLogger
// remoteProxy handles federated GETs for blocks signed by other clusters.
38 remoteProxy remoteProxy
// volmgr provides access to the configured storage volumes/mounts.
40 volmgr *RRVolumeManager
45 // MakeRESTRouter returns a new router that forwards all Keep requests
46 // to the appropriate handlers.
47 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
49 Router: mux.NewRouter(),
51 logger: ctxlog.FromContext(ctx),
52 metrics: &nodeMetrics{reg: reg},
// GET/HEAD a block by its bare 32-hex-digit MD5 hash.
59 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
// GET/HEAD a block by full locator (hash plus "+"-separated hints).
61 `/{hash:[0-9a-f]{32}}+{hints}`,
62 rtr.handleGET).Methods("GET", "HEAD")
// Store a block (body is the block data) / trash a block.
64 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
65 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
66 // List all blocks stored here. Privileged client only.
67 rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
68 // List blocks stored here whose hash has the given prefix.
69 // Privileged client only.
70 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
71 // Update timestamp on existing block. Privileged client only.
72 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
74 // Internals/debugging info (runtime.MemStats)
75 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
77 // List volumes: path, device number, bytes used/avail.
78 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
80 // List mounts: UUID, readonly, tier, device ID, ...
81 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
// Per-mount block index (trailing-slash variant registered too).
82 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
83 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
85 // Replace the current pull queue.
86 rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
88 // Replace the current trash queue.
89 rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
91 // Untrash moves blocks from trash back into store
92 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
// Health check endpoint, authenticated by the cluster management token.
94 rtr.Handle("/_health/{check}", &health.Handler{
95 Token: cluster.ManagementToken,
99 // Any request which does not match any of these routes gets
101 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// Register Prometheus metrics for the buffer pool and work queues.
103 rtr.metrics.setupBufferPoolMetrics(bufs)
104 rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
105 rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
110 // BadRequestHandler is a HandleFunc to address bad requests.
// Used as the router's NotFoundHandler: any unmatched route is a bad request.
111 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
112 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
// handleGET serves GET/HEAD requests for a single block: verifies the
// locator signature (if blob signing is enabled), reads the block into a
// pooled buffer, and writes it to the response.
115 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
116 locator := req.URL.Path[1:]
// A "+R" hint without a local "+A" signature indicates a block on a
// remote cluster: delegate to the federation remote proxy.
117 if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
118 rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
122 if rtr.cluster.Collections.BlobSigning {
123 locator := req.URL.Path[1:] // strip leading slash
124 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
// NOTE(review): err.(*KeepError) panics if VerifySignature ever returns
// a non-*KeepError error — confirm VerifySignature's contract.
125 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
130 // TODO: Probe volumes to check whether the block _might_
131 // exist. Some volumes/types could support a quick existence
132 // check without causing other operations to suffer. If all
133 // volumes support that, and assure us the block definitely
134 // isn't here, we can return 404 now instead of waiting for a
// Acquire a full-size buffer; fails with 503 if the client disconnects
// before one becomes available.
137 buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
139 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
144 size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
146 code := http.StatusInternalServerError
// KeepError carries its own HTTP status; anything else is a 500.
147 if err, ok := err.(*KeepError); ok {
150 http.Error(resp, err.Error(), code)
154 resp.Header().Set("Content-Length", strconv.Itoa(size))
155 resp.Header().Set("Content-Type", "application/octet-stream")
156 resp.Write(buf[:size])
159 // Get a buffer from the pool -- but give up and return a non-nil
160 // error if ctx ends before we get a buffer.
161 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
162 bufReady := make(chan []byte)
// A goroutine blocks on the pool so this function can select on ctx.
164 bufReady <- bufs.Get(bufSize)
167 case buf := <-bufReady:
171 // Even if closeNotifier happened first, we
172 // need to keep waiting for our buf so we can
173 // return it to the pool.
176 return nil, ErrClientDisconnect
// handleTOUCH updates the modification time of an existing block on the
// first writable volume that has it. Privileged (system auth) client only.
180 func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
181 if !rtr.isSystemAuth(GetAPIToken(req)) {
182 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
185 hash := mux.Vars(req)["hash"]
186 vols := rtr.volmgr.AllWritable()
// No writable volumes at all: nothing can be touched.
188 http.Error(resp, "no volumes", http.StatusNotFound)
192 for _, mnt := range vols {
193 err = mnt.Touch(hash)
// Block not present on any volume -> 404; other errors -> 500.
201 case os.IsNotExist(err):
202 http.Error(resp, err.Error(), http.StatusNotFound)
204 http.Error(resp, err.Error(), http.StatusInternalServerError)
// handlePUT stores a block: validates Content-Length, reads the body into
// a pooled buffer, writes it to volumes per the requested storage classes,
// and responds with a (possibly signed) size-hinted locator.
208 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
209 hash := mux.Vars(req)["hash"]
211 // Detect as many error conditions as possible before reading
212 // the body: avoid transmitting data that will not end up
213 // being written anyway.
// Content length is mandatory (-1 means the client didn't send one).
215 if req.ContentLength == -1 {
216 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
220 if req.ContentLength > BlockSize {
221 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
225 if len(rtr.volmgr.AllWritable()) == 0 {
226 http.Error(resp, FullError.Error(), FullError.HTTPCode)
// Parse requested storage classes from the X-Keep-Storage-Classes header
// (comma-separated, whitespace-tolerant).
230 var wantStorageClasses []string
231 if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
232 wantStorageClasses = strings.Split(hdr, ",")
233 for i, sc := range wantStorageClasses {
234 wantStorageClasses[i] = strings.TrimSpace(sc)
237 // none specified -- use configured default
238 for class, cfg := range rtr.cluster.StorageClasses {
240 wantStorageClasses = append(wantStorageClasses, class)
// Buffer exactly ContentLength bytes; 503 if none available before the
// client disconnects.
245 buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
247 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
251 _, err = io.ReadFull(req.Body, buf)
253 http.Error(resp, err.Error(), 500)
258 result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
262 code := http.StatusInternalServerError
263 if err, ok := err.(*KeepError); ok {
266 http.Error(resp, err.Error(), code)
270 // Success; add a size hint, sign the locator if possible, and
271 // return it to the client.
272 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
273 apiToken := GetAPIToken(req)
274 if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
275 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
276 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
// Report achieved replication (total and per storage class) in headers.
278 resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
279 resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
280 resp.Write([]byte(returnHash + "\n"))
283 // IndexHandler responds to "/index", "/index/{prefix}", and
284 // "/mounts/{uuid}/blocks" requests.
// Privileged (system auth) client only. Streams one line per block; a
// trailing blank line signals a complete, untruncated index.
285 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
286 if !rtr.isSystemAuth(GetAPIToken(req)) {
287 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
// Prefix can come from the URL path or the "prefix" form value.
291 prefix := mux.Vars(req)["prefix"]
294 prefix = req.Form.Get("prefix")
297 uuid := mux.Vars(req)["uuid"]
299 var vols []*VolumeMount
// No mount UUID -> index all readable volumes; otherwise just the one.
301 vols = rtr.volmgr.AllReadable()
302 } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
303 http.Error(resp, "mount not found", http.StatusNotFound)
306 vols = []*VolumeMount{mnt}
309 for _, v := range vols {
310 if err := v.IndexTo(prefix, resp); err != nil {
311 // We can't send an error status/message to
312 // the client because IndexTo() might have
313 // already written body content. All we can do
314 // is log the error in our own logs.
316 // The client must notice the lack of trailing
317 // newline as an indication that the response
319 ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
323 // An empty line at EOF is the only way the client can be
324 // assured the entire index was received.
325 resp.Write([]byte{'\n'})
328 // MountsHandler responds to "GET /mounts" requests.
// Encodes the volume manager's mount list as JSON directly to the response.
329 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
330 err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
332 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
// PoolStatus reports buffer pool usage for /status.json.
337 type PoolStatus struct {
338 Alloc uint64 `json:"BytesAllocatedCumulative"`
339 Cap int `json:"BuffersMax"`
340 Len int `json:"BuffersInUse"`
// volumeStatusEnt is one volume's entry in NodeStatus.Volumes.
343 type volumeStatusEnt struct {
345 Status *VolumeStatus `json:",omitempty"`
346 VolumeStats *ioStats `json:",omitempty"`
347 InternalStats interface{} `json:",omitempty"`
// NodeStatus is the top-level /status.json response body.
351 type NodeStatus struct {
352 Volumes []*volumeStatusEnt
353 BufferPool PoolStatus
354 PullQueue WorkQueueStatus
355 TrashQueue WorkQueueStatus
// stLock serializes access to shared status state. NOTE(review): the
// variable it guards is not visible in this chunk — confirm usage.
362 var stLock sync.Mutex
364 // DebugHandler addresses /debug.json requests.
// Returns the process's runtime.MemStats as JSON.
365 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
366 type debugStats struct {
367 MemStats runtime.MemStats
370 runtime.ReadMemStats(&ds.MemStats)
371 data, err := json.Marshal(&ds)
373 http.Error(resp, err.Error(), http.StatusInternalServerError)
379 // StatusHandler addresses /status.json requests.
// Populates a NodeStatus snapshot and returns it as JSON.
380 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
382 rtr.readNodeStatus(&st)
383 data, err := json.Marshal(&st)
386 http.Error(resp, err.Error(), http.StatusInternalServerError)
392 // populate the given NodeStatus struct with current values.
// Reuses st.Volumes' backing array when it is large enough.
393 func (rtr *router) readNodeStatus(st *NodeStatus) {
// Version string is the first space-separated token of cmd.Version.
394 st.Version = strings.SplitN(cmd.Version.String(), " ", 2)[0]
395 vols := rtr.volmgr.AllReadable()
396 if cap(st.Volumes) < len(vols) {
397 st.Volumes = make([]*volumeStatusEnt, len(vols))
399 st.Volumes = st.Volumes[:0]
400 for _, vol := range vols {
401 var internalStats interface{}
// Include driver-internal stats only for volumes implementing
// InternalStatser.
402 if vol, ok := vol.Volume.(InternalStatser); ok {
403 internalStats = vol.InternalStats()
405 st.Volumes = append(st.Volumes, &volumeStatusEnt{
407 Status: vol.Status(),
408 InternalStats: internalStats,
409 //VolumeStats: rtr.volmgr.VolumeStats(vol),
412 st.BufferPool.Alloc = bufs.Alloc()
413 st.BufferPool.Cap = bufs.Cap()
414 st.BufferPool.Len = bufs.Len()
415 st.PullQueue = getWorkQueueStatus(rtr.pullq)
416 st.TrashQueue = getWorkQueueStatus(rtr.trashq)
419 // return a WorkQueueStatus for the given queue. If q is nil (which
420 // should never happen except in test suites), return a zero status
421 // value instead of crashing.
422 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
424 // This should only happen during tests.
425 return WorkQueueStatus{}
430 // handleDELETE processes DELETE requests.
432 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
433 // from all connected volumes.
435 // Only the Data Manager, or an Arvados admin with scope "all", are
436 // allowed to issue DELETE requests. If a DELETE request is not
437 // authenticated or is issued by a non-admin user, the server returns
438 // a PermissionError.
440 // Upon receiving a valid request from an authorized user,
441 // handleDELETE deletes all copies of the specified block on local
446 // If the requested block was not found on any volume, the response
447 // code is HTTP 404 Not Found.
449 // Otherwise, the response code is 200 OK, with a response body
450 // consisting of the JSON message
452 // {"copies_deleted":d,"copies_failed":f}
454 // where d and f are integers representing the number of blocks that
455 // were successfully and unsuccessfully deleted.
456 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
457 hash := mux.Vars(req)["hash"]
459 // Confirm that this user is an admin and has a token with unlimited scope.
460 var tok = GetAPIToken(req)
461 if tok == "" || !rtr.canDelete(tok) {
462 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// DELETE is rejected outright when trash is disabled cluster-wide.
466 if !rtr.cluster.Collections.BlobTrash {
467 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
471 // Delete copies of this block from all available volumes.
472 // Report how many blocks were successfully deleted, and how
473 // many were found on writable volumes but not deleted.
475 Deleted int `json:"copies_deleted"`
476 Failed int `json:"copies_failed"`
478 for _, vol := range rtr.volmgr.AllWritable() {
// Trash moves the block to trash; "not exist" is not an error here.
479 if err := vol.Trash(hash); err == nil {
481 } else if os.IsNotExist(err) {
485 ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
// Block wasn't found anywhere (and nothing failed): 404.
488 if result.Deleted == 0 && result.Failed == 0 {
489 resp.WriteHeader(http.StatusNotFound)
492 body, err := json.Marshal(result)
494 http.Error(resp, err.Error(), http.StatusInternalServerError)
500 /* PullHandler processes "PUT /pull" requests for the data manager.
501 The request body is a JSON message containing a list of pull
502 requests in the following format:
506 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
508 "keep0.qr1hi.arvadosapi.com:25107",
509 "keep1.qr1hi.arvadosapi.com:25108"
513 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
523 Each pull request in the list consists of a block locator string
524 and an ordered list of servers. Keepstore should try to fetch the
525 block from each server in turn.
527 If the request has not been sent by the Data Manager, return 401
530 If the JSON unmarshalling fails, return 400 Bad Request.
533 // PullRequest consists of a block locator and an ordered list of servers
534 type PullRequest struct {
535 Locator string `json:"locator"`
536 Servers []string `json:"servers"`
538 // Destination mount, or "" for "anywhere"
539 MountUUID string `json:"mount_uuid"`
542 // PullHandler processes "PUT /pull" requests for the data manager.
// Replaces the entire pull work queue with the list in the request body.
543 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
544 // Reject unauthorized requests.
545 if !rtr.isSystemAuth(GetAPIToken(req)) {
546 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
550 // Parse the request body.
552 r := json.NewDecoder(req.Body)
553 if err := r.Decode(&pr); err != nil {
554 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
558 // We have a properly formatted pull list sent from the data
559 // manager. Report success and send the list to the pull list
560 // manager for further handling.
// Success is reported before the queue swap; the swap itself is async
// from the client's point of view.
561 resp.WriteHeader(http.StatusOK)
563 fmt.Sprintf("Received %d pull requests\n", len(pr))))
566 for _, p := range pr {
569 rtr.pullq.ReplaceQueue(plist)
572 // TrashRequest consists of a block locator and its Mtime
573 type TrashRequest struct {
574 Locator string `json:"locator"`
575 BlockMtime int64 `json:"block_mtime"`
577 // Target mount, or "" for "everywhere"
578 MountUUID string `json:"mount_uuid"`
581 // TrashHandler processes /trash requests.
// Replaces the entire trash work queue with the list in the request body.
// Privileged (system auth) client only.
582 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
583 // Reject unauthorized requests.
584 if !rtr.isSystemAuth(GetAPIToken(req)) {
585 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
589 // Parse the request body.
590 var trash []TrashRequest
591 r := json.NewDecoder(req.Body)
592 if err := r.Decode(&trash); err != nil {
593 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
597 // We have a properly formatted trash list sent from the data
598 // manager. Report success and send the list to the trash work
599 // queue for further handling.
600 resp.WriteHeader(http.StatusOK)
602 fmt.Sprintf("Received %d trash requests\n", len(trash))))
605 for _, t := range trash {
608 rtr.trashq.ReplaceQueue(tlist)
611 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
// Attempts Untrash on every writable volume and reports per-volume results.
612 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
613 // Reject unauthorized requests.
614 if !rtr.isSystemAuth(GetAPIToken(req)) {
615 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
619 log := ctxlog.FromContext(req.Context())
620 hash := mux.Vars(req)["hash"]
622 if len(rtr.volmgr.AllWritable()) == 0 {
623 http.Error(resp, "No writable volumes", http.StatusNotFound)
627 var untrashedOn, failedOn []string
629 for _, vol := range rtr.volmgr.AllWritable() {
630 err := vol.Untrash(hash)
// "Not exist" is counted separately from real failures.
632 if os.IsNotExist(err) {
634 } else if err != nil {
635 log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
636 failedOn = append(failedOn, vol.String())
638 log.Infof("Untrashed %v on volume %v", hash, vol.String())
639 untrashedOn = append(untrashedOn, vol.String())
// All volumes said "not found" -> 404; all failed -> 500; mixed results
// are reported as 500 with detail, full success as 200 with detail.
643 if numNotFound == len(rtr.volmgr.AllWritable()) {
644 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
645 } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
646 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
648 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
649 if len(failedOn) > 0 {
650 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
651 http.Error(resp, respBody, http.StatusInternalServerError)
653 fmt.Fprintln(resp, respBody)
658 // GetBlock and PutBlock implement lower-level code for handling
659 // blocks by rooting through volumes connected to the local machine.
660 // Once the handler has determined that system policy permits the
661 // request, it calls these methods to perform the actual operation.
663 // TODO(twp): this code would probably be better located in the
664 // VolumeManager interface. As an abstraction, the VolumeManager
665 // should be the only part of the code that cares about which volume a
666 // block is stored on, so it should be responsible for figuring out
667 // which volume to check for fetching blocks, storing blocks, etc.
669 // GetBlock fetches the block identified by "hash" into the provided
670 // buf, and returns the data size.
672 // If the block cannot be found on any volume, returns NotFoundError.
674 // If the block found does not have the correct MD5 hash, returns
676 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
677 log := ctxlog.FromContext(ctx)
679 // Attempt to read the requested hash from a keep volume.
680 errorToCaller := NotFoundError
682 for _, vol := range volmgr.AllReadable() {
683 size, err := vol.Get(ctx, hash, buf)
// Client went away mid-read: stop immediately.
686 return 0, ErrClientDisconnect
690 // IsNotExist is an expected error and may be
691 // ignored. All other errors are logged. In
692 // any case we continue trying to read other
693 // volumes. If all volumes report IsNotExist,
694 // we return a NotFoundError.
695 if !os.IsNotExist(err) {
696 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
698 // If some volume returns a transient error, return it to the caller
699 // instead of "Not found" so it can retry.
700 if err == VolumeBusyError {
701 errorToCaller = err.(*KeepError)
705 // Check the file checksum.
706 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
707 if filehash != hash {
708 // TODO: Try harder to tell a sysadmin about
710 log.Errorf("checksum mismatch for block %s (actual %s), size %d on %s", hash, filehash, size, vol)
711 errorToCaller = DiskHashError
714 if errorToCaller == DiskHashError {
// NOTE(review): logrus Warn does not interpret printf verbs — this
// should likely be Warnf; as written the args are appended verbatim.
715 log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
719 return 0, errorToCaller
// putProgress tracks how far a PutBlock call has progressed toward
// satisfying the requested storage classes.
722 type putProgress struct {
// classNeeded: classes the caller asked for (immutable after creation).
723 classNeeded map[string]bool
// classTodo: classes not yet satisfied by a completed write.
724 classTodo map[string]bool
// mountUsed: mounts that already hold a counted replica.
725 mountUsed map[*VolumeMount]bool
// classDone: achieved replication per class.
727 classDone map[string]int
730 // Number of distinct replicas stored. "2" can mean the block was
731 // stored on 2 different volumes with replication 1, or on 1 volume
732 // with replication 2.
733 func (pr putProgress) TotalReplication() string {
734 return strconv.Itoa(pr.totalReplication)
737 // Number of replicas satisfying each storage class, formatted like
738 // "default=2; special=1".
739 func (pr putProgress) ClassReplication() string {
741 for k, v := range pr.classDone {
745 s += k + "=" + strconv.Itoa(v)
// Add records a successful write to mnt: bumps total and per-class
// replication and retires satisfied classes from the todo set.
750 func (pr *putProgress) Add(mnt *VolumeMount) {
751 if pr.mountUsed[mnt] {
752 logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
755 pr.mountUsed[mnt] = true
756 pr.totalReplication += mnt.Replication
757 for class := range mnt.StorageClasses {
758 pr.classDone[class] += mnt.Replication
759 delete(pr.classTodo, class)
// Sub reverses a previous Add for mnt (used when a speculative write is
// rolled back); re-adds still-needed classes to the todo set.
763 func (pr *putProgress) Sub(mnt *VolumeMount) {
764 if !pr.mountUsed[mnt] {
765 logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
768 pr.mountUsed[mnt] = false
769 pr.totalReplication -= mnt.Replication
770 for class := range mnt.StorageClasses {
771 pr.classDone[class] -= mnt.Replication
772 if pr.classNeeded[class] {
773 pr.classTodo[class] = true
// Done reports whether all requested classes are satisfied and at least
// one replica was written.
778 func (pr *putProgress) Done() bool {
779 return len(pr.classTodo) == 0 && pr.totalReplication > 0
// Want reports whether writing to mnt could still improve progress.
782 func (pr *putProgress) Want(mnt *VolumeMount) bool {
783 if pr.Done() || pr.mountUsed[mnt] {
786 if len(pr.classTodo) == 0 {
787 // none specified == "any"
790 for class := range mnt.StorageClasses {
791 if pr.classTodo[class] {
// Copy returns an independent deep copy of pr (classNeeded is shared —
// it is treated as immutable after construction).
798 func (pr *putProgress) Copy() *putProgress {
800 classNeeded: pr.classNeeded,
801 classTodo: make(map[string]bool, len(pr.classTodo)),
802 classDone: make(map[string]int, len(pr.classDone)),
803 mountUsed: make(map[*VolumeMount]bool, len(pr.mountUsed)),
804 totalReplication: pr.totalReplication,
806 for k, v := range pr.classTodo {
809 for k, v := range pr.classDone {
812 for k, v := range pr.mountUsed {
// newPutProgress builds a putProgress with every requested class both
// needed and still to-do.
818 func newPutProgress(classes []string) putProgress {
820 classNeeded: make(map[string]bool, len(classes)),
821 classTodo: make(map[string]bool, len(classes)),
822 classDone: map[string]int{},
823 mountUsed: map[*VolumeMount]bool{},
825 for _, c := range classes {
827 pr.classNeeded[c] = true
828 pr.classTodo[c] = true
834 // PutBlock stores the given block on one or more volumes.
836 // The MD5 checksum of the block must match the given hash.
838 // The block is written to each writable volume (ordered by priority
839 // and then UUID, see volume.go) until at least one replica has been
840 // stored in each of the requested storage classes.
842 // The returned error, if any, is a KeepError with one of the
847 // A different block with the same hash already exists on this
852 // The MD5 hash of the BLOCK does not match the argument HASH.
856 // There was not enough space left in any Keep volume to store
861 // The object could not be stored for some other reason (e.g.
862 // all writes failed). The text of the error message should
863 // provide as much detail as possible.
864 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
865 log := ctxlog.FromContext(ctx)
867 // Check that BLOCK's checksum matches HASH.
868 blockhash := fmt.Sprintf("%x", md5.Sum(block))
869 if blockhash != hash {
870 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
871 return putProgress{}, RequestHashError
874 result := newPutProgress(wantStorageClasses)
876 // If we already have this data, it's intact on disk, and we
877 // can update its timestamp, return success. If we have
878 // different data with the same hash, return failure.
879 if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
882 if ctx.Err() != nil {
883 return result, ErrClientDisconnect
886 writables := volmgr.NextWritable()
887 if len(writables) == 0 {
888 log.Error("no writable volumes")
889 return result, FullError
// Writes proceed concurrently; a condition variable coordinates the
// decision of whether each mount is still worth writing to.
892 var wg sync.WaitGroup
894 cond := sync.Cond{L: &mtx}
895 // pending predicts what result will be if all pending writes
897 pending := result.Copy()
898 var allFull atomic.Value
901 // We hold the lock for the duration of the "each volume" loop
902 // below, except when it is released during cond.Wait().
905 for _, mnt := range writables {
906 // Wait until our decision to use this mount does not
907 // depend on the outcome of pending writes.
908 for result.Want(mnt) && !pending.Want(mnt) {
911 if !result.Want(mnt) {
918 log.Debugf("PutBlock: start write to %s", mnt.UUID)
920 err := mnt.Put(ctx, hash, block)
924 log.Debugf("PutBlock: write to %s failed", mnt.UUID)
927 log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
933 if err != nil && err != FullError && ctx.Err() == nil {
934 // The volume is not full but the
935 // write did not succeed. Report the
936 // error and continue trying.
938 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
944 if ctx.Err() != nil {
945 return result, ErrClientDisconnect
951 if result.totalReplication > 0 {
952 // Some, but not all, of the storage classes were
953 // satisfied. This qualifies as success.
955 } else if allFull.Load().(bool) {
956 log.Error("all volumes with qualifying storage classes are full")
957 return putProgress{}, FullError
959 // Already logged the non-full errors.
960 return putProgress{}, GenericError
964 // CompareAndTouch looks for volumes where the given content already
965 // exists and its modification time can be updated (i.e., it is
966 // protected from garbage collection), and updates result accordingly.
967 // It returns when the result is Done() or all volumes have been
969 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
970 log := ctxlog.FromContext(ctx)
971 for _, mnt := range volmgr.AllWritable() {
// Skip mounts that wouldn't improve the replication result.
972 if !result.Want(mnt) {
975 err := mnt.Compare(ctx, hash, buf)
976 if ctx.Err() != nil {
978 } else if err == CollisionError {
979 // Stop if we have a block with same hash but
980 // different content. (It will be impossible
981 // to tell which one is wanted if we have
982 // both, so there's no point writing it even
983 // on a different volume.)
984 log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
985 return CollisionError
986 } else if os.IsNotExist(err) {
987 // Block does not exist. This is the only
988 // "normal" error: we don't log anything.
990 } else if err != nil {
991 // Couldn't open file, data is corrupt on
992 // disk, etc.: log this abnormal condition,
993 // and try the next volume.
994 log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
997 if err := mnt.Touch(hash); err != nil {
998 log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
1001 // Compare and Touch both worked --> done.
// Compiled once at package scope to keep hot paths allocation-free.
1010 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
1012 // IsValidLocator returns true if the specified string is a valid Keep
1013 // locator. When Keep is extended to support hash types other than
1014 // MD5, this should be updated to cover those as well.
1015 func IsValidLocator(loc string) bool {
1016 return validLocatorRe.MatchString(loc)
// Accepts both "OAuth2 <token>" and "Bearer <token>" schemes.
1019 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
1021 // GetAPIToken returns the OAuth2 token from the Authorization
1022 // header of a HTTP request, or an empty string if no matching
1024 func GetAPIToken(req *http.Request) string {
// Only the first Authorization header value is examined.
1025 if auth, ok := req.Header["Authorization"]; ok {
1026 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
1033 // canDelete returns true if the user identified by apiToken is
1034 // allowed to delete blocks.
1035 func (rtr *router) canDelete(apiToken string) bool {
1039 // Blocks may be deleted only when Keep has been configured with a
1041 if rtr.isSystemAuth(apiToken) {
1044 // TODO(twp): look up apiToken with the API server
1045 // return true if is_admin is true and if the token
1046 // has unlimited scope
1050 // isSystemAuth returns true if the given token is allowed to perform
1051 // system level actions like deleting data.
// An empty configured SystemRootToken can never match (empty tokens are
// rejected explicitly).
1052 func (rtr *router) isSystemAuth(token string) bool {
1053 return token != "" && token == rtr.cluster.SystemRootToken