"os"
"os/signal"
"strings"
+ "sync"
"syscall"
"time"
)
if _, err := os.Stat(value); err != nil {
return err
}
+ var locker sync.Locker
+ if flagSerializeIO {
+ locker = &sync.Mutex{}
+ }
*vs = append(*vs, &UnixVolume{
- root: value,
- serialize: flagSerializeIO,
- readonly: flagReadonly,
+ root: value,
+ locker: locker,
+ readonly: flagReadonly,
})
return nil
}
--- /dev/null
+package main
+
+type MockMutex struct {
+ AllowLock chan struct{}
+ AllowUnlock chan struct{}
+}
+
+func NewMockMutex() *MockMutex {
+ return &MockMutex{
+ AllowLock: make(chan struct{}),
+ AllowUnlock: make(chan struct{}),
+ }
+}
+
+// Lock waits for someone to send to AllowLock.
+func (m *MockMutex) Lock() {
+ <- m.AllowLock
+}
+
+// Unlock waits for someone to send to AllowUnlock.
+func (m *MockMutex) Unlock() {
+ <- m.AllowUnlock
+}
// A UnixVolume stores and retrieves blocks in a local directory.
type UnixVolume struct {
- root string // path to the volume's root directory
- serialize bool
- readonly bool
- mutex sync.Mutex
+ // path to the volume's root directory
+ root string
+ // something to lock during IO, typically a sync.Mutex (or nil
+ // to skip locking)
+ locker sync.Locker
+ readonly bool
}
func (v *UnixVolume) Touch(loc string) error {
return err
}
defer f.Close()
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
+ if v.locker != nil {
+ v.locker.Lock()
+ defer v.locker.Unlock()
}
if e := lockfile(f); e != nil {
return e
}
}
-// Open the given file, apply the serialize lock if enabled, and call
-// the given function if and when the file is ready to read.
+// Open the given file, lock the "serialize" locker if enabled, and
+// call the given function if and when the file is ready to read.
func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
f, err := os.Open(path)
if err != nil {
return err
}
defer f.Close()
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
+ if v.locker != nil {
+ v.locker.Lock()
+ defer v.locker.Unlock()
}
return fn(f)
}
}
bpath := v.blockPath(loc)
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
+ if v.locker != nil {
+ v.locker.Lock()
+ defer v.locker.Unlock()
}
if _, err := tmpfile.Write(block); err != nil {
log.Printf("%s: writing to %s: %s\n", v, bpath, err)
if v.readonly {
return MethodDisabledError
}
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
+ if v.locker != nil {
+ v.locker.Lock()
+ defer v.locker.Unlock()
}
p := v.blockPath(loc)
f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
"regexp"
"sort"
"strings"
+ "sync"
"syscall"
"testing"
"time"
if err != nil {
t.Fatal(err)
}
+ var locker sync.Locker
+ if serialize {
+ locker = &sync.Mutex{}
+ }
return &UnixVolume{
- root: d,
- serialize: serialize,
- readonly: readonly,
+ root: d,
+ locker: locker,
+ readonly: readonly,
}
}
}
func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
- v := TempUnixVolume(t, true, false)
+ v := TempUnixVolume(t, false, false)
defer _teardown(v)
- v.mutex.Lock()
- locked := true
- go func() {
- // TODO(TC): Don't rely on Sleep. Mock the mutex instead?
- time.Sleep(10 * time.Millisecond)
- locked = false
- v.mutex.Unlock()
- }()
- v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
- if locked {
- t.Errorf("Worker func called before serialize lock was obtained")
- }
+ v.Put(TEST_HASH, TEST_BLOCK)
+
+ mtx := NewMockMutex()
+ v.locker = mtx
+
+ funcCalled := make(chan struct{})
+ go v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
+ funcCalled <- struct{}{}
return nil
})
+ select {
+ case mtx.AllowLock <- struct{}{}:
+ case <-funcCalled:
+ t.Fatal("Function was called before mutex was acquired")
+ case <-time.After(5 * time.Second):
+ t.Fatal("Timed out before mutex was acquired")
+ }
+ select {
+ case <-funcCalled:
+ case mtx.AllowUnlock <- struct{}{}:
+ t.Fatal("Mutex was released before function was called")
+ case <-time.After(5 * time.Second):
+ t.Fatal("Timed out waiting for funcCalled")
+ }
+ select {
+ case mtx.AllowUnlock <- struct{}{}:
+ case <-time.After(5 * time.Second):
+ t.Fatal("Timed out waiting for getFunc() to release mutex")
+ }
}
func TestUnixVolumeCompare(t *testing.T) {