rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
-
- // For IndexHandler we support:
- // /index - returns all locators
- // /index/{prefix} - returns all locators that begin with {prefix}
- // {prefix} is a string of hexadecimal digits between 0 and 32 digits.
- // If {prefix} is the empty string, return an index of all locators
- // (so /index and /index/ behave identically)
- // A client may supply a full 32-digit locator string, in which
- // case the server will return an index with either zero or one
- // entries. This usage allows a client to check whether a block is
- // present, and its size and upload time, without retrieving the
- // entire block.
- //
+ // List all blocks stored here. Privileged client only.
rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
- rest.HandleFunc(
- `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+ // List blocks stored here whose hash has the given prefix.
+ // Privileged client only.
+ rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+
+ // List volumes: path, device number, bytes used/avail.
rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
- // The PullHandler and TrashHandler process "PUT /pull" and "PUT
- // /trash" requests from Data Manager. These requests instruct
- // Keep to replicate or delete blocks; see
- // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
- // for more details.
- //
- // Each handler parses the JSON list of block management requests
- // in the message body, and replaces any existing pull queue or
- // trash queue with their contents.
- //
+ // Replace the current pull queue.
rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+
+ // Replace the current trash queue.
rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
// Any request which does not match any of these routes gets
"path/filepath"
"strconv"
"strings"
+ "sync"
"syscall"
"time"
)
-// IORequests are encapsulated Get or Put requests. They are used to
-// implement serialized I/O (i.e. only one read/write operation per
-// volume). When running in serialized mode, the Keep front end sends
-// IORequests on a channel to an IORunner, which handles them one at a
-// time and returns an IOResponse.
-//
-type IOMethod int
-
-const (
- KeepGet IOMethod = iota
- KeepPut
-)
-
-type IORequest struct {
- method IOMethod
- loc string
- data []byte
- reply chan *IOResponse
-}
-
-type IOResponse struct {
- data []byte
- err error
-}
-
-// A UnixVolume has the following properties:
-//
-// root
-// the path to the volume's root directory
-// queue
-// A channel of IORequests. If non-nil, all I/O requests for
-// this volume should be queued on this channel; the result
-// will be delivered on the IOResponse channel supplied in the
-// request.
-//
+// A UnixVolume stores and retrieves blocks in a local directory.
type UnixVolume struct {
- root string // path to this volume
- queue chan *IORequest
- readonly bool
-}
-
-func (v *UnixVolume) IOHandler() {
- for req := range v.queue {
- var result IOResponse
- switch req.method {
- case KeepGet:
- result.data, result.err = v.Read(req.loc)
- case KeepPut:
- result.err = v.Write(req.loc, req.data)
- }
- req.reply <- &result
- }
-}
-
-func MakeUnixVolume(root string, serialize bool, readonly bool) *UnixVolume {
- v := &UnixVolume{
- root: root,
- queue: nil,
- readonly: readonly,
- }
- if serialize {
- v.queue = make(chan *IORequest)
- go v.IOHandler()
- }
- return v
-}
-
-func (v *UnixVolume) Get(loc string) ([]byte, error) {
- if v.queue == nil {
- return v.Read(loc)
- }
- reply := make(chan *IOResponse)
- v.queue <- &IORequest{KeepGet, loc, nil, reply}
- response := <-reply
- return response.data, response.err
-}
-
-func (v *UnixVolume) Put(loc string, block []byte) error {
- if v.readonly {
- return MethodDisabledError
- }
- if v.queue == nil {
- return v.Write(loc, block)
- }
- reply := make(chan *IOResponse)
- v.queue <- &IORequest{KeepPut, loc, block, reply}
- response := <-reply
- return response.err
+ root string // path to the volume's root directory
+ serialize bool // when true, methods lock mutex around their file I/O
+ readonly bool // when true, Put returns MethodDisabledError instead of writing
+ mutex sync.Mutex // taken only when serialize is true; because of this field a UnixVolume must not be copied
}
func (v *UnixVolume) Touch(loc string) error {
return err
}
defer f.Close()
+ if v.serialize {
+ v.mutex.Lock()
+ defer v.mutex.Unlock()
+ }
if e := lockfile(f); e != nil {
return e
}
}
}
-// Read retrieves a block identified by the locator string "loc", and
+// Get retrieves a block identified by the locator string "loc", and
// returns its contents as a byte slice.
//
-// If the block could not be opened or read, Read returns a nil slice
-// and the os.Error that was generated.
-//
-// If the block is present but its content hash does not match loc,
-// Read returns the block and a CorruptError. It is the caller's
-// responsibility to decide what (if anything) to do with the
-// corrupted data block.
-//
-func (v *UnixVolume) Read(loc string) ([]byte, error) {
- buf, err := ioutil.ReadFile(v.blockPath(loc))
+// If the block could not be found, opened, or read, Get returns a nil
+// slice and whatever non-nil error was returned by Stat or ReadFile.
+func (v *UnixVolume) Get(loc string) ([]byte, error) {
+ path := v.blockPath(loc)
+ if _, err := os.Stat(path); err != nil { // checked before the mutex is taken, so a Stat failure returns without locking
+ return nil, err
+ }
+ if v.serialize { // serialized mode: hold the volume mutex until the read has finished
+ v.mutex.Lock()
+ defer v.mutex.Unlock()
+ }
+ buf, err := ioutil.ReadFile(path)
+ return buf, err
+}
-// Write stores a block of data identified by the locator string
+// Put stores a block of data identified by the locator string
// "loc". It returns nil on success. If the volume is full, it
// returns a FullError. If the write fails due to some other error,
// that error is returned.
-//
-func (v *UnixVolume) Write(loc string, block []byte) error {
+func (v *UnixVolume) Put(loc string, block []byte) error {
+ if v.readonly {
+ return MethodDisabledError
+ }
if v.IsFull() {
return FullError
}
}
bpath := v.blockPath(loc)
+ if v.serialize {
+ v.mutex.Lock()
+ defer v.mutex.Unlock()
+ }
if _, err := tmpfile.Write(block); err != nil {
log.Printf("%s: writing to %s: %s\n", v, bpath, err)
+ tmpfile.Close()
+ os.Remove(tmpfile.Name())
return err
}
if err := tmpfile.Close(); err != nil {
if v.readonly {
return MethodDisabledError
}
+ if v.serialize {
+ v.mutex.Lock()
+ defer v.mutex.Unlock()
+ }
p := v.blockPath(loc)
f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
if err != nil {