}
func newUnixVolume(params newVolumeParams) (volume, error) {
- v := &UnixVolume{
- uuid: params.UUID,
- cluster: params.Cluster,
- volume: params.ConfigVolume,
- logger: params.Logger,
- metrics: params.MetricsVecs,
+ v := &unixVolume{
+ uuid: params.UUID,
+ cluster: params.Cluster,
+ volume: params.ConfigVolume,
+ logger: params.Logger,
+ metrics: params.MetricsVecs,
+ bufferPool: params.BufferPool,
}
err := json.Unmarshal(params.ConfigVolume.DriverParameters, &v)
if err != nil {
return v, v.check()
}
-func (v *UnixVolume) check() error {
+func (v *unixVolume) check() error {
if v.Root == "" {
return errors.New("DriverParameters.Root was not provided")
}
return err
}
-// A UnixVolume stores and retrieves blocks in a local directory.
-type UnixVolume struct {
+// A unixVolume stores and retrieves blocks in a local directory.
+type unixVolume struct {
Root string // path to the volume's root directory
Serialize bool
- uuid string
- cluster *arvados.Cluster
- volume arvados.Volume
- logger logrus.FieldLogger
- metrics *volumeMetricsVecs
+ uuid string
+ cluster *arvados.Cluster
+ volume arvados.Volume
+ logger logrus.FieldLogger
+ metrics *volumeMetricsVecs
+ bufferPool *bufferPool
// something to lock during IO, typically a sync.Mutex (or nil
// to skip locking)
// filesystem root to storage directory, joined by "/". For example,
// the device ID for a local directory "/mnt/xvda1/keep" might be
// "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep".
-func (v *UnixVolume) DeviceID() string {
+func (v *unixVolume) DeviceID() string {
giveup := func(f string, args ...interface{}) string {
v.logger.Infof(f+"; using hostname:path for volume %s", append(args, v.uuid)...)
host, _ := os.Hostname()
}
// BlockTouch sets the timestamp for the given locator to the current time.
-func (v *UnixVolume) BlockTouch(hash string) error {
+func (v *unixVolume) BlockTouch(hash string) error {
p := v.blockPath(hash)
f, err := v.os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
if err != nil {
}
// Mtime returns the stored timestamp for the given locator.
-func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
+func (v *unixVolume) Mtime(loc string) (time.Time, error) {
p := v.blockPath(loc)
fi, err := v.os.Stat(p)
if err != nil {
return fi.ModTime(), nil
}
-// Lock the locker (if one is in use), open the file for reading, and
-// call the given function if and when the file is ready to read.
-func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
- if err := v.lock(ctx); err != nil {
- return err
- }
- defer v.unlock()
- f, err := v.os.Open(path)
- if err != nil {
- return err
- }
- defer f.Close()
- return fn(newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
-}
-
// stat is os.Stat() with some extra sanity checks.
-func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
+func (v *unixVolume) stat(path string) (os.FileInfo, error) {
stat, err := v.os.Stat(path)
if err == nil {
if stat.Size() < 0 {
}
// BlockRead reads a block from the volume.
//
// The block's file is located with blockPath(hash) and checked with
// v.stat before any data is read; an error from that check is passed
// through translateError before being returned. The serialize lock is
// then taken via v.lock(ctx) and held while the file is opened and
// copied to w. Errors from acquiring the lock, opening the file, or
// copying are returned unchanged (they are not passed through
// translateError).
//
// Bytes read from the file are counted via v.os.stats.TickInBytes. If
// the copy itself succeeds but the number of bytes copied differs from
// the size reported by the earlier stat call, the returned error is
// io.ErrUnexpectedEOF.
//
// In the io.WriterAt form, data is written to w starting at offset 0.
-func (v *UnixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) {
+func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
path := v.blockPath(hash)
stat, err := v.stat(path)
if err != nil {
- return 0, v.translateError(err)
+ return v.translateError(err)
}
- var n int64
- err = v.getFunc(ctx, path, func(rdr io.Reader) error {
- n, err = io.Copy(w, rdr)
- if err == nil && n != stat.Size() {
- err = io.ErrUnexpectedEOF
- }
+ if err := v.lock(ctx); err != nil {
return err
- })
- return int(n), err
+ }
+ defer v.unlock()
+ f, err := v.os.Open(path)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ src := newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes)
+ dst := io.NewOffsetWriter(w, 0)
+ n, err := io.Copy(dst, src)
+ if err == nil && n != stat.Size() {
+ err = io.ErrUnexpectedEOF
+ }
+ return err
}
// BlockWrite stores a block on the volume. If it already exists, its
// timestamp is updated.
-func (v *UnixVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
+func (v *unixVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
if v.isFull() {
return errFull
}
var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
-func (v *UnixVolume) Index(ctx context.Context, prefix string, w io.Writer) error {
+func (v *unixVolume) Index(ctx context.Context, prefix string, w io.Writer) error {
rootdir, err := v.os.Open(v.Root)
if err != nil {
return err
// BlobTrashLifetime == 0, the block is deleted; otherwise, the block
// is renamed as path/{loc}.trash.{deadline}, where deadline = now +
// BlobTrashLifetime.
-func (v *UnixVolume) BlockTrash(loc string) error {
+func (v *unixVolume) BlockTrash(loc string) error {
// Touch() must be called before calling Write() on a block. Touch()
// also uses lockfile(). This avoids a race condition between Write()
// and Trash() because either (a) the file will be trashed and Touch()
// BlockUntrash moves a block from the trash back into the store.
// It looks for path/{loc}.trash.{deadline} in storage and renames
// the first such file to path/{loc}.
-func (v *UnixVolume) BlockUntrash(hash string) error {
+func (v *unixVolume) BlockUntrash(hash string) error {
v.os.stats.TickOps("readdir")
v.os.stats.Tick(&v.os.stats.ReaddirOps)
files, err := ioutil.ReadDir(v.blockDir(hash))
// blockDir returns the fully qualified directory name for the directory
// where loc is (or would be) stored on this volume.
//
// The directory is v.Root joined with the first three bytes of loc, so
// all blocks sharing a three-character prefix share a directory. loc
// must be at least three bytes long: the slice expression loc[0:3]
// panics on a shorter string, and no length check is done here.
-func (v *UnixVolume) blockDir(loc string) string {
+func (v *unixVolume) blockDir(loc string) string {
return filepath.Join(v.Root, loc[0:3])
}
// blockPath returns the fully qualified pathname for the path to loc
// on this volume.
//
// The result is blockDir(loc) joined with loc itself, i.e. the file is
// named after the full locator inside its prefix directory. It only
// builds the path; it does not check whether the file exists.
-func (v *UnixVolume) blockPath(loc string) string {
+func (v *unixVolume) blockPath(loc string) string {
return filepath.Join(v.blockDir(loc), loc)
}
// isFull returns true if the free space on the volume is less than
// MinFreeKilobytes.
-func (v *UnixVolume) isFull() (isFull bool) {
+func (v *unixVolume) isFull() (isFull bool) {
fullSymlink := v.Root + "/full"
// Check if the volume has been marked as full in the last hour.
// FreeDiskSpace returns the number of unused 1k blocks available on
// the volume.
-func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
+func (v *unixVolume) FreeDiskSpace() (free uint64, err error) {
var fs syscall.Statfs_t
err = syscall.Statfs(v.Root, &fs)
if err == nil {
}
// InternalStats returns I/O and filesystem ops counters.
//
// The returned value is a pointer to the volume's own v.os.stats
// struct, not a copy, so it refers to the same counters that the
// volume's operations update.
-func (v *UnixVolume) InternalStats() interface{} {
+func (v *unixVolume) InternalStats() interface{} {
return &v.os.stats
}
// lock acquires the serialize lock, if one is in use. If ctx is done
// before the lock is acquired, lock returns ctx.Err() instead of
// acquiring the lock.
-func (v *UnixVolume) lock(ctx context.Context) error {
+func (v *unixVolume) lock(ctx context.Context) error {
if v.locker == nil {
return nil
}
}
// unlock releases the serialize lock, if one is in use.
-func (v *UnixVolume) unlock() {
+func (v *unixVolume) unlock() {
if v.locker == nil {
return
}
}
// lockfile and unlockfile use flock(2) to manage kernel file locks.
//
// lockfile requests an exclusive lock (LOCK_EX) on f's file
// descriptor and returns the error from the flock call. Before
// making the call it records a "flock" operation and increments the
// FlockOps counter in v.os.stats. LOCK_NB is not set, so per flock(2)
// the call may block until the lock can be granted.
-func (v *UnixVolume) lockfile(f *os.File) error {
+func (v *unixVolume) lockfile(f *os.File) error {
v.os.stats.TickOps("flock")
v.os.stats.Tick(&v.os.stats.FlockOps)
err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
return err
}
-func (v *UnixVolume) unlockfile(f *os.File) error {
+func (v *unixVolume) unlockfile(f *os.File) error {
err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
v.os.stats.TickErr(err)
return err
// Where appropriate, translate a more specific filesystem error to an
// error recognized by handlers, like os.ErrNotExist.
-func (v *UnixVolume) translateError(err error) error {
+func (v *unixVolume) translateError(err error) error {
switch err.(type) {
case *os.PathError:
// stat() returns a PathError if the parent directory
// EmptyTrash walks hierarchy looking for {hash}.trash.*
// and deletes those with deadline < now.
-func (v *UnixVolume) EmptyTrash() {
+func (v *unixVolume) EmptyTrash() {
var bytesDeleted, bytesInTrash int64
var blocksDeleted, blocksInTrash int64