projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
10383: Merge branch 'master' into 10383-arv-put-incremental-upload
[arvados.git]
/
services
/
keepstore
/
volume_unix.go
diff --git
a/services/keepstore/volume_unix.go
b/services/keepstore/volume_unix.go
index 92c897eac0ad0b6e38448c172aaad2ffcd578200..fff02aac260f59a6fc46fc24cbebea57b27e5743 100644
(file)
--- a/
services/keepstore/volume_unix.go
+++ b/
services/keepstore/volume_unix.go
@@
-2,11
+2,11
@@
package main
import (
"bufio"
import (
"bufio"
+ "context"
"flag"
"fmt"
"io"
"io/ioutil"
"flag"
"fmt"
"io"
"io/ioutil"
- "log"
"os"
"path/filepath"
"regexp"
"os"
"path/filepath"
"regexp"
@@
-15,6
+15,8
@@
import (
"sync"
"syscall"
"time"
"sync"
"syscall"
"time"
+
+ log "github.com/Sirupsen/logrus"
)
type unixVolumeAdder struct {
)
type unixVolumeAdder struct {
@@
-101,9
+103,10
@@
func (vs *unixVolumeAdder) Discover() int {
// A UnixVolume stores and retrieves blocks in a local directory.
type UnixVolume struct {
// A UnixVolume stores and retrieves blocks in a local directory.
type UnixVolume struct {
- Root string // path to the volume's root directory
- ReadOnly bool
- Serialize bool
+ Root string // path to the volume's root directory
+ ReadOnly bool
+ Serialize bool
+ DirectoryReplication int
// something to lock during IO, typically a sync.Mutex (or nil
// to skip locking)
// something to lock during IO, typically a sync.Mutex (or nil
// to skip locking)
@@
-114,12
+117,14
@@
type UnixVolume struct {
func (*UnixVolume) Examples() []Volume {
return []Volume{
&UnixVolume{
func (*UnixVolume) Examples() []Volume {
return []Volume{
&UnixVolume{
- Root: "/mnt/local-disk",
- Serialize: true,
+ Root: "/mnt/local-disk",
+ Serialize: true,
+ DirectoryReplication: 1,
},
&UnixVolume{
},
&UnixVolume{
- Root: "/mnt/network-disk",
- Serialize: false,
+ Root: "/mnt/network-disk",
+ Serialize: false,
+ DirectoryReplication: 2,
},
}
}
},
}
}
@@
-137,6
+142,9
@@
func (v *UnixVolume) Start() error {
if !strings.HasPrefix(v.Root, "/") {
return fmt.Errorf("volume root does not start with '/': %q", v.Root)
}
if !strings.HasPrefix(v.Root, "/") {
return fmt.Errorf("volume root does not start with '/': %q", v.Root)
}
+ if v.DirectoryReplication == 0 {
+ v.DirectoryReplication = 1
+ }
_, err := os.Stat(v.Root)
return err
}
_, err := os.Stat(v.Root)
return err
}
@@
-176,11
+184,14
@@
func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
// Lock the locker (if one is in use), open the file for reading, and
// call the given function if and when the file is ready to read.
// Lock the locker (if one is in use), open the file for reading, and
// call the given function if and when the file is ready to read.
-func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+func (v *UnixVolume) getFunc(
ctx context.Context,
path string, fn func(io.Reader) error) error {
if v.locker != nil {
v.locker.Lock()
defer v.locker.Unlock()
}
if v.locker != nil {
v.locker.Lock()
defer v.locker.Unlock()
}
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
f, err := os.Open(path)
if err != nil {
return err
f, err := os.Open(path)
if err != nil {
return err
@@
-204,7
+215,7
@@
func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
// Get retrieves a block, copies it to the given slice, and returns
// the number of bytes copied.
// Get retrieves a block, copies it to the given slice, and returns
// the number of bytes copied.
-func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
+func (v *UnixVolume) Get(
ctx context.Context,
loc string, buf []byte) (int, error) {
path := v.blockPath(loc)
stat, err := v.stat(path)
if err != nil {
path := v.blockPath(loc)
stat, err := v.stat(path)
if err != nil {
@@
-215,7
+226,7
@@
func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
}
var read int
size := int(stat.Size())
}
var read int
size := int(stat.Size())
- err = v.getFunc(path, func(rdr io.Reader) error {
+ err = v.getFunc(
ctx,
path, func(rdr io.Reader) error {
read, err = io.ReadFull(rdr, buf[:size])
return err
})
read, err = io.ReadFull(rdr, buf[:size])
return err
})
@@
-225,13
+236,13
@@
func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
// Compare returns nil if Get(loc) would return the same content as
// expect. It is functionally equivalent to Get() followed by
// bytes.Compare(), but uses less memory.
// Compare returns nil if Get(loc) would return the same content as
// expect. It is functionally equivalent to Get() followed by
// bytes.Compare(), but uses less memory.
-func (v *UnixVolume) Compare(loc string, expect []byte) error {
+func (v *UnixVolume) Compare(
ctx context.Context,
loc string, expect []byte) error {
path := v.blockPath(loc)
if _, err := v.stat(path); err != nil {
return v.translateError(err)
}
path := v.blockPath(loc)
if _, err := v.stat(path); err != nil {
return v.translateError(err)
}
- return v.getFunc(path, func(rdr io.Reader) error {
- return compareReaderWithBuf(rdr, expect, loc[:32])
+ return v.getFunc(
ctx,
path, func(rdr io.Reader) error {
+ return compareReaderWithBuf(
ctx,
rdr, expect, loc[:32])
})
}
})
}
@@
-239,7
+250,7
@@
func (v *UnixVolume) Compare(loc string, expect []byte) error {
// "loc". It returns nil on success. If the volume is full, it
// returns a FullError. If the write fails due to some other error,
// that error is returned.
// "loc". It returns nil on success. If the volume is full, it
// returns a FullError. If the write fails due to some other error,
// that error is returned.
-func (v *UnixVolume) Put(loc string, block []byte) error {
+func (v *UnixVolume) Put(
ctx context.Context,
loc string, block []byte) error {
if v.ReadOnly {
return MethodDisabledError
}
if v.ReadOnly {
return MethodDisabledError
}
@@
-264,6
+275,11
@@
func (v *UnixVolume) Put(loc string, block []byte) error {
v.locker.Lock()
defer v.locker.Unlock()
}
v.locker.Lock()
defer v.locker.Unlock()
}
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
if _, err := tmpfile.Write(block); err != nil {
log.Printf("%s: writing to %s: %s\n", v, bpath, err)
tmpfile.Close()
if _, err := tmpfile.Write(block); err != nil {
log.Printf("%s: writing to %s: %s\n", v, bpath, err)
tmpfile.Close()
@@
-307,7
+323,12
@@
func (v *UnixVolume) Status() *VolumeStatus {
// uses fs.Blocks - fs.Bfree.
free := fs.Bavail * uint64(fs.Bsize)
used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
// uses fs.Blocks - fs.Bfree.
free := fs.Bavail * uint64(fs.Bsize)
used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
- return &VolumeStatus{v.Root, devnum, free, used}
+ return &VolumeStatus{
+ MountPoint: v.Root,
+ DeviceNum: devnum,
+ BytesFree: free,
+ BytesUsed: used,
+ }
}
var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
}
var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
@@
-533,9
+554,9
@@
func (v *UnixVolume) Writable() bool {
}
// Replication returns the number of replicas promised by the
}
// Replication returns the number of replicas promised by the
-// underlying device (
currently assumed to be 1
).
+// underlying device (
as specified in configuration
).
func (v *UnixVolume) Replication() int {
func (v *UnixVolume) Replication() int {
- return
1
+ return
v.DirectoryReplication
}
// lockfile and unlockfile use flock(2) to manage kernel file locks.
}
// lockfile and unlockfile use flock(2) to manage kernel file locks.