From cc303a0ef797c1c752b4fd86e48e2c84fc7d96ca Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Wed, 6 May 2015 12:56:34 -0400 Subject: [PATCH] 5745: Serialize writes and data reads, but allow concurrent requests to do read-only non-data operations (like finding existing blocks and checking free disk space) which are likely to be cached by the OS and therefore not involve any disk activity. Also: * Serialize Touch and Delete. * Make sure to close and delete tempfiles on write errors. * Update comments. --- services/keepstore/handlers.go | 34 ++---- services/keepstore/keepstore.go | 6 +- services/keepstore/volume_unix.go | 139 +++++++------------------ services/keepstore/volume_unix_test.go | 15 +-- 4 files changed, 60 insertions(+), 134 deletions(-) diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index 6492045c68..75b56eb827 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -40,35 +40,19 @@ func MakeRESTRouter() *mux.Router { rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT") rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE") - - // For IndexHandler we support: - // /index - returns all locators - // /index/{prefix} - returns all locators that begin with {prefix} - // {prefix} is a string of hexadecimal digits between 0 and 32 digits. - // If {prefix} is the empty string, return an index of all locators - // (so /index and /index/ behave identically) - // A client may supply a full 32-digit locator string, in which - // case the server will return an index with either zero or one - // entries. This usage allows a client to check whether a block is - // present, and its size and upload time, without retrieving the - // entire block. - // + // List all blocks stored here. Privileged client only. rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD") - rest.HandleFunc( - `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD") + // List blocks stored here whose hash has the given prefix. + // Privileged client only. + rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD") + + // List volumes: path, device number, bytes used/avail. rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD") - // The PullHandler and TrashHandler process "PUT /pull" and "PUT - // /trash" requests from Data Manager. These requests instruct - // Keep to replicate or delete blocks; see - // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc - // for more details. - // - // Each handler parses the JSON list of block management requests - // in the message body, and replaces any existing pull queue or - // trash queue with their contentes. - // + // Replace the current pull queue. rest.HandleFunc(`/pull`, PullHandler).Methods("PUT") + + // Replace the current trash queue. rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT") // Any request which does not match any of these routes gets diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index c6cb00db0f..71e577fe54 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -129,7 +129,11 @@ func (vs *volumeSet) Set(value string) error { if _, err := os.Stat(value); err != nil { return err } - *vs = append(*vs, MakeUnixVolume(value, flagSerializeIO, flagReadonly)) + *vs = append(*vs, &UnixVolume{ + root: value, + serialize: flagSerializeIO, + readonly: flagReadonly, + }) return nil } diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go index 8d23d11618..bcf57c1647 100644 --- a/services/keepstore/volume_unix.go +++ b/services/keepstore/volume_unix.go @@ -11,98 +11,17 @@ import ( "path/filepath" "strconv" "strings" + "sync" "syscall" "time" ) -// IORequests are encapsulated Get or Put requests. They are used to -// implement serialized I/O (i.e. only one read/write operation per -// volume). When running in serialized mode, the Keep front end sends -// IORequests on a channel to an IORunner, which handles them one at a -// time and returns an IOResponse. -// -type IOMethod int - -const ( - KeepGet IOMethod = iota - KeepPut -) - -type IORequest struct { - method IOMethod - loc string - data []byte - reply chan *IOResponse -} - -type IOResponse struct { - data []byte - err error -} - -// A UnixVolume has the following properties: -// -// root -// the path to the volume's root directory -// queue -// A channel of IORequests. If non-nil, all I/O requests for -// this volume should be queued on this channel; the result -// will be delivered on the IOResponse channel supplied in the -// request. -// +// A UnixVolume stores and retrieves blocks in a local directory. type UnixVolume struct { - root string // path to this volume - queue chan *IORequest - readonly bool -} - -func (v *UnixVolume) IOHandler() { - for req := range v.queue { - var result IOResponse - switch req.method { - case KeepGet: - result.data, result.err = v.Read(req.loc) - case KeepPut: - result.err = v.Write(req.loc, req.data) - } - req.reply <- &result - } -} - -func MakeUnixVolume(root string, serialize bool, readonly bool) *UnixVolume { - v := &UnixVolume{ - root: root, - queue: nil, - readonly: readonly, - } - if serialize { - v.queue = make(chan *IORequest) - go v.IOHandler() - } - return v -} - -func (v *UnixVolume) Get(loc string) ([]byte, error) { - if v.queue == nil { - return v.Read(loc) - } - reply := make(chan *IOResponse) - v.queue <- &IORequest{KeepGet, loc, nil, reply} - response := <-reply - return response.data, response.err -} - -func (v *UnixVolume) Put(loc string, block []byte) error { - if v.readonly { - return MethodDisabledError - } - if v.queue == nil { - return v.Write(loc, block) - } - reply := make(chan *IOResponse) - v.queue <- &IORequest{KeepPut, loc, block, reply} - response := <-reply - return response.err + root string // path to the volume's root directory + serialize bool + readonly bool + mutex sync.Mutex } func (v *UnixVolume) Touch(loc string) error { @@ -115,6 +34,10 @@ func (v *UnixVolume) Touch(loc string) error { return err } defer f.Close() + if v.serialize { + v.mutex.Lock() + defer v.mutex.Unlock() + } if e := lockfile(f); e != nil { return e } @@ -133,28 +56,32 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) { } } -// Read retrieves a block identified by the locator string "loc", and +// Get retrieves a block identified by the locator string "loc", and // returns its contents as a byte slice. // -// If the block could not be opened or read, Read returns a nil slice -// and the os.Error that was generated. -// -// If the block is present but its content hash does not match loc, -// Read returns the block and a CorruptError. It is the caller's -// responsibility to decide what (if anything) to do with the -// corrupted data block. -// -func (v *UnixVolume) Read(loc string) ([]byte, error) { - buf, err := ioutil.ReadFile(v.blockPath(loc)) +// If the block could not be found, opened, or read, Get returns a nil +// slice and whatever non-nil error was returned by Stat or ReadFile. +func (v *UnixVolume) Get(loc string) ([]byte, error) { + path := v.blockPath(loc) + if _, err := os.Stat(path); err != nil { + return nil, err + } + if v.serialize { + v.mutex.Lock() + defer v.mutex.Unlock() + } + buf, err := ioutil.ReadFile(path) return buf, err } -// Write stores a block of data identified by the locator string +// Put stores a block of data identified by the locator string // "loc". It returns nil on success. If the volume is full, it // returns a FullError. If the write fails due to some other error, // that error is returned. -// -func (v *UnixVolume) Write(loc string, block []byte) error { +func (v *UnixVolume) Put(loc string, block []byte) error { + if v.readonly { + return MethodDisabledError + } if v.IsFull() { return FullError } @@ -172,8 +99,14 @@ func (v *UnixVolume) Write(loc string, block []byte) error { } bpath := v.blockPath(loc) + if v.serialize { + v.mutex.Lock() + defer v.mutex.Unlock() + } if _, err := tmpfile.Write(block); err != nil { log.Printf("%s: writing to %s: %s\n", v, bpath, err) + tmpfile.Close() + os.Remove(tmpfile.Name()) return err } if err := tmpfile.Close(); err != nil { @@ -270,6 +203,10 @@ func (v *UnixVolume) Delete(loc string) error { if v.readonly { return MethodDisabledError } + if v.serialize { + v.mutex.Lock() + defer v.mutex.Unlock() + } p := v.blockPath(loc) f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644) if err != nil { diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go index 6b39f8ff60..1320d31585 100644 --- a/services/keepstore/volume_unix_test.go +++ b/services/keepstore/volume_unix_test.go @@ -15,19 +15,20 @@ func TempUnixVolume(t *testing.T, serialize bool, readonly bool) *UnixVolume { if err != nil { t.Fatal(err) } - return MakeUnixVolume(d, serialize, readonly) + return &UnixVolume{ + root: d, + serialize: serialize, + readonly: readonly, + } } func _teardown(v *UnixVolume) { - if v.queue != nil { - close(v.queue) - } os.RemoveAll(v.root) } -// store writes a Keep block directly into a UnixVolume, for testing -// UnixVolume methods. -// +// _store writes a Keep block directly into a UnixVolume, bypassing +// the overhead and safeguards of Put(). Useful for storing bogus data +// and isolating unit tests from Put() behavior. func _store(t *testing.T, vol *UnixVolume, filename string, block []byte) { blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3]) if err := os.MkdirAll(blockdir, 0755); err != nil { -- 2.30.2