type keepClientStub struct {
blocks map[string][]byte
refreshable map[string]bool
- onPut func(bufcopy []byte) // called from PutB, before acquiring lock
+ onWrite func(bufcopy []byte) // called from WriteBlock, before acquiring lock
authToken string // client's auth token (used for signing locators)
sigkey string // blob signing key
sigttl time.Duration // blob signing ttl
locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(opts.Data), len(opts.Data)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
buf := make([]byte, len(opts.Data))
copy(buf, opts.Data)
- if kcs.onPut != nil {
- kcs.onPut(buf)
+ if kcs.onWrite != nil {
+ kcs.onWrite(buf)
}
for _, sc := range opts.StorageClasses {
if sc != "default" {
proceed := make(chan struct{})
var started, concurrent int32
blk2done := false
- s.kc.onPut = func([]byte) {
+ s.kc.onWrite = func([]byte) {
atomic.AddInt32(&concurrent, 1)
switch atomic.AddInt32(&started, 1) {
case 1:
fs, err := (&Collection{}).FileSystem(s.client, s.kc)
c.Assert(err, check.IsNil)
- s.kc.onPut = func([]byte) {
+ s.kc.onWrite = func([]byte) {
// discard flushed data -- otherwise the stub will use
// unlimited memory
time.Sleep(time.Millisecond)
c.Assert(err, check.IsNil)
var flushed int64
- s.kc.onPut = func(p []byte) {
+ s.kc.onWrite = func(p []byte) {
atomic.AddInt64(&flushed, int64(len(p)))
}
time.AfterFunc(10*time.Second, func() { close(timeout) })
var putCount, concurrency int64
var unflushed int64
- s.kc.onPut = func(p []byte) {
+ s.kc.onWrite = func(p []byte) {
defer atomic.AddInt64(&unflushed, -int64(len(p)))
cur := atomic.AddInt64(&concurrency, 1)
defer atomic.AddInt64(&concurrency, -1)
})
wrote := 0
- s.kc.onPut = func(p []byte) {
+ s.kc.onWrite = func(p []byte) {
s.kc.Lock()
s.kc.blocks = map[string][]byte{}
wrote++
}
func (s *CollectionFSSuite) TestFlushShort(c *check.C) {
- s.kc.onPut = func([]byte) {
+ s.kc.onWrite = func([]byte) {
s.kc.Lock()
s.kc.blocks = map[string][]byte{}
s.kc.Unlock()