1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
24 "git.arvados.org/arvados.git/sdk/go/arvados"
25 "git.arvados.org/arvados.git/sdk/go/ctxlog"
26 "git.arvados.org/arvados.git/sdk/go/health"
27 "git.arvados.org/arvados.git/sdk/go/httpserver"
28 "github.com/gorilla/mux"
29 "github.com/prometheus/client_golang/prometheus"
30 "github.com/sirupsen/logrus"
// Fields of the keepstore HTTP router. (The enclosing
// `type router struct {` header is outside this view.)
35 cluster *arvados.Cluster
36 logger logrus.FieldLogger
// remoteProxy forwards "+R" (remote-hint) block requests to other clusters.
37 remoteProxy remoteProxy
// volmgr provides access to all configured storage volumes.
39 volmgr *RRVolumeManager
44 // MakeRESTRouter returns a new router that forwards all Keep requests
45 // to the appropriate handlers.
46 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
48 Router: mux.NewRouter(),
50 logger: ctxlog.FromContext(ctx),
51 metrics: &nodeMetrics{reg: reg},
// Block reads: a bare 32-hex-digit hash, or a hash followed by
// "+hints" (size, signature, etc.). Both routes share handleGET.
58 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
60 `/{hash:[0-9a-f]{32}}+{hints}`,
61 rtr.handleGET).Methods("GET", "HEAD")
// Block writes and deletes (DELETE is restricted; see handleDELETE).
63 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
64 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
65 // List all blocks stored here. Privileged client only.
66 rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
67 // List blocks stored here whose hash has the given prefix.
68 // Privileged client only.
69 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
70 // Update timestamp on existing block. Privileged client only.
71 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
73 // Internals/debugging info (runtime.MemStats)
74 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
76 // List volumes: path, device number, bytes used/avail.
77 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
79 // List mounts: UUID, readonly, tier, device ID, ...
80 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
81 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
82 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
84 // Replace the current pull queue.
85 rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
87 // Replace the current trash queue.
88 rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
90 // Untrash moves blocks from trash back into store
91 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
// Health-check endpoint, gated by the cluster management token.
93 rtr.Handle("/_health/{check}", &health.Handler{
94 Token: cluster.ManagementToken,
98 // Any request which does not match any of these routes gets
100 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// Register Prometheus metrics for the shared buffer pool and the
// pull/trash work queues.
102 rtr.metrics.setupBufferPoolMetrics(bufs)
103 rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
104 rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
109 // BadRequestHandler is a HandleFunc to address bad requests.
110 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
// Reply with the shared BadRequestError message and its HTTP status code.
111 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
// handleGET serves GET/HEAD block requests: verifies the locator
// signature when blob signing is enabled, reads the block into a
// pooled buffer, and writes it to the response.
114 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
115 ctx, cancel := contextForResponse(context.TODO(), resp)
// The locator is the URL path minus its leading slash.
118 locator := req.URL.Path[1:]
// "+R" (remote cluster hint) without "+A" (local signature): hand
// the request off to the remote proxy instead of reading locally.
119 if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
120 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
124 if rtr.cluster.Collections.BlobSigning {
125 locator := req.URL.Path[1:] // strip leading slash
126 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
// NOTE(review): unchecked type assertion panics if VerifySignature
// ever returns a non-*KeepError error — confirm its contract.
127 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
132 // TODO: Probe volumes to check whether the block _might_
133 // exist. Some volumes/types could support a quick existence
134 // check without causing other operations to suffer. If all
135 // volumes support that, and assure us the block definitely
136 // isn't here, we can return 404 now instead of waiting for a
// Reserve a full-size buffer regardless of the actual block size;
// respond 503 if the pool cannot supply one before ctx is cancelled.
139 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
141 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
146 size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
// Map KeepError values to their HTTP status; anything else is a 500.
148 code := http.StatusInternalServerError
149 if err, ok := err.(*KeepError); ok {
152 http.Error(resp, err.Error(), code)
156 resp.Header().Set("Content-Length", strconv.Itoa(size))
157 resp.Header().Set("Content-Type", "application/octet-stream")
158 resp.Write(buf[:size])
161 // Return a new context that gets cancelled by resp's CloseNotifier.
162 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
163 ctx, cancel := context.WithCancel(parent)
// NOTE(review): http.CloseNotifier is deprecated in the standard
// library; req.Context() cancellation is the modern replacement.
164 if cn, ok := resp.(http.CloseNotifier); ok {
// Cancel the derived context when the client connection closes.
165 go func(c <-chan bool) {
176 // Get a buffer from the pool -- but give up and return a non-nil
177 // error if ctx ends before we get a buffer.
178 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
179 bufReady := make(chan []byte)
// Fetch the buffer in a goroutine so the caller can select on ctx.
181 bufReady <- bufs.Get(bufSize)
184 case buf := <-bufReady:
188 // Even if closeNotifier happened first, we
189 // need to keep waiting for our buf so we can
190 // return it to the pool.
// Client went away before a buffer became available.
193 return nil, ErrClientDisconnect
// handleTOUCH updates the timestamp on an existing block.
// Privileged (system auth) clients only.
197 func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
198 if !rtr.isSystemAuth(GetAPIToken(req)) {
199 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
202 hash := mux.Vars(req)["hash"]
203 vols := rtr.volmgr.AllWritable()
// No writable volumes configured: nothing can be touched.
205 http.Error(resp, "no volumes", http.StatusNotFound)
// Try each writable volume in turn until a Touch succeeds.
209 for _, mnt := range vols {
210 err = mnt.Touch(hash)
// Translate the final error: missing block -> 404, anything else -> 500.
218 case os.IsNotExist(err):
219 http.Error(resp, err.Error(), http.StatusNotFound)
221 http.Error(resp, err.Error(), http.StatusInternalServerError)
// handlePUT stores a block: validates the declared size, reads the
// body into a pooled buffer, writes it via PutBlock, and returns a
// (possibly signed) size-hinted locator to the client.
225 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
226 ctx, cancel := contextForResponse(context.TODO(), resp)
229 hash := mux.Vars(req)["hash"]
231 // Detect as many error conditions as possible before reading
232 // the body: avoid transmitting data that will not end up
233 // being written anyway.
// Content-Length is required so a correctly sized buffer can be reserved.
235 if req.ContentLength == -1 {
236 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
240 if req.ContentLength > BlockSize {
241 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
245 if len(rtr.volmgr.AllWritable()) == 0 {
246 http.Error(resp, FullError.Error(), FullError.HTTPCode)
// Desired storage classes come from the X-Keep-Storage-Classes
// header (comma-separated); otherwise the cluster's defaults apply.
250 var wantStorageClasses []string
251 if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
252 wantStorageClasses = strings.Split(hdr, ",")
253 for i, sc := range wantStorageClasses {
254 wantStorageClasses[i] = strings.TrimSpace(sc)
257 // none specified -- use configured default
258 for class, cfg := range rtr.cluster.StorageClasses {
260 wantStorageClasses = append(wantStorageClasses, class)
265 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
267 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
271 _, err = io.ReadFull(req.Body, buf)
273 http.Error(resp, err.Error(), http.StatusInternalServerError)
278 result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
// Map KeepError values to their HTTP status; anything else is a 500.
282 code := http.StatusInternalServerError
283 if err, ok := err.(*KeepError); ok {
286 http.Error(resp, err.Error(), code)
290 // Success; add a size hint, sign the locator if possible, and
291 // return it to the client.
292 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
293 apiToken := GetAPIToken(req)
294 if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
295 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
296 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
298 resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
299 resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
300 resp.Write([]byte(returnHash + "\n"))
303 // IndexHandler responds to "/index", "/index/{prefix}", and
304 // "/mounts/{uuid}/blocks" requests.
305 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
306 if !rtr.isSystemAuth(GetAPIToken(req)) {
307 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
// The prefix can arrive either as a path segment or a query parameter.
311 prefix := mux.Vars(req)["prefix"]
314 prefix = req.Form.Get("prefix")
// Empty uuid means "all readable volumes"; otherwise index one mount.
317 uuid := mux.Vars(req)["uuid"]
319 var vols []*VolumeMount
321 vols = rtr.volmgr.AllReadable()
322 } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
323 http.Error(resp, "mount not found", http.StatusNotFound)
326 vols = []*VolumeMount{mnt}
329 for _, v := range vols {
330 if err := v.IndexTo(prefix, resp); err != nil {
331 // We can't send an error status/message to
332 // the client because IndexTo() might have
333 // already written body content. All we can do
334 // is log the error in our own logs.
336 // The client must notice the lack of trailing
337 // newline as an indication that the response
339 ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
343 // An empty line at EOF is the only way the client can be
344 // assured the entire index was received.
345 resp.Write([]byte{'\n'})
348 // MountsHandler responds to "GET /mounts" requests.
349 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
// Encode the mount list directly to the response stream as JSON.
350 err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
352 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
// PoolStatus reports buffer pool usage for /status.json.
357 type PoolStatus struct {
358 Alloc uint64 `json:"BytesAllocatedCumulative"`
359 Cap int `json:"BuffersMax"`
360 Len int `json:"BuffersInUse"`
// volumeStatusEnt is one per-volume entry in the /status.json report.
363 type volumeStatusEnt struct {
365 Status *VolumeStatus `json:",omitempty"`
366 VolumeStats *ioStats `json:",omitempty"`
// InternalStats holds driver-specific counters (see InternalStatser).
367 InternalStats interface{} `json:",omitempty"`
// NodeStatus is the top-level structure serialized by /status.json.
371 type NodeStatus struct {
372 Volumes []*volumeStatusEnt
373 BufferPool PoolStatus
374 PullQueue WorkQueueStatus
375 TrashQueue WorkQueueStatus
// stLock serializes access to the shared status snapshot.
382 var stLock sync.Mutex
384 // DebugHandler addresses /debug.json requests.
385 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
386 type debugStats struct {
387 MemStats runtime.MemStats
// Snapshot the runtime memory statistics and return them as JSON.
390 runtime.ReadMemStats(&ds.MemStats)
391 data, err := json.Marshal(&ds)
393 http.Error(resp, err.Error(), http.StatusInternalServerError)
399 // StatusHandler addresses /status.json requests.
400 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
// Populate the node status snapshot, then serialize it as JSON.
402 rtr.readNodeStatus(&st)
403 data, err := json.Marshal(&st)
406 http.Error(resp, err.Error(), http.StatusInternalServerError)
412 // populate the given NodeStatus struct with current values.
413 func (rtr *router) readNodeStatus(st *NodeStatus) {
415 vols := rtr.volmgr.AllReadable()
// Reuse st.Volumes' backing array when it is already large enough.
416 if cap(st.Volumes) < len(vols) {
417 st.Volumes = make([]*volumeStatusEnt, len(vols))
419 st.Volumes = st.Volumes[:0]
420 for _, vol := range vols {
421 var internalStats interface{}
// Volumes implementing InternalStatser expose driver-specific stats.
422 if vol, ok := vol.Volume.(InternalStatser); ok {
423 internalStats = vol.InternalStats()
425 st.Volumes = append(st.Volumes, &volumeStatusEnt{
427 Status: vol.Status(),
428 InternalStats: internalStats,
429 //VolumeStats: rtr.volmgr.VolumeStats(vol),
// Buffer pool and work queue figures complete the snapshot.
432 st.BufferPool.Alloc = bufs.Alloc()
433 st.BufferPool.Cap = bufs.Cap()
434 st.BufferPool.Len = bufs.Len()
435 st.PullQueue = getWorkQueueStatus(rtr.pullq)
436 st.TrashQueue = getWorkQueueStatus(rtr.trashq)
439 // return a WorkQueueStatus for the given queue. If q is nil (which
440 // should never happen except in test suites), return a zero status
441 // value instead of crashing.
442 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
444 // This should only happen during tests.
445 return WorkQueueStatus{}
450 // handleDELETE processes DELETE requests.
452 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
453 // from all connected volumes.
455 // Only the Data Manager, or an Arvados admin with scope "all", are
456 // allowed to issue DELETE requests. If a DELETE request is not
457 // authenticated or is issued by a non-admin user, the server returns
458 // a PermissionError.
460 // Upon receiving a valid request from an authorized user,
461 // handleDELETE deletes all copies of the specified block on local
466 // If the requested blocks was not found on any volume, the response
467 // code is HTTP 404 Not Found.
469 // Otherwise, the response code is 200 OK, with a response body
470 // consisting of the JSON message
472 // {"copies_deleted":d,"copies_failed":f}
474 // where d and f are integers representing the number of blocks that
475 // were successfully and unsuccessfully deleted.
477 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
478 hash := mux.Vars(req)["hash"]
480 // Confirm that this user is an admin and has a token with unlimited scope.
481 var tok = GetAPIToken(req)
482 if tok == "" || !rtr.canDelete(tok) {
483 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// Deleting blocks can be disabled cluster-wide via BlobTrash.
487 if !rtr.cluster.Collections.BlobTrash {
488 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
492 // Delete copies of this block from all available volumes.
493 // Report how many blocks were successfully deleted, and how
494 // many were found on writable volumes but not deleted.
496 Deleted int `json:"copies_deleted"`
497 Failed int `json:"copies_failed"`
499 for _, vol := range rtr.volmgr.AllWritable() {
// A missing block on one volume is not an error; count the rest.
500 if err := vol.Trash(hash); err == nil {
502 } else if os.IsNotExist(err) {
506 ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
// Not found anywhere: 404. Otherwise report deletion counts as JSON.
509 if result.Deleted == 0 && result.Failed == 0 {
510 resp.WriteHeader(http.StatusNotFound)
513 body, err := json.Marshal(result)
515 http.Error(resp, err.Error(), http.StatusInternalServerError)
521 /* PullHandler processes "PUT /pull" requests for the data manager.
522 The request body is a JSON message containing a list of pull
523 requests in the following format:
527 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
529 "keep0.qr1hi.arvadosapi.com:25107",
530 "keep1.qr1hi.arvadosapi.com:25108"
534 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
544 Each pull request in the list consists of a block locator string
545 and an ordered list of servers. Keepstore should try to fetch the
546 block from each server in turn.
548 If the request has not been sent by the Data Manager, return 401
551 If the JSON unmarshalling fails, return 400 Bad Request.
554 // PullRequest consists of a block locator and an ordered list of servers
555 type PullRequest struct {
556 Locator string `json:"locator"`
557 Servers []string `json:"servers"`
559 // Destination mount, or "" for "anywhere"
560 MountUUID string `json:"mount_uuid"`
563 // PullHandler processes "PUT /pull" requests for the data manager.
564 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
565 // Reject unauthorized requests.
566 if !rtr.isSystemAuth(GetAPIToken(req)) {
567 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
571 // Parse the request body.
573 r := json.NewDecoder(req.Body)
574 if err := r.Decode(&pr); err != nil {
575 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
579 // We have a properly formatted pull list sent from the data
580 // manager. Report success and send the list to the pull list
581 // manager for further handling.
582 resp.WriteHeader(http.StatusOK)
584 fmt.Sprintf("Received %d pull requests\n", len(pr))))
// Atomically replace the pull queue contents with the new list.
587 for _, p := range pr {
590 rtr.pullq.ReplaceQueue(plist)
593 // TrashRequest consists of a block locator and its Mtime
594 type TrashRequest struct {
595 Locator string `json:"locator"`
596 BlockMtime int64 `json:"block_mtime"`
598 // Target mount, or "" for "everywhere"
599 MountUUID string `json:"mount_uuid"`
602 // TrashHandler processes /trash requests.
603 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
604 // Reject unauthorized requests.
605 if !rtr.isSystemAuth(GetAPIToken(req)) {
606 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
610 // Parse the request body.
611 var trash []TrashRequest
612 r := json.NewDecoder(req.Body)
613 if err := r.Decode(&trash); err != nil {
614 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
618 // We have a properly formatted trash list sent from the data
619 // manager. Report success and send the list to the trash work
620 // queue for further handling.
621 resp.WriteHeader(http.StatusOK)
623 fmt.Sprintf("Received %d trash requests\n", len(trash))))
// Atomically replace the trash queue contents with the new list.
626 for _, t := range trash {
629 rtr.trashq.ReplaceQueue(tlist)
632 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
633 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
634 // Reject unauthorized requests.
635 if !rtr.isSystemAuth(GetAPIToken(req)) {
636 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
640 log := ctxlog.FromContext(req.Context())
641 hash := mux.Vars(req)["hash"]
643 if len(rtr.volmgr.AllWritable()) == 0 {
644 http.Error(resp, "No writable volumes", http.StatusNotFound)
// Attempt untrash on every writable volume, tracking per-volume outcome.
648 var untrashedOn, failedOn []string
650 for _, vol := range rtr.volmgr.AllWritable() {
651 err := vol.Untrash(hash)
653 if os.IsNotExist(err) {
655 } else if err != nil {
656 log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
657 failedOn = append(failedOn, vol.String())
659 log.Infof("Untrashed %v on volume %v", hash, vol.String())
660 untrashedOn = append(untrashedOn, vol.String())
// Summarize: all-not-found -> 404, all-failed -> 500, partial failure
// -> 500 with detail, otherwise a success message.
664 if numNotFound == len(rtr.volmgr.AllWritable()) {
665 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
666 } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
667 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
669 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
670 if len(failedOn) > 0 {
671 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
672 http.Error(resp, respBody, http.StatusInternalServerError)
674 fmt.Fprintln(resp, respBody)
679 // GetBlock and PutBlock implement lower-level code for handling
680 // blocks by rooting through volumes connected to the local machine.
681 // Once the handler has determined that system policy permits the
682 // request, it calls these methods to perform the actual operation.
684 // TODO(twp): this code would probably be better located in the
685 // VolumeManager interface. As an abstraction, the VolumeManager
686 // should be the only part of the code that cares about which volume a
687 // block is stored on, so it should be responsible for figuring out
688 // which volume to check for fetching blocks, storing blocks, etc.
690 // GetBlock fetches the block identified by "hash" into the provided
691 // buf, and returns the data size.
693 // If the block cannot be found on any volume, returns NotFoundError.
695 // If the block found does not have the correct MD5 hash, returns
698 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
699 log := ctxlog.FromContext(ctx)
701 // Attempt to read the requested hash from a keep volume.
702 errorToCaller := NotFoundError
704 for _, vol := range volmgr.AllReadable() {
705 size, err := vol.Get(ctx, hash, buf)
// Client gave up while we were reading: stop immediately.
708 return 0, ErrClientDisconnect
712 // IsNotExist is an expected error and may be
713 // ignored. All other errors are logged. In
714 // any case we continue trying to read other
715 // volumes. If all volumes report IsNotExist,
716 // we return a NotFoundError.
717 if !os.IsNotExist(err) {
718 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
720 // If some volume returns a transient error, return it to the caller
721 // instead of "Not found" so it can retry.
722 if err == VolumeBusyError {
723 errorToCaller = err.(*KeepError)
727 // Check the file checksum.
728 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
729 if filehash != hash {
730 // TODO: Try harder to tell a sysadmin about
// Errorf (not Error): the message uses printf verbs, and
// logrus's Error() would not expand them.
732 log.Errorf("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
733 errorToCaller = DiskHashError
736 if errorToCaller == DiskHashError {
737 log.Warnf("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
// No volume produced a valid copy: report the most specific error seen.
741 return 0, errorToCaller
// putProgress tracks, during a PutBlock, which storage classes still
// need a replica, which mounts have been written, and per-class
// replica counts achieved so far.
744 type putProgress struct {
745 classNeeded map[string]bool
746 classTodo map[string]bool
747 mountUsed map[*VolumeMount]bool
749 classDone map[string]int
752 // Number of distinct replicas stored. "2" can mean the block was
753 // stored on 2 different volumes with replication 1, or on 1 volume
754 // with replication 2.
755 func (pr putProgress) TotalReplication() string {
756 return strconv.Itoa(pr.totalReplication)
759 // Number of replicas satisfying each storage class, formatted like
760 // "default=2; special=1".
761 func (pr putProgress) ClassReplication() string {
762 // Build the "class=count" pairs from the per-class tallies.
763 for k, v := range pr.classDone {
767 s += k + "=" + strconv.Itoa(v)
// Add records a successful write to mnt: it credits the mount's
// replication to every storage class the mount provides.
772 func (pr *putProgress) Add(mnt *VolumeMount) {
773 if pr.mountUsed[mnt] {
774 logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
777 pr.mountUsed[mnt] = true
778 pr.totalReplication += mnt.Replication
779 for class := range mnt.StorageClasses {
780 pr.classDone[class] += mnt.Replication
781 delete(pr.classTodo, class)
// Sub reverses a previous Add for mnt (used when a predicted write
// turns out to have failed), restoring classTodo entries as needed.
785 func (pr *putProgress) Sub(mnt *VolumeMount) {
786 if !pr.mountUsed[mnt] {
787 logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
790 pr.mountUsed[mnt] = false
791 pr.totalReplication -= mnt.Replication
792 for class := range mnt.StorageClasses {
793 pr.classDone[class] -= mnt.Replication
// Only classes the caller asked for go back on the todo list.
794 if pr.classNeeded[class] {
795 pr.classTodo[class] = true
// Done reports whether every requested storage class is satisfied
// and at least one replica has been written.
800 func (pr *putProgress) Done() bool {
801 return len(pr.classTodo) == 0 && pr.totalReplication > 0
// Want reports whether writing to mnt would make progress: the mount
// is unused and provides a still-needed storage class (or no classes
// were requested, in which case any mount qualifies).
804 func (pr *putProgress) Want(mnt *VolumeMount) bool {
805 if pr.Done() || pr.mountUsed[mnt] {
808 if len(pr.classTodo) == 0 {
809 // none specified == "any"
812 for class := range mnt.StorageClasses {
813 if pr.classTodo[class] {
// Copy returns a deep copy of pr (classNeeded is shared, since it is
// never mutated after construction; the mutable maps are cloned).
820 func (pr *putProgress) Copy() *putProgress {
822 classNeeded: pr.classNeeded,
823 classTodo: make(map[string]bool, len(pr.classTodo)),
824 classDone: make(map[string]int, len(pr.classDone)),
825 mountUsed: make(map[*VolumeMount]bool, len(pr.mountUsed)),
826 totalReplication: pr.totalReplication,
828 for k, v := range pr.classTodo {
831 for k, v := range pr.classDone {
834 for k, v := range pr.mountUsed {
// newPutProgress builds a putProgress in which every requested class
// is both "needed" and still "todo".
840 func newPutProgress(classes []string) putProgress {
842 classNeeded: make(map[string]bool, len(classes)),
843 classTodo: make(map[string]bool, len(classes)),
844 classDone: map[string]int{},
845 mountUsed: map[*VolumeMount]bool{},
847 for _, c := range classes {
849 pr.classNeeded[c] = true
850 pr.classTodo[c] = true
856 // PutBlock stores the given block on one or more volumes.
858 // The MD5 checksum of the block must match the given hash.
860 // The block is written to each writable volume (ordered by priority
861 // and then UUID, see volume.go) until at least one replica has been
862 // stored in each of the requested storage classes.
864 // The returned error, if any, is a KeepError with one of the
868 // A different block with the same hash already exists on this
871 // The MD5 hash of the BLOCK does not match the argument HASH.
873 // There was not enough space left in any Keep volume to store
876 // The object could not be stored for some other reason (e.g.
877 // all writes failed). The text of the error message should
878 // provide as much detail as possible.
879 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
880 log := ctxlog.FromContext(ctx)
882 // Check that BLOCK's checksum matches HASH.
883 blockhash := fmt.Sprintf("%x", md5.Sum(block))
884 if blockhash != hash {
885 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
886 return putProgress{}, RequestHashError
889 result := newPutProgress(wantStorageClasses)
891 // If we already have this data, it's intact on disk, and we
892 // can update its timestamp, return success. If we have
893 // different data with the same hash, return failure.
894 if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
897 if ctx.Err() != nil {
898 return result, ErrClientDisconnect
901 writables := volmgr.NextWritable()
902 if len(writables) == 0 {
903 log.Error("no writable volumes")
904 return result, FullError
// Writes to different mounts proceed concurrently; "pending"
// (guarded by cond/mtx) predicts the outcome if all in-flight
// writes succeed, so we can decide whether each mount is needed.
907 var wg sync.WaitGroup
909 cond := sync.Cond{L: &mtx}
910 // pending predicts what result will be if all pending writes
912 pending := result.Copy()
913 var allFull atomic.Value
916 // We hold the lock for the duration of the "each volume" loop
917 // below, except when it is released during cond.Wait().
920 for _, mnt := range writables {
921 // Wait until our decision to use this mount does not
922 // depend on the outcome of pending writes.
923 for result.Want(mnt) && !pending.Want(mnt) {
926 if !result.Want(mnt) {
933 log.Debugf("PutBlock: start write to %s", mnt.UUID)
935 err := mnt.Put(ctx, hash, block)
939 log.Debugf("PutBlock: write to %s failed", mnt.UUID)
942 log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
948 if err != nil && err != FullError && ctx.Err() == nil {
949 // The volume is not full but the
950 // write did not succeed. Report the
951 // error and continue trying.
953 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
959 if ctx.Err() != nil {
960 return result, ErrClientDisconnect
// Partial success (some replicas written) still counts as success;
// otherwise distinguish "everything full" from other failures.
966 if result.totalReplication > 0 {
967 // Some, but not all, of the storage classes were
968 // satisfied. This qualifies as success.
970 } else if allFull.Load().(bool) {
971 log.Error("all volumes with qualifying storage classes are full")
972 return putProgress{}, FullError
974 // Already logged the non-full errors.
975 return putProgress{}, GenericError
979 // CompareAndTouch looks for volumes where the given content already
980 // exists and its modification time can be updated (i.e., it is
981 // protected from garbage collection), and updates result accordingly.
982 // It returns when the result is Done() or all volumes have been
984 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
985 log := ctxlog.FromContext(ctx)
986 for _, mnt := range volmgr.AllWritable() {
// Skip mounts that would not improve the replication result.
987 if !result.Want(mnt) {
990 err := mnt.Compare(ctx, hash, buf)
991 if ctx.Err() != nil {
993 } else if err == CollisionError {
994 // Stop if we have a block with same hash but
995 // different content. (It will be impossible
996 // to tell which one is wanted if we have
997 // both, so there's no point writing it even
998 // on a different volume.)
// Errorf (not Error): the message uses printf verbs, and
// logrus's Error() would not expand them.
999 log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
1000 return CollisionError
1001 } else if os.IsNotExist(err) {
1002 // Block does not exist. This is the only
1003 // "normal" error: we don't log anything.
1005 } else if err != nil {
1006 // Couldn't open file, data is corrupt on
1007 // disk, etc.: log this abnormal condition,
1008 // and try the next volume.
1009 log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
1012 if err := mnt.Touch(hash); err != nil {
1013 log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
1016 // Compare and Touch both worked --> done.
// validLocatorRe matches a bare 32-hex-digit MD5 hash (no hints).
1025 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
1027 // IsValidLocator returns true if the specified string is a valid Keep locator.
1028 // When Keep is extended to support hash types other than MD5,
1029 // this should be updated to cover those as well.
1031 func IsValidLocator(loc string) bool {
1032 return validLocatorRe.MatchString(loc)
// authRe captures the token from "OAuth2 <token>" or "Bearer <token>".
1035 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
1037 // GetAPIToken returns the OAuth2 token from the Authorization
1038 // header of a HTTP request, or an empty string if no matching
1040 func GetAPIToken(req *http.Request) string {
// Only the first Authorization header value is considered.
1041 if auth, ok := req.Header["Authorization"]; ok {
1042 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
1049 // canDelete returns true if the user identified by apiToken is
1050 // allowed to delete blocks.
1051 func (rtr *router) canDelete(apiToken string) bool {
1055 // Blocks may be deleted only when Keep has been configured with a
// Currently only the system root token qualifies; per-user admin
// tokens are not yet honored (see TODO below).
1057 if rtr.isSystemAuth(apiToken) {
1060 // TODO(twp): look up apiToken with the API server
1061 // return true if is_admin is true and if the token
1062 // has unlimited scope
1066 // isSystemAuth returns true if the given token is allowed to perform
1067 // system level actions like deleting data.
1068 func (rtr *router) isSystemAuth(token string) bool {
// Non-empty token must exactly match the cluster's SystemRootToken.
1069 return token != "" && token == rtr.cluster.SystemRootToken